diff --git a/apps/dashboard-app/pom.xml b/apps/dashboard-app/pom.xml index a2047af5f..a2d983c8e 100644 --- a/apps/dashboard-app/pom.xml +++ b/apps/dashboard-app/pom.xml @@ -1,207 +1,212 @@ shrine-base net.shrine 1.23.5.1-SNAPSHOT ../../pom.xml 4.0.0 dashboard-app 1.23.5.1-SNAPSHOT Dashboard App jar net.alchim31.maven scala-maven-plugin com.github.eirslett frontend-maven-plugin 1.0 src/main/js install node and npm install-node-and-npm v6.10.3 4.1.2 npm install npm generate-resources install -s bower install bower install -s grunt default grunt --no-color log4j log4j io.spray spray-routing_2.11 ${spray-version} io.spray spray-servlet_2.11 ${spray-version} io.spray spray-util_2.11 ${spray-version} io.spray spray-testkit_2.11 ${spray-version} test com.typesafe.akka akka-actor_2.11 ${akka-version} com.typesafe.akka akka-slf4j_2.11 ${akka-version} com.typesafe.akka akka-testkit_2.11 ${akka-testkit-version} test org.json4s json4s-native_2.11 ${json4s-version} com.typesafe.slick slick_2.11 ${slick-version} org.slf4j slf4j-log4j12 ${slf4j-version} com.h2database h2 ${h2-version} test net.shrine shrine-protocol ${project.version} net.shrine shrine-utility-commons ${project.version} net.shrine shrine-crypto ${project.version} test-jar test net.shrine shrine-auth ${project.version} net.shrine shrine-data-commons ${project.version} + + net.shrine + shrine-mom + ${project.version} + mysql mysql-connector-java ${mysql-version} io.jsonwebtoken jjwt ${jjwt-version} net.sourceforge.jtds jtds ${jtds-version} net.shrine shrine-adapter-client-api ${project.version} com.typesafe config ${typesafe-config-version} diff --git a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala index e40f2e7cf..b7a9d12c6 100644 --- a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala +++ b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala @@ -1,207 +1,134 @@ package net.shrine.dashboard.httpclient import java.io.InputStream import java.security.cert.X509Certificate import javax.net.ssl.{SSLContext, X509TrustManager} - +import net.shrine.mom.HttpClient import net.shrine.log.Loggable import spray.can.Http import akka.io.IO import akka.actor.{ActorRef, ActorSystem} import spray.can.Http.{ConnectionAttemptFailedException, HostConnectorSetup} import spray.http.{HttpCredentials, HttpEntity, HttpHeader, HttpHeaders, HttpRequest, HttpResponse, StatusCodes, Uri} import spray.io.ClientSSLEngineProvider import spray.routing.{RequestContext, Route} import akka.pattern.ask import net.shrine.source.ConfigSource import scala.concurrent.{Await, Future, TimeoutException, blocking} import scala.concurrent.duration.DurationInt import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.util.control.NonFatal /** * From https://github.com/bthuillier/spray/commit/d31fc1b5e1415e1b908fe7d1f01f364a727e2593 with extra bits from http://www.cakesolutions.net/teamblogs/http-proxy-with-spray . * Replace when Spray has its own version. * * @author david * @since 9/14/15 */ trait HttpClientDirectives extends Loggable { implicit val system = ActorSystem("dashboardServer",ConfigSource.config) /** * Proxy the request to the specified base uri appended with the unmatched path. * */ def forwardUnmatchedPath(baseUri: Uri,maybeCredentials:Option[HttpCredentials] = None): Route = { def completeWithEntityAsString(httpResponse:HttpResponse,uri:Uri):Route = { ctx => { ctx.complete(httpResponse.entity.asString) } } requestWithUnmatchedPath(baseUri,completeWithEntityAsString,maybeCredentials) } /** * Make the request to the specified base uri appended with the unmatched path, then use the returned entity (as a string) to complete the route. * */ def requestWithUnmatchedPath(baseUri:Uri, route:(HttpResponse,Uri) => Route,maybeCredentials:Option[HttpCredentials] = None): Route = { ctx => { val resourceUri = baseUri.withPath(baseUri.path.++(ctx.unmatchedPath)).withQuery(ctx.request.uri.query) requestUriThenRoute(resourceUri,route,maybeCredentials)(ctx) } } /** * Just pass the result through */ def passThrough(httpResponse: HttpResponse,uri: Uri):Route = ctx => ctx.complete(httpResponse.entity.asString) /** * proxy the request to the specified uri with the unmatched path, then use the returned entity (as a string) to complete the route. * */ def requestUriThenRoute( resourceUri:Uri, route:(HttpResponse,Uri) => Route = passThrough, maybeCredentials:Option[HttpCredentials] = None ): Route = { ctx => { val httpResponse = httpResponseForUri(resourceUri,ctx,maybeCredentials) info(s"Got $httpResponse for $resourceUri") handleCommonErrorsOrRoute(route)(httpResponse,resourceUri)(ctx) } } private def httpResponseForUri(resourceUri:Uri,ctx: RequestContext,maybeCredentials:Option[HttpCredentials] = None):HttpResponse = { info(s"Requesting $resourceUri") if(resourceUri.scheme == "classpath") ClasspathResourceHttpClient.loadFromResource(resourceUri.path.toString()) else { val basicRequest = HttpRequest(ctx.request.method,resourceUri) val request = maybeCredentials.fold(basicRequest){ (credentials: HttpCredentials) => val headers: List[HttpHeader] = basicRequest.headers :+ HttpHeaders.Authorization(credentials) basicRequest.copy(headers = headers) } HttpClient.webApiCall(request) } } def handleCommonErrorsOrRoute(route:(HttpResponse,Uri) => Route)(httpResponse: HttpResponse,uri:Uri): Route = { ctx => { if(httpResponse.status != StatusCodes.OK) { //todo create and report a problem val ctxCopy: RequestContext = ctx.withHttpResponseMapped(_.copy(status = httpResponse.status)) ctxCopy.complete(s"$uri replied with $httpResponse") } else route(httpResponse,uri)(ctx) } } } object HttpClientDirectives extends HttpClientDirectives - -/** - * A simple HttpClient to use inside the HttpDirectives - */ -object HttpClient extends Loggable { - - //todo hand back a Try, Failures with custom exceptions instead of a crappy response - def webApiCall(request:HttpRequest)(implicit system: ActorSystem): HttpResponse = { - val transport: ActorRef = IO(Http)(system) - - debug(s"Requesting $request uri is ${request.uri} path is ${request.uri.path}") - blocking { - val future:Future[HttpResponse] = for { - Http.HostConnectorInfo(connector, _) <- transport.ask(createConnector(request))(10 seconds) //todo make this timeout configurable - response <- connector.ask(request)(10 seconds).mapTo[HttpResponse] //todo make this timeout configurable - } yield response - try { - Await.result(future, 10 seconds) //todo make this timeout configurable - } - catch { - case x:TimeoutException => HttpResponse(status = StatusCodes.RequestTimeout,entity = HttpEntity(s"${request.uri} timed out after 10 seconds. ${x.getMessage}")) - //todo is there a better message? What comes up in real life? - case x:ConnectionAttemptFailedException => { - //no web service is there to respond - info(s"${request.uri} failed with ${x.getMessage}",x) - HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}")) - } - case NonFatal(x) => { - info(s"${request.uri} failed with ${x.getMessage}",x) - HttpResponse(status = StatusCodes.InternalServerError,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}")) - } - } - } - } - -//from https://github.com/TimothyKlim/spray-ssl-poc/blob/master/src/main/scala/Main.scala -//trust all SSL contexts. We just want encrypted comms. - implicit val trustfulSslContext: SSLContext = { - - class IgnoreX509TrustManager extends X509TrustManager { - def checkClientTrusted(chain: Array[X509Certificate], authType: String) {} - - def checkServerTrusted(chain: Array[X509Certificate], authType: String) {} - - def getAcceptedIssuers = null - } - - val context = SSLContext.getInstance("TLS") - context.init(null, Array(new IgnoreX509TrustManager), null) - info("trustfulSslContex initialized") - context - } - - implicit val clientSSLEngineProvider = - //todo lookup this constructor - ClientSSLEngineProvider { - _ => - val engine = trustfulSslContext.createSSLEngine() - engine.setUseClientMode(true) - engine - } - - def createConnector(request: HttpRequest) = { - - val connector = new HostConnectorSetup(host = request.uri.authority.host.toString, - port = request.uri.effectivePort, - sslEncryption = request.uri.scheme == "https", - defaultHeaders = request.headers) - connector - } - -} - /** * For testing, get an HttpResponse for a classpath resource */ object ClasspathResourceHttpClient extends Loggable { def loadFromResource(resourceName:String):HttpResponse = { blocking { val cleanResourceName = if (resourceName.startsWith ("/") ) resourceName.drop(1) else resourceName val classLoader = getClass.getClassLoader try { val is: InputStream = classLoader.getResourceAsStream (cleanResourceName) val string:String = scala.io.Source.fromInputStream (is).mkString HttpResponse(entity = HttpEntity(string)) } catch{ case NonFatal(x) => { info(s"Could not load $resourceName",x) HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"Could not load $resourceName due to ${x.getMessage}")) } } } } } \ No newline at end of file diff --git a/apps/meta-app/pom.xml b/apps/meta-app/pom.xml index 5d1fd225f..d7ccfe292 100644 --- a/apps/meta-app/pom.xml +++ b/apps/meta-app/pom.xml @@ -1,129 +1,129 @@ shrine-base net.shrine 1.23.5.1-SNAPSHOT ../../pom.xml 4.0.0 meta-app MetaData App jar net.alchim31.maven scala-maven-plugin com.propensive rapture-json_2.11 ${rapture-version} com.propensive rapture-json-jawn_2.11 ${rapture-version} - + net.shrine shrine-utility-commons ${project.version} net.shrine shrine-config ${project.version} net.shrine shrine-auth ${project.version} net.shrine shrine-mom ${project.version} net.shrine shrine-qep ${project.version} log4j log4j io.spray spray-routing_2.11 ${spray-version} io.spray spray-servlet_2.11 ${spray-version} io.spray spray-util_2.11 ${spray-version} io.spray spray-testkit_2.11 ${spray-version} test com.typesafe.akka akka-actor_2.11 ${akka-version} com.typesafe.akka akka-slf4j_2.11 ${akka-version} com.typesafe.akka akka-testkit_2.11 ${akka-testkit-version} test org.json4s json4s-native_2.11 ${json4s-version} com.typesafe config ${typesafe-config-version} mysql mysql-connector-java ${mysql-version} com.h2database h2 test org.slf4j slf4j-log4j12 ${slf4j-version} test \ No newline at end of file diff --git a/apps/meta-app/src/main/resources/reference.conf b/apps/meta-app/src/main/resources/reference.conf index 5b22fe2c1..f7b112866 100644 --- a/apps/meta-app/src/main/resources/reference.conf +++ b/apps/meta-app/src/main/resources/reference.conf @@ -1,21 +1,22 @@ shrine { metaData { ping = "pong" + hornetQMomUrl = "https://localhost:6443/shrine/mom" } } //todo typesafe config precedence seems to do the right thing, but I haven't found the rules that say this reference.conf should override others akka { loglevel = INFO // log-config-on-start = on loggers = ["akka.event.slf4j.Slf4jLogger"] // logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" // Toggles whether the threads created by this ActorSystem should be daemons or not daemonic = on } spray.servlet { boot-class = "net.shrine.metadata.Boot" request-timeout = 30s } diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala index c087df5be..a5d257101 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala @@ -1,100 +1,106 @@ package net.shrine.metadata import akka.event.Logging import net.shrine.log.Loggable import net.shrine.mom.{LocalHornetQMom, Message, MessageSerializer, Queue} import org.json4s.native.JsonMethods._ import org.json4s.native.Serialization import org.json4s.native.Serialization.write import org.json4s.{JValue, NoTypeHints, _} import spray.http.StatusCodes import spray.routing.directives.LogEntry import spray.routing.{HttpService, Route} import scala.concurrent.duration.Duration /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { def momRoute: Route = pathPrefix("mom") { put { createQueue ~ deleteQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => val createdQueue: Queue = LocalHornetQMom.createQueueIfAbsent(queueName) implicit val formats = Serialization.formats(NoTypeHints) val response: String = write[Queue](createdQueue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => LocalHornetQMom.deleteQueue(queueName) complete(StatusCodes.OK) } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment / Segment) { (messageContent, toQueue) => LocalHornetQMom.send(messageContent, Queue.apply(toQueue)) complete(StatusCodes.Accepted) } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") - val response: Message = LocalHornetQMom.receive(Queue.apply(fromQueue), timeout).get - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - respondWithStatus(StatusCodes.OK) { - complete(write[Message](response)(formats)) + val response: Option[Message] = LocalHornetQMom.receive(Queue.apply(fromQueue), timeout) + + response match { + case Some(response) => + implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer + respondWithStatus(StatusCodes.OK) { + complete(write[Message](response)(formats)) + } + case None => + complete(StatusCodes.NotFound) } } } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageJSON => implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer val messageJValue: JValue = parse(messageJSON) try { val msg: Message = messageJValue.extract[Message](formats, manifest[Message]) LocalHornetQMom.completeMessage(msg) complete(StatusCodes.NoContent) } catch { case x => { LogEntry(s"\n Request: acknowledge/$messageJSON\n Response: $x", Logging.DebugLevel) throw x} } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { val queues = LocalHornetQMom.queues implicit val formats = Serialization.formats(NoTypeHints) val response = write(queues) respondWithStatus(StatusCodes.OK) {complete(response)} } } } \ No newline at end of file diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebClient.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebClient.scala new file mode 100644 index 000000000..172f74766 --- /dev/null +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebClient.scala @@ -0,0 +1,120 @@ +package net.shrine.metadata + +import akka.actor.{ActorRef, ActorSystem} +import akka.event.Logging +import akka.io.IO +import net.shrine.log.Loggable +import net.shrine.mom.{HttpClient, Message, MessageQueueService, MessageSerializer, Queue} +import spray.can.Http +import spray.http.{HttpEntity, HttpMethods, HttpRequest, HttpResponse, StatusCode, StatusCodes, Uri} +import akka.pattern.ask +import net.shrine.source.ConfigSource +import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.Serialization +import org.json4s.native.Serialization.{read, write} +import spray.routing.directives.LogEntry + +import scala.collection.immutable.Seq +import scala.concurrent.duration.DurationInt +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration.Duration + +/** + * A simple HornetQMomWebClient that uses HornetQMomWebApi to createQueue, + * deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt + * + * @author yifan + * @since 8/10/17 + */ +object HornetQMomWebClient extends MessageQueueService with Loggable { + implicit val system = ActorSystem() + // implicit val system = ActorSystem("dashboardServer",ConfigSource.config) + import system.dispatcher + + val momUrl: String = ConfigSource.config.getString("shrine.metaData.hornetQMomUrl") + + + override def createQueueIfAbsent(queueName: String): Queue = { + val createQueueUrl = momUrl + s"/createQueue/$queueName" + val request: HttpRequest = HttpRequest(HttpMethods.PUT, createQueueUrl) + val response: HttpResponse = HttpClient.webApiCall(request) + if (response.status == StatusCodes.Created) { + val queueString = response.entity.asString + implicit val formats = Serialization.formats(NoTypeHints) + val queue: Queue = read[Queue](queueString)(formats, manifest[Queue]) + queue + } else { + handleUnsuccessfulRequest(request, response) + } + } + + override def deleteQueue(queueName: String): Unit = { + val deleteQueueUrl = momUrl + s"/deleteQueue/$queueName" + val request: HttpRequest = HttpRequest(HttpMethods.PUT, deleteQueueUrl) + val response: HttpResponse = HttpClient.webApiCall(request) // StatusCodes.OK + if (response.status != StatusCodes.OK) { + handleUnsuccessfulRequest(request, response) + } + } + + override def queues: Seq[Queue] = { + val getQueuesUrl = momUrl + s"/getQueues" + val request: HttpRequest = HttpRequest(HttpMethods.GET, getQueuesUrl) + + val response: HttpResponse = HttpClient.webApiCall(request) + if (response.status == StatusCodes.OK) { + val allQueues: String = response.entity.asString + implicit val formats = Serialization.formats(NoTypeHints) + val queues: Seq[Queue] = read[Seq[Queue]](allQueues)(formats, manifest[Seq[Queue]]) + queues + } else { + handleUnsuccessfulRequest(request, response) + } + } + + override def send(contents: String, to: Queue): Unit = { + val sendMessageUrl = momUrl + s"/sendMessage/$contents/$to" + val request: HttpRequest = HttpRequest(HttpMethods.PUT, sendMessageUrl) + val response: HttpResponse = HttpClient.webApiCall(request) + if (response.status != StatusCodes.Accepted) { + handleUnsuccessfulRequest(request, response) + } + } + + override def receive(from: Queue, timeout: Duration): Option[Message] = { + val seconds = timeout.toSeconds + val receiveMessageUrl = momUrl + s"/receiveMessage/$from?timeOutSeconds=$seconds" + val request: HttpRequest = HttpRequest(HttpMethods.GET, receiveMessageUrl) + val response: HttpResponse = HttpClient.webApiCall(request) + if (response.status == StatusCodes.OK) { + val responseString: String = response.entity.asString + implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer + val messageResponse: Message = read[Message](responseString)(formats, manifest[Message]) + val result: Option[Message] = Option(messageResponse) + result + } else { + handleUnsuccessfulRequest(request, response) + } + } + + override def completeMessage(message: Message): Unit = { + implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer + val messageString: String = write[Message](message)(formats) + + val entity: HttpEntity = HttpEntity(messageString) + val completeMessageUrl: String = momUrl + s"/acknowledge/$entity" // HttpEntity + val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl) + val response: HttpResponse = HttpClient.webApiCall(request) + if (response.status != StatusCodes.NoContent) { + handleUnsuccessfulRequest(request, response) + } + } + + private def handleUnsuccessfulRequest(request: HttpRequest, response: HttpResponse) = { + LogEntry(s"\n Request: ${request.uri} failed with response status: ${response.status}" + + s"\n response entity: ${response.entity.asString}", Logging.DebugLevel) + throw new Exception(s"\n Request: ${request.uri} failed with response status: ${response.status}" + + s"\n response entity: ${response.entity.asString}") + } +} \ No newline at end of file diff --git a/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala index 7e060877d..2f5ad4019 100644 --- a/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala +++ b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala @@ -1,74 +1,75 @@ +package net.shrine.metadata + import akka.actor.ActorRefFactory -import net.shrine.metadata.HornetQMomWebApi import net.shrine.mom.{Message, MessageSerializer, Queue} import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.http.HttpEntity import spray.http.StatusCodes._ import spray.testkit.ScalatestRouteTest /** * Created by yifan on 7/27/17. */ @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system private val queueName = "testQueue" private val messageContent = "testContent" private var receivedMessage: String = "" "RemoteHornetQMom" should "create/delete the given queue, send/receive message, get queues" in { Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) implicit val formats = Serialization.formats(NoTypeHints) val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) val responseQueueName = jsonToQueue.name assertResult(Created)(status) assertResult(queueName)(responseQueueName) } Put(s"/mom/sendMessage/$messageContent/$queueName") ~> momRoute ~> check { assertResult(Accepted)(status) } Get(s"/mom/getQueues") ~> momRoute ~> check { implicit val formats = Serialization.formats(NoTypeHints) val response: String = new String(body.data.toByteArray) val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]]) assertResult(OK)(status) assertResult(queueName)(jsonToSeq.head.name) } // given timeout is 2 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { val response = new String(body.data.toByteArray) receivedMessage = response implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer val responseToMessage: Message = read[Message](response)(formats, manifest[Message]) assertResult(OK)(status) assert(responseToMessage.isInstanceOf[Message]) } Put("/mom/acknowledge", HttpEntity(s"""$receivedMessage""")) ~> momRoute ~> check { implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer assertResult(NoContent)(status) } Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { assertResult(OK)(status) } } } \ No newline at end of file diff --git a/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebClientTest.scala b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebClientTest.scala new file mode 100644 index 000000000..462fd548a --- /dev/null +++ b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebClientTest.scala @@ -0,0 +1,115 @@ +package net.shrine.metadata + +import akka.actor.ActorRefFactory +import net.shrine.metadata.{HornetQMomWebApi, HornetQMomWebClient} +import net.shrine.mom.Message +import org.junit.runner.RunWith +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import org.scalatest.junit.JUnitRunner +import spray.testkit.ScalatestRouteTest + +import scala.concurrent.duration._ +import scala.collection.immutable.Seq + +/** + * Test create, delete queue, send, and receive message, getQueueNames, and acknowledge using HornetQMomWebClient + * Created by yifan on 8/10/17. + */ + +@RunWith(classOf[JUnitRunner]) +class HornetQMomWebClientTest extends FlatSpec with BeforeAndAfterAll with ScalaFutures with Matchers { + + // override def actorRefFactory: ActorRefFactory = system + + "HornetQMomWebClient" should "be able to send and receive just one message" in { + + val queueName = "testQueue" + + assert(HornetQMomWebClient.queues.isEmpty) + + + val queue = HornetQMomWebClient.createQueueIfAbsent(queueName) + + assert(HornetQMomWebClient.queues == Seq(queue)) + + val testContents = "Test message" + HornetQMomWebClient.send(testContents,queue) + + val message: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(message.isDefined) + assert(message.get.contents == testContents) + + + HornetQMomWebClient.completeMessage(message.get) + + val shouldBeNoMessage: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(shouldBeNoMessage.isEmpty) + + HornetQMomWebClient.deleteQueue(queueName) + assert(HornetQMomWebClient.queues.isEmpty) + } + + "HornetQ" should "be able to send and receive a few messages" in { + + val queueName = "testQueue" + + assert(HornetQMomWebClient.queues.isEmpty) + + val queue = HornetQMomWebClient.createQueueIfAbsent(queueName) + + assert(HornetQMomWebClient.queues == Seq(queue)) + + val testContents1 = "Test message1" + HornetQMomWebClient.send(testContents1,queue) + + val message1: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(message1.isDefined) + assert(message1.get.contents == testContents1) + + HornetQMomWebClient.completeMessage(message1.get) + + val shouldBeNoMessage1: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(shouldBeNoMessage1.isEmpty) + + val testContents2 = "Test message2" + HornetQMomWebClient.send(testContents2,queue) + + val testContents3 = "Test message3" + HornetQMomWebClient.send(testContents3,queue) + + val message2: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(message2.isDefined) + assert(message2.get.contents == testContents2) + + HornetQMomWebClient.completeMessage(message2.get) + + val message3: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(message3.isDefined) + assert(message3.get.contents == testContents3) + + HornetQMomWebClient.completeMessage(message3.get) + + val shouldBeNoMessage4: Option[Message] = HornetQMomWebClient.receive(queue,1 second) + + assert(shouldBeNoMessage4.isEmpty) + + HornetQMomWebClient.deleteQueue(queueName) + assert(HornetQMomWebClient.queues.isEmpty) + } + + "HornetQ" should "be OK if asked to create the same queue twice " in { + + val queueName = "testQueue" + HornetQMomWebClient.createQueueIfAbsent(queueName) + HornetQMomWebClient.createQueueIfAbsent(queueName) + + HornetQMomWebClient.deleteQueue(queueName) + } +} \ No newline at end of file diff --git a/commons/mom/pom.xml b/commons/mom/pom.xml index f2d318574..a4e149c81 100644 --- a/commons/mom/pom.xml +++ b/commons/mom/pom.xml @@ -1,60 +1,108 @@ 4.0.0 SHRINE Data Access Classes shrine-mom jar net.shrine shrine-base 1.23.5.1-SNAPSHOT ../../pom.xml src/main/scala src/test/scala net.alchim31.maven scala-maven-plugin org.apache.maven.plugins maven-jar-plugin test-jar net.shrine shrine-test-commons ${project.version} test-jar test net.shrine shrine-config ${project.version} net.shrine shrine-util ${project.version} + + com.typesafe.akka + akka-actor_2.11 + ${akka-version} + + + io.spray + spray-routing_2.11 + ${spray-version} + + + io.spray + spray-servlet_2.11 + ${spray-version} + + + io.spray + spray-util_2.11 + ${spray-version} + + + io.spray + spray-testkit_2.11 + ${spray-version} + test + + + io.spray + spray-can_2.11 + 1.3.4 + provided + + + com.typesafe.akka + akka-actor_2.11 + ${akka-version} + + + com.typesafe.akka + akka-slf4j_2.11 + ${akka-version} + + + com.typesafe.akka + akka-testkit_2.11 + ${akka-testkit-version} + test + org.hornetq hornetq-server 2.4.7.Final diff --git a/commons/mom/src/main/scala/net/shrine/mom/HttpClient.scala b/commons/mom/src/main/scala/net/shrine/mom/HttpClient.scala new file mode 100644 index 000000000..54fb656ed --- /dev/null +++ b/commons/mom/src/main/scala/net/shrine/mom/HttpClient.scala @@ -0,0 +1,91 @@ +package net.shrine.mom + +import java.security.cert.X509Certificate +import javax.net.ssl.{SSLContext, X509TrustManager} + +import akka.actor.{ActorRef, ActorSystem} +import akka.io.IO +import akka.pattern.ask +import net.shrine.log.Loggable +import spray.can.Http +import spray.can.Http.{ConnectionAttemptFailedException, HostConnectorSetup} +import spray.http.{HttpEntity, HttpRequest, HttpResponse, StatusCodes} +import spray.io.ClientSSLEngineProvider + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.DurationInt +import scala.concurrent.{Await, Future, TimeoutException, blocking} +import scala.language.postfixOps +import scala.util.control.NonFatal + +/** + * A simple HttpClient to use inside the HttpDirectives + */ +object HttpClient extends Loggable { + + //todo hand back a Try, Failures with custom exceptions instead of a crappy response + def webApiCall(request:HttpRequest)(implicit system: ActorSystem): HttpResponse = { + val transport: ActorRef = IO(Http)(system) + + debug(s"Requesting $request uri is ${request.uri} path is ${request.uri.path}") + blocking { + val future:Future[HttpResponse] = for { + Http.HostConnectorInfo(connector, _) <- transport.ask(createConnector(request))(10 seconds) //todo make this timeout configurable + response <- connector.ask(request)(10 seconds).mapTo[HttpResponse] //todo make this timeout configurable + } yield response + try { + Await.result(future, 10 seconds) //todo make this timeout configurable + } + catch { + case x:TimeoutException => HttpResponse(status = StatusCodes.RequestTimeout,entity = HttpEntity(s"${request.uri} timed out after 10 seconds. ${x.getMessage}")) + //todo is there a better message? What comes up in real life? + case x:ConnectionAttemptFailedException => { + //no web service is there to respond + info(s"${request.uri} failed with ${x.getMessage}",x) + HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}")) + } + case NonFatal(x) => { + info(s"${request.uri} failed with ${x.getMessage}",x) + HttpResponse(status = StatusCodes.InternalServerError,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}")) + } + } + } + } + + //from https://github.com/TimothyKlim/spray-ssl-poc/blob/master/src/main/scala/Main.scala + //trust all SSL contexts. We just want encrypted comms. + implicit val trustfulSslContext: SSLContext = { + + class IgnoreX509TrustManager extends X509TrustManager { + def checkClientTrusted(chain: Array[X509Certificate], authType: String) {} + + def checkServerTrusted(chain: Array[X509Certificate], authType: String) {} + + def getAcceptedIssuers = null + } + + val context = SSLContext.getInstance("TLS") + context.init(null, Array(new IgnoreX509TrustManager), null) + info("trustfulSslContex initialized") + context + } + + implicit val clientSSLEngineProvider = + //todo lookup this constructor + ClientSSLEngineProvider { + _ => + val engine = trustfulSslContext.createSSLEngine() + engine.setUseClientMode(true) + engine + } + + def createConnector(request: HttpRequest) = { + + val connector = new HostConnectorSetup(host = request.uri.authority.host.toString, + port = request.uri.effectivePort, + sslEncryption = request.uri.scheme == "https", + defaultHeaders = request.headers) + connector + } + +}