diff --git a/apps/dashboard-app/pom.xml b/apps/dashboard-app/pom.xml
index a2047af5f..a2d983c8e 100644
--- a/apps/dashboard-app/pom.xml
+++ b/apps/dashboard-app/pom.xml
@@ -1,207 +1,212 @@
shrine-base
net.shrine
1.23.5.1-SNAPSHOT
../../pom.xml
4.0.0
dashboard-app
1.23.5.1-SNAPSHOT
Dashboard App
jar
net.alchim31.maven
scala-maven-plugin
com.github.eirslett
frontend-maven-plugin
1.0
src/main/js
install node and npm
install-node-and-npm
v6.10.3
4.1.2
npm install
npm
generate-resources
install -s
bower install
bower
install -s
grunt default
grunt
--no-color
log4j
log4j
io.spray
spray-routing_2.11
${spray-version}
io.spray
spray-servlet_2.11
${spray-version}
io.spray
spray-util_2.11
${spray-version}
io.spray
spray-testkit_2.11
${spray-version}
test
com.typesafe.akka
akka-actor_2.11
${akka-version}
com.typesafe.akka
akka-slf4j_2.11
${akka-version}
com.typesafe.akka
akka-testkit_2.11
${akka-testkit-version}
test
org.json4s
json4s-native_2.11
${json4s-version}
com.typesafe.slick
slick_2.11
${slick-version}
org.slf4j
slf4j-log4j12
${slf4j-version}
com.h2database
h2
${h2-version}
test
net.shrine
shrine-protocol
${project.version}
net.shrine
shrine-utility-commons
${project.version}
net.shrine
shrine-crypto
${project.version}
test-jar
test
net.shrine
shrine-auth
${project.version}
net.shrine
shrine-data-commons
${project.version}
+
+ net.shrine
+ shrine-mom
+ ${project.version}
+
mysql
mysql-connector-java
${mysql-version}
io.jsonwebtoken
jjwt
${jjwt-version}
net.sourceforge.jtds
jtds
${jtds-version}
net.shrine
shrine-adapter-client-api
${project.version}
com.typesafe
config
${typesafe-config-version}
diff --git a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala
index e40f2e7cf..b7a9d12c6 100644
--- a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala
+++ b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala
@@ -1,207 +1,134 @@
package net.shrine.dashboard.httpclient
import java.io.InputStream
import java.security.cert.X509Certificate
import javax.net.ssl.{SSLContext, X509TrustManager}
-
+import net.shrine.mom.HttpClient
import net.shrine.log.Loggable
import spray.can.Http
import akka.io.IO
import akka.actor.{ActorRef, ActorSystem}
import spray.can.Http.{ConnectionAttemptFailedException, HostConnectorSetup}
import spray.http.{HttpCredentials, HttpEntity, HttpHeader, HttpHeaders, HttpRequest, HttpResponse, StatusCodes, Uri}
import spray.io.ClientSSLEngineProvider
import spray.routing.{RequestContext, Route}
import akka.pattern.ask
import net.shrine.source.ConfigSource
import scala.concurrent.{Await, Future, TimeoutException, blocking}
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
/**
* From https://github.com/bthuillier/spray/commit/d31fc1b5e1415e1b908fe7d1f01f364a727e2593 with extra bits from http://www.cakesolutions.net/teamblogs/http-proxy-with-spray .
* Replace when Spray has its own version.
*
* @author david
* @since 9/14/15
*/
trait HttpClientDirectives extends Loggable {
implicit val system = ActorSystem("dashboardServer",ConfigSource.config)
/**
* Proxy the request to the specified base uri appended with the unmatched path.
*
*/
def forwardUnmatchedPath(baseUri: Uri,maybeCredentials:Option[HttpCredentials] = None): Route = {
def completeWithEntityAsString(httpResponse:HttpResponse,uri:Uri):Route = {
ctx => {
ctx.complete(httpResponse.entity.asString)
}
}
requestWithUnmatchedPath(baseUri,completeWithEntityAsString,maybeCredentials)
}
/**
* Make the request to the specified base uri appended with the unmatched path, then use the returned entity (as a string) to complete the route.
*
*/
def requestWithUnmatchedPath(baseUri:Uri, route:(HttpResponse,Uri) => Route,maybeCredentials:Option[HttpCredentials] = None): Route = {
ctx => {
val resourceUri = baseUri.withPath(baseUri.path.++(ctx.unmatchedPath)).withQuery(ctx.request.uri.query)
requestUriThenRoute(resourceUri,route,maybeCredentials)(ctx)
}
}
/**
* Just pass the result through
*/
def passThrough(httpResponse: HttpResponse,uri: Uri):Route = ctx => ctx.complete(httpResponse.entity.asString)
/**
* proxy the request to the specified uri with the unmatched path, then use the returned entity (as a string) to complete the route.
*
*/
def requestUriThenRoute(
resourceUri:Uri,
route:(HttpResponse,Uri) => Route = passThrough,
maybeCredentials:Option[HttpCredentials] = None
): Route = {
ctx => {
val httpResponse = httpResponseForUri(resourceUri,ctx,maybeCredentials)
info(s"Got $httpResponse for $resourceUri")
handleCommonErrorsOrRoute(route)(httpResponse,resourceUri)(ctx)
}
}
private def httpResponseForUri(resourceUri:Uri,ctx: RequestContext,maybeCredentials:Option[HttpCredentials] = None):HttpResponse = {
info(s"Requesting $resourceUri")
if(resourceUri.scheme == "classpath") ClasspathResourceHttpClient.loadFromResource(resourceUri.path.toString())
else {
val basicRequest = HttpRequest(ctx.request.method,resourceUri)
val request = maybeCredentials.fold(basicRequest){ (credentials: HttpCredentials) =>
val headers: List[HttpHeader] = basicRequest.headers :+ HttpHeaders.Authorization(credentials)
basicRequest.copy(headers = headers)
}
HttpClient.webApiCall(request)
}
}
def handleCommonErrorsOrRoute(route:(HttpResponse,Uri) => Route)(httpResponse: HttpResponse,uri:Uri): Route = {
ctx => {
if(httpResponse.status != StatusCodes.OK) {
//todo create and report a problem
val ctxCopy: RequestContext = ctx.withHttpResponseMapped(_.copy(status = httpResponse.status))
ctxCopy.complete(s"$uri replied with $httpResponse")
}
else route(httpResponse,uri)(ctx)
}
}
}
object HttpClientDirectives extends HttpClientDirectives
-
-/**
- * A simple HttpClient to use inside the HttpDirectives
- */
-object HttpClient extends Loggable {
-
- //todo hand back a Try, Failures with custom exceptions instead of a crappy response
- def webApiCall(request:HttpRequest)(implicit system: ActorSystem): HttpResponse = {
- val transport: ActorRef = IO(Http)(system)
-
- debug(s"Requesting $request uri is ${request.uri} path is ${request.uri.path}")
- blocking {
- val future:Future[HttpResponse] = for {
- Http.HostConnectorInfo(connector, _) <- transport.ask(createConnector(request))(10 seconds) //todo make this timeout configurable
- response <- connector.ask(request)(10 seconds).mapTo[HttpResponse] //todo make this timeout configurable
- } yield response
- try {
- Await.result(future, 10 seconds) //todo make this timeout configurable
- }
- catch {
- case x:TimeoutException => HttpResponse(status = StatusCodes.RequestTimeout,entity = HttpEntity(s"${request.uri} timed out after 10 seconds. ${x.getMessage}"))
- //todo is there a better message? What comes up in real life?
- case x:ConnectionAttemptFailedException => {
- //no web service is there to respond
- info(s"${request.uri} failed with ${x.getMessage}",x)
- HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}"))
- }
- case NonFatal(x) => {
- info(s"${request.uri} failed with ${x.getMessage}",x)
- HttpResponse(status = StatusCodes.InternalServerError,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}"))
- }
- }
- }
- }
-
-//from https://github.com/TimothyKlim/spray-ssl-poc/blob/master/src/main/scala/Main.scala
-//trust all SSL contexts. We just want encrypted comms.
- implicit val trustfulSslContext: SSLContext = {
-
- class IgnoreX509TrustManager extends X509TrustManager {
- def checkClientTrusted(chain: Array[X509Certificate], authType: String) {}
-
- def checkServerTrusted(chain: Array[X509Certificate], authType: String) {}
-
- def getAcceptedIssuers = null
- }
-
- val context = SSLContext.getInstance("TLS")
- context.init(null, Array(new IgnoreX509TrustManager), null)
- info("trustfulSslContex initialized")
- context
- }
-
- implicit val clientSSLEngineProvider =
- //todo lookup this constructor
- ClientSSLEngineProvider {
- _ =>
- val engine = trustfulSslContext.createSSLEngine()
- engine.setUseClientMode(true)
- engine
- }
-
- def createConnector(request: HttpRequest) = {
-
- val connector = new HostConnectorSetup(host = request.uri.authority.host.toString,
- port = request.uri.effectivePort,
- sslEncryption = request.uri.scheme == "https",
- defaultHeaders = request.headers)
- connector
- }
-
-}
-
/**
* For testing, get an HttpResponse for a classpath resource
*/
object ClasspathResourceHttpClient extends Loggable {
def loadFromResource(resourceName:String):HttpResponse = {
blocking {
val cleanResourceName = if (resourceName.startsWith ("/") ) resourceName.drop(1)
else resourceName
val classLoader = getClass.getClassLoader
try {
val is: InputStream = classLoader.getResourceAsStream (cleanResourceName)
val string:String = scala.io.Source.fromInputStream (is).mkString
HttpResponse(entity = HttpEntity(string))
}
catch{
case NonFatal(x) => {
info(s"Could not load $resourceName",x)
HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"Could not load $resourceName due to ${x.getMessage}"))
}
}
}
}
}
\ No newline at end of file
diff --git a/apps/meta-app/pom.xml b/apps/meta-app/pom.xml
index 5d1fd225f..d7ccfe292 100644
--- a/apps/meta-app/pom.xml
+++ b/apps/meta-app/pom.xml
@@ -1,129 +1,129 @@
shrine-base
net.shrine
1.23.5.1-SNAPSHOT
../../pom.xml
4.0.0
meta-app
MetaData App
jar
net.alchim31.maven
scala-maven-plugin
com.propensive
rapture-json_2.11
${rapture-version}
com.propensive
rapture-json-jawn_2.11
${rapture-version}
-
+
net.shrine
shrine-utility-commons
${project.version}
net.shrine
shrine-config
${project.version}
net.shrine
shrine-auth
${project.version}
net.shrine
shrine-mom
${project.version}
net.shrine
shrine-qep
${project.version}
log4j
log4j
io.spray
spray-routing_2.11
${spray-version}
io.spray
spray-servlet_2.11
${spray-version}
io.spray
spray-util_2.11
${spray-version}
io.spray
spray-testkit_2.11
${spray-version}
test
com.typesafe.akka
akka-actor_2.11
${akka-version}
com.typesafe.akka
akka-slf4j_2.11
${akka-version}
com.typesafe.akka
akka-testkit_2.11
${akka-testkit-version}
test
org.json4s
json4s-native_2.11
${json4s-version}
com.typesafe
config
${typesafe-config-version}
mysql
mysql-connector-java
${mysql-version}
com.h2database
h2
test
org.slf4j
slf4j-log4j12
${slf4j-version}
test
\ No newline at end of file
diff --git a/apps/meta-app/src/main/resources/reference.conf b/apps/meta-app/src/main/resources/reference.conf
index 5b22fe2c1..f7b112866 100644
--- a/apps/meta-app/src/main/resources/reference.conf
+++ b/apps/meta-app/src/main/resources/reference.conf
@@ -1,21 +1,22 @@
shrine {
metaData {
ping = "pong"
+ hornetQMomUrl = "https://localhost:6443/shrine/mom"
}
}
//todo typesafe config precedence seems to do the right thing, but I haven't found the rules that say this reference.conf should override others
akka {
loglevel = INFO
// log-config-on-start = on
loggers = ["akka.event.slf4j.Slf4jLogger"]
// logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
// Toggles whether the threads created by this ActorSystem should be daemons or not
daemonic = on
}
spray.servlet {
boot-class = "net.shrine.metadata.Boot"
request-timeout = 30s
}
diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala
index c087df5be..a5d257101 100644
--- a/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala
+++ b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebApi.scala
@@ -1,100 +1,106 @@
package net.shrine.metadata
import akka.event.Logging
import net.shrine.log.Loggable
import net.shrine.mom.{LocalHornetQMom, Message, MessageSerializer, Queue}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.write
import org.json4s.{JValue, NoTypeHints, _}
import spray.http.StatusCodes
import spray.routing.directives.LogEntry
import spray.routing.{HttpService, Route}
import scala.concurrent.duration.Duration
/**
* A web API that provides access to the internal HornetQMom library.
* Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt
*
* Created by yifan on 7/24/17.
*/
trait HornetQMomWebApi extends HttpService
with Loggable {
def momRoute: Route = pathPrefix("mom") {
put {
createQueue ~
deleteQueue ~
sendMessage ~
acknowledge
} ~ receiveMessage ~ getQueues
}
// SQS returns CreateQueueResult, which contains queueUrl: String
def createQueue: Route = path("createQueue" / Segment) { queueName =>
val createdQueue: Queue = LocalHornetQMom.createQueueIfAbsent(queueName)
implicit val formats = Serialization.formats(NoTypeHints)
val response: String = write[Queue](createdQueue)(formats)
respondWithStatus(StatusCodes.Created) { complete(response) }
}
// SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String
// returns a DeleteMessageResult, toString for debugging
def deleteQueue: Route = path("deleteQueue" / Segment) { queueName =>
LocalHornetQMom.deleteQueue(queueName)
complete(StatusCodes.OK)
}
// SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult
def sendMessage: Route = path("sendMessage" / Segment / Segment) { (messageContent, toQueue) =>
LocalHornetQMom.send(messageContent, Queue.apply(toQueue))
complete(StatusCodes.Accepted)
}
// SQS ReceiveMessageResult receiveMessage(String queueUrl)
def receiveMessage: Route =
get {
path("receiveMessage" / Segment) { fromQueue =>
parameter('timeOutSeconds ? 20) { timeOutSeconds =>
val timeout: Duration = Duration.create(timeOutSeconds, "seconds")
- val response: Message = LocalHornetQMom.receive(Queue.apply(fromQueue), timeout).get
- implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
- respondWithStatus(StatusCodes.OK) {
- complete(write[Message](response)(formats))
+ val response: Option[Message] = LocalHornetQMom.receive(Queue.apply(fromQueue), timeout)
+
+ response match {
+ case Some(response) =>
+ implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
+ respondWithStatus(StatusCodes.OK) {
+ complete(write[Message](response)(formats))
+ }
+ case None =>
+ complete(StatusCodes.NotFound)
}
}
}
}
// SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle)
def acknowledge: Route = path("acknowledge") {
entity(as[String]) { messageJSON =>
implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer
val messageJValue: JValue = parse(messageJSON)
try {
val msg: Message = messageJValue.extract[Message](formats, manifest[Message])
LocalHornetQMom.completeMessage(msg)
complete(StatusCodes.NoContent)
} catch {
case x => {
LogEntry(s"\n Request: acknowledge/$messageJSON\n Response: $x", Logging.DebugLevel)
throw x}
}
}
}
// Returns the names of the queues created on this server. Seq[Any]
def getQueues: Route = path("getQueues") {
get {
val queues = LocalHornetQMom.queues
implicit val formats = Serialization.formats(NoTypeHints)
val response = write(queues)
respondWithStatus(StatusCodes.OK) {complete(response)}
}
}
}
\ No newline at end of file
diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebClient.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebClient.scala
new file mode 100644
index 000000000..172f74766
--- /dev/null
+++ b/apps/meta-app/src/main/scala/net/shrine/metadata/HornetQMomWebClient.scala
@@ -0,0 +1,120 @@
+package net.shrine.metadata
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.event.Logging
+import akka.io.IO
+import net.shrine.log.Loggable
+import net.shrine.mom.{HttpClient, Message, MessageQueueService, MessageSerializer, Queue}
+import spray.can.Http
+import spray.http.{HttpEntity, HttpMethods, HttpRequest, HttpResponse, StatusCode, StatusCodes, Uri}
+import akka.pattern.ask
+import net.shrine.source.ConfigSource
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.native.Serialization
+import org.json4s.native.Serialization.{read, write}
+import spray.routing.directives.LogEntry
+
+import scala.collection.immutable.Seq
+import scala.concurrent.duration.DurationInt
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+/**
+ * A simple HornetQMomWebClient that uses HornetQMomWebApi to createQueue,
+ * deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt
+ *
+ * @author yifan
+ * @since 8/10/17
+ */
+object HornetQMomWebClient extends MessageQueueService with Loggable {
+ implicit val system = ActorSystem()
+ // implicit val system = ActorSystem("dashboardServer",ConfigSource.config)
+ import system.dispatcher
+
+ val momUrl: String = ConfigSource.config.getString("shrine.metaData.hornetQMomUrl")
+
+
+ override def createQueueIfAbsent(queueName: String): Queue = {
+ val createQueueUrl = momUrl + s"/createQueue/$queueName"
+ val request: HttpRequest = HttpRequest(HttpMethods.PUT, createQueueUrl)
+ val response: HttpResponse = HttpClient.webApiCall(request)
+ if (response.status == StatusCodes.Created) {
+ val queueString = response.entity.asString
+ implicit val formats = Serialization.formats(NoTypeHints)
+ val queue: Queue = read[Queue](queueString)(formats, manifest[Queue])
+ queue
+ } else {
+ handleUnsuccessfulRequest(request, response)
+ }
+ }
+
+ override def deleteQueue(queueName: String): Unit = {
+ val deleteQueueUrl = momUrl + s"/deleteQueue/$queueName"
+ val request: HttpRequest = HttpRequest(HttpMethods.PUT, deleteQueueUrl)
+ val response: HttpResponse = HttpClient.webApiCall(request) // StatusCodes.OK
+ if (response.status != StatusCodes.OK) {
+ handleUnsuccessfulRequest(request, response)
+ }
+ }
+
+ override def queues: Seq[Queue] = {
+ val getQueuesUrl = momUrl + s"/getQueues"
+ val request: HttpRequest = HttpRequest(HttpMethods.GET, getQueuesUrl)
+
+ val response: HttpResponse = HttpClient.webApiCall(request)
+ if (response.status == StatusCodes.OK) {
+ val allQueues: String = response.entity.asString
+ implicit val formats = Serialization.formats(NoTypeHints)
+ val queues: Seq[Queue] = read[Seq[Queue]](allQueues)(formats, manifest[Seq[Queue]])
+ queues
+ } else {
+ handleUnsuccessfulRequest(request, response)
+ }
+ }
+
+ override def send(contents: String, to: Queue): Unit = {
+ val sendMessageUrl = momUrl + s"/sendMessage/$contents/$to"
+ val request: HttpRequest = HttpRequest(HttpMethods.PUT, sendMessageUrl)
+ val response: HttpResponse = HttpClient.webApiCall(request)
+ if (response.status != StatusCodes.Accepted) {
+ handleUnsuccessfulRequest(request, response)
+ }
+ }
+
+ override def receive(from: Queue, timeout: Duration): Option[Message] = {
+ val seconds = timeout.toSeconds
+ val receiveMessageUrl = momUrl + s"/receiveMessage/$from?timeOutSeconds=$seconds"
+ val request: HttpRequest = HttpRequest(HttpMethods.GET, receiveMessageUrl)
+ val response: HttpResponse = HttpClient.webApiCall(request)
+ if (response.status == StatusCodes.OK) {
+ val responseString: String = response.entity.asString
+ implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
+ val messageResponse: Message = read[Message](responseString)(formats, manifest[Message])
+ val result: Option[Message] = Option(messageResponse)
+ result
+ } else {
+ handleUnsuccessfulRequest(request, response)
+ }
+ }
+
+ override def completeMessage(message: Message): Unit = {
+ implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer
+ val messageString: String = write[Message](message)(formats)
+
+ val entity: HttpEntity = HttpEntity(messageString)
+ val completeMessageUrl: String = momUrl + s"/acknowledge/$entity" // HttpEntity
+ val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl)
+ val response: HttpResponse = HttpClient.webApiCall(request)
+ if (response.status != StatusCodes.NoContent) {
+ handleUnsuccessfulRequest(request, response)
+ }
+ }
+
+ private def handleUnsuccessfulRequest(request: HttpRequest, response: HttpResponse) = {
+ LogEntry(s"\n Request: ${request.uri} failed with response status: ${response.status}" +
+ s"\n response entity: ${response.entity.asString}", Logging.DebugLevel)
+ throw new Exception(s"\n Request: ${request.uri} failed with response status: ${response.status}" +
+ s"\n response entity: ${response.entity.asString}")
+ }
+}
\ No newline at end of file
diff --git a/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala
index 7e060877d..2f5ad4019 100644
--- a/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala
+++ b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebApiTest.scala
@@ -1,74 +1,75 @@
+package net.shrine.metadata
+
import akka.actor.ActorRefFactory
-import net.shrine.metadata.HornetQMomWebApi
import net.shrine.mom.{Message, MessageSerializer, Queue}
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import spray.http.HttpEntity
import spray.http.StatusCodes._
import spray.testkit.ScalatestRouteTest
/**
* Created by yifan on 7/27/17.
*/
@RunWith(classOf[JUnitRunner])
class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi {
override def actorRefFactory: ActorRefFactory = system
private val queueName = "testQueue"
private val messageContent = "testContent"
private var receivedMessage: String = ""
"RemoteHornetQMom" should "create/delete the given queue, send/receive message, get queues" in {
Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check {
val response = new String(body.data.toByteArray)
implicit val formats = Serialization.formats(NoTypeHints)
val jsonToQueue = read[Queue](response)(formats, manifest[Queue])
val responseQueueName = jsonToQueue.name
assertResult(Created)(status)
assertResult(queueName)(responseQueueName)
}
Put(s"/mom/sendMessage/$messageContent/$queueName") ~> momRoute ~> check {
assertResult(Accepted)(status)
}
Get(s"/mom/getQueues") ~> momRoute ~> check {
implicit val formats = Serialization.formats(NoTypeHints)
val response: String = new String(body.data.toByteArray)
val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]])
assertResult(OK)(status)
assertResult(queueName)(jsonToSeq.head.name)
}
// given timeout is 2 seconds
Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check {
val response = new String(body.data.toByteArray)
receivedMessage = response
implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
val responseToMessage: Message = read[Message](response)(formats, manifest[Message])
assertResult(OK)(status)
assert(responseToMessage.isInstanceOf[Message])
}
Put("/mom/acknowledge", HttpEntity(s"""$receivedMessage""")) ~>
momRoute ~> check {
implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
assertResult(NoContent)(status)
}
Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check {
assertResult(OK)(status)
}
}
}
\ No newline at end of file
diff --git a/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebClientTest.scala b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebClientTest.scala
new file mode 100644
index 000000000..462fd548a
--- /dev/null
+++ b/apps/meta-app/src/test/scala/net/shrine/metadata/HornetQMomWebClientTest.scala
@@ -0,0 +1,115 @@
+package net.shrine.metadata
+
+import akka.actor.ActorRefFactory
+import net.shrine.metadata.{HornetQMomWebApi, HornetQMomWebClient}
+import net.shrine.mom.Message
+import org.junit.runner.RunWith
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import spray.testkit.ScalatestRouteTest
+
+import scala.concurrent.duration._
+import scala.collection.immutable.Seq
+
+/**
+ * Test create, delete queue, send, and receive message, getQueueNames, and acknowledge using HornetQMomWebClient
+ * Created by yifan on 8/10/17.
+ */
+
+@RunWith(classOf[JUnitRunner])
+class HornetQMomWebClientTest extends FlatSpec with BeforeAndAfterAll with ScalaFutures with Matchers {
+
+ // override def actorRefFactory: ActorRefFactory = system
+
+ "HornetQMomWebClient" should "be able to send and receive just one message" in {
+
+ val queueName = "testQueue"
+
+ assert(HornetQMomWebClient.queues.isEmpty)
+
+
+ val queue = HornetQMomWebClient.createQueueIfAbsent(queueName)
+
+ assert(HornetQMomWebClient.queues == Seq(queue))
+
+ val testContents = "Test message"
+ HornetQMomWebClient.send(testContents,queue)
+
+ val message: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(message.isDefined)
+ assert(message.get.contents == testContents)
+
+
+ HornetQMomWebClient.completeMessage(message.get)
+
+ val shouldBeNoMessage: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(shouldBeNoMessage.isEmpty)
+
+ HornetQMomWebClient.deleteQueue(queueName)
+ assert(HornetQMomWebClient.queues.isEmpty)
+ }
+
+ "HornetQ" should "be able to send and receive a few messages" in {
+
+ val queueName = "testQueue"
+
+ assert(HornetQMomWebClient.queues.isEmpty)
+
+ val queue = HornetQMomWebClient.createQueueIfAbsent(queueName)
+
+ assert(HornetQMomWebClient.queues == Seq(queue))
+
+ val testContents1 = "Test message1"
+ HornetQMomWebClient.send(testContents1,queue)
+
+ val message1: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(message1.isDefined)
+ assert(message1.get.contents == testContents1)
+
+ HornetQMomWebClient.completeMessage(message1.get)
+
+ val shouldBeNoMessage1: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(shouldBeNoMessage1.isEmpty)
+
+ val testContents2 = "Test message2"
+ HornetQMomWebClient.send(testContents2,queue)
+
+ val testContents3 = "Test message3"
+ HornetQMomWebClient.send(testContents3,queue)
+
+ val message2: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(message2.isDefined)
+ assert(message2.get.contents == testContents2)
+
+ HornetQMomWebClient.completeMessage(message2.get)
+
+ val message3: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(message3.isDefined)
+ assert(message3.get.contents == testContents3)
+
+ HornetQMomWebClient.completeMessage(message3.get)
+
+ val shouldBeNoMessage4: Option[Message] = HornetQMomWebClient.receive(queue,1 second)
+
+ assert(shouldBeNoMessage4.isEmpty)
+
+ HornetQMomWebClient.deleteQueue(queueName)
+ assert(HornetQMomWebClient.queues.isEmpty)
+ }
+
+ "HornetQ" should "be OK if asked to create the same queue twice " in {
+
+ val queueName = "testQueue"
+ HornetQMomWebClient.createQueueIfAbsent(queueName)
+ HornetQMomWebClient.createQueueIfAbsent(queueName)
+
+ HornetQMomWebClient.deleteQueue(queueName)
+ }
+}
\ No newline at end of file
diff --git a/commons/mom/pom.xml b/commons/mom/pom.xml
index f2d318574..a4e149c81 100644
--- a/commons/mom/pom.xml
+++ b/commons/mom/pom.xml
@@ -1,60 +1,108 @@
4.0.0
SHRINE Data Access Classes
shrine-mom
jar
net.shrine
shrine-base
1.23.5.1-SNAPSHOT
../../pom.xml
src/main/scala
src/test/scala
net.alchim31.maven
scala-maven-plugin
org.apache.maven.plugins
maven-jar-plugin
test-jar
net.shrine
shrine-test-commons
${project.version}
test-jar
test
net.shrine
shrine-config
${project.version}
net.shrine
shrine-util
${project.version}
+
+ com.typesafe.akka
+ akka-actor_2.11
+ ${akka-version}
+
+
+ io.spray
+ spray-routing_2.11
+ ${spray-version}
+
+
+ io.spray
+ spray-servlet_2.11
+ ${spray-version}
+
+
+ io.spray
+ spray-util_2.11
+ ${spray-version}
+
+
+ io.spray
+ spray-testkit_2.11
+ ${spray-version}
+ test
+
+
+ io.spray
+ spray-can_2.11
+ 1.3.4
+ provided
+
+
+ com.typesafe.akka
+ akka-actor_2.11
+ ${akka-version}
+
+
+ com.typesafe.akka
+ akka-slf4j_2.11
+ ${akka-version}
+
+
+ com.typesafe.akka
+ akka-testkit_2.11
+ ${akka-testkit-version}
+ test
+
org.hornetq
hornetq-server
2.4.7.Final
diff --git a/commons/mom/src/main/scala/net/shrine/mom/HttpClient.scala b/commons/mom/src/main/scala/net/shrine/mom/HttpClient.scala
new file mode 100644
index 000000000..54fb656ed
--- /dev/null
+++ b/commons/mom/src/main/scala/net/shrine/mom/HttpClient.scala
@@ -0,0 +1,91 @@
+package net.shrine.mom
+
+import java.security.cert.X509Certificate
+import javax.net.ssl.{SSLContext, X509TrustManager}
+
+import akka.actor.{ActorRef, ActorSystem}
+import akka.io.IO
+import akka.pattern.ask
+import net.shrine.log.Loggable
+import spray.can.Http
+import spray.can.Http.{ConnectionAttemptFailedException, HostConnectorSetup}
+import spray.http.{HttpEntity, HttpRequest, HttpResponse, StatusCodes}
+import spray.io.ClientSSLEngineProvider
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.{Await, Future, TimeoutException, blocking}
+import scala.language.postfixOps
+import scala.util.control.NonFatal
+
+/**
+ * A simple HttpClient to use inside the HttpDirectives
+ */
+object HttpClient extends Loggable {
+
+ //todo hand back a Try, Failures with custom exceptions instead of a crappy response
+ def webApiCall(request:HttpRequest)(implicit system: ActorSystem): HttpResponse = {
+ val transport: ActorRef = IO(Http)(system)
+
+ debug(s"Requesting $request uri is ${request.uri} path is ${request.uri.path}")
+ blocking {
+ val future:Future[HttpResponse] = for {
+ Http.HostConnectorInfo(connector, _) <- transport.ask(createConnector(request))(10 seconds) //todo make this timeout configurable
+ response <- connector.ask(request)(10 seconds).mapTo[HttpResponse] //todo make this timeout configurable
+ } yield response
+ try {
+ Await.result(future, 10 seconds) //todo make this timeout configurable
+ }
+ catch {
+ case x:TimeoutException => HttpResponse(status = StatusCodes.RequestTimeout,entity = HttpEntity(s"${request.uri} timed out after 10 seconds. ${x.getMessage}"))
+ //todo is there a better message? What comes up in real life?
+ case x:ConnectionAttemptFailedException => {
+ //no web service is there to respond
+ info(s"${request.uri} failed with ${x.getMessage}",x)
+ HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}"))
+ }
+ case NonFatal(x) => {
+ info(s"${request.uri} failed with ${x.getMessage}",x)
+ HttpResponse(status = StatusCodes.InternalServerError,entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}"))
+ }
+ }
+ }
+ }
+
+ //from https://github.com/TimothyKlim/spray-ssl-poc/blob/master/src/main/scala/Main.scala
+ //trust all SSL contexts. We just want encrypted comms.
+ implicit val trustfulSslContext: SSLContext = {
+
+ class IgnoreX509TrustManager extends X509TrustManager {
+ def checkClientTrusted(chain: Array[X509Certificate], authType: String) {}
+
+ def checkServerTrusted(chain: Array[X509Certificate], authType: String) {}
+
+ def getAcceptedIssuers = null
+ }
+
+ val context = SSLContext.getInstance("TLS")
+ context.init(null, Array(new IgnoreX509TrustManager), null)
+ info("trustfulSslContex initialized")
+ context
+ }
+
+ implicit val clientSSLEngineProvider =
+ //todo lookup this constructor
+ ClientSSLEngineProvider {
+ _ =>
+ val engine = trustfulSslContext.createSSLEngine()
+ engine.setUseClientMode(true)
+ engine
+ }
+
+ def createConnector(request: HttpRequest) = {
+
+ val connector = new HostConnectorSetup(host = request.uri.authority.host.toString,
+ port = request.uri.effectivePort,
+ sslEncryption = request.uri.scheme == "https",
+ defaultHeaders = request.headers)
+ connector
+ }
+
+}