diff --git a/messagequeue/hornetqmom/src/main/resources/reference.conf b/messagequeue/hornetqmom/src/main/resources/reference.conf index 7436b4e55..f6c2ac407 100644 --- a/messagequeue/hornetqmom/src/main/resources/reference.conf +++ b/messagequeue/hornetqmom/src/main/resources/reference.conf @@ -1,9 +1,9 @@ shrine { messagequeue { hornetq { } hornetQWebApi { - enabled = true + enabled = false } } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index d1c9c1619..67e5aaff9 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,238 +1,236 @@ package net.shrine.hornetqmom import java.util.UUID import net.shrine.log.Loggable -import net.shrine.messagequeueservice.{Message, Queue} +import net.shrine.messagequeueservice.Queue import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource -import net.shrine.spray.DefaultJsonSupport -import org.hornetq.api.core.client.ClientMessage import org.json4s.JsonAST.{JField, JObject} import org.json4s.native.Serialization -import org.json4s.native.Serialization.{read, write} -import org.json4s.{CustomSerializer, Formats, JString, NoTypeHints, ShortTypeHints} +import org.json4s.native.Serialization.write +import org.json4s.{CustomSerializer, JString, NoTypeHints, ShortTypeHints} import spray.http.StatusCodes import spray.routing.{HttpService, Route} import scala.collection.concurrent.TrieMap import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { def enabled: Boolean = ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled") val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" // keep a map of messages and ids private val idToMessages: TrieMap[UUID, LocalHornetQMom.LocalHornetQMessage] = TrieMap.empty def momRoute: Route = pathPrefix("mom") { if (!enabled) { val configProblem: CannotUseHornetQMomWebApiProblem = CannotUseHornetQMomWebApiProblem(new UnsupportedOperationException) warn(s"HornetQMomWebApi is not available to use due to configProblem ${configProblem.description}!") respondWithStatus(StatusCodes.NotFound) { complete(warningMessage) } } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) createdQueueTry match { case Success(queue) => { implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString debug(s"sendMessage to $toQueue '$messageContent'") detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[LocalHornetQMom.LocalHornetQMessage]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { case Success(optionMessage) => { optionMessage.fold(complete(StatusCodes.NotFound)){localHornetQMessage => // add message in the map with an unique UUID val msgID = UUID.randomUUID() idToMessages.getOrElseUpdate(msgID, localHornetQMessage) complete(MessageContainer(msgID.toString, localHornetQMessage.getContents).toJson) } } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageUUID => detach() { val id: UUID = UUID.fromString(messageUUID) // retrieve the localMessage from the concurrent hashmap Try { if (!idToMessages.contains(id)) { throw new NoSuchElementException(s"Cannot match given $id to any Message in HornetQ server! Message does not exist!") } } val acknowledgeTry: Try[Unit] = idToMessages(id).complete() acknowledgeTry match { case Success(v) => { complete(StatusCodes.ResetContent) } case Failure(x) => { internalServerErrorOccured(x, "acknowledge") } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" + s"Exception is from ${x.getClass}") } } } object QueueSerializer extends CustomSerializer[Queue](format => ( { case JObject(JField("name", JString(s)) :: Nil) => Queue(s) }, { case queue: Queue => JObject(JField("name", JString(queue.name)) :: Nil) } )) case class MessageContainer(id: String, contents: String) { def toJson: String = { Serialization.write(this)(MessageContainer.messageFormats) } } object MessageContainer { val messageFormats = Serialization.formats(ShortTypeHints(List(classOf[MessageContainer]))) def fromJson(jsonString: String): MessageContainer = { implicit val formats = messageFormats Serialization.read[MessageContainer](jsonString) } } case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf." override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" } diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala index e4ebf04e2..0d42ed82f 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala @@ -1,104 +1,107 @@ package net.shrine.hornetqmom import akka.actor.ActorRefFactory import net.shrine.messagequeueservice.Queue import net.shrine.source.ConfigSource import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.http.HttpEntity import spray.http.StatusCodes._ import spray.testkit.ScalatestRouteTest import scala.collection.immutable.Seq /** * Test basic functions of HornetQMomWebApi * Created by yifan on 7/27/17. */ @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system private val proposedQueueName = "test Queue" private val queue: Queue = Queue(proposedQueueName) private val queueName: String = queue.name // "testQueue" private val messageContent = "test Content" "HornetQMomWebApi" should "create/delete the given queue, send/receive message, get queues" in { ConfigSource.atomicConfig.configForBlock("shrine.messagequeue.hornetQWebApi.enabled", "true", "HornetQMomWebApiTest") { Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) val responseQueueName = jsonToQueue.name assertResult(Created)(status) assertResult(queueName)(responseQueueName) } // should be OK to create a queue twice Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) val responseQueueName = jsonToQueue.name assertResult(Created)(status) assertResult(queueName)(responseQueueName) } Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { assertResult(Accepted)(status) } Get(s"/mom/getQueues") ~> momRoute ~> check { val response: String = new String(body.data.toByteArray) implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response)(formats, manifest[Seq[Queue]]) assertResult(OK)(status) assertResult(queueName)(jsonToSeq.head.name) } var messageUUID: String = "" // given timeout is 2 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { val response = new String(body.data.toByteArray) assertResult(OK)(status) val responseMsg = MessageContainer.fromJson(response) messageUUID = responseMsg.id assertResult(responseMsg.contents)(messageContent) } Put("/mom/acknowledge", HttpEntity(s"$messageUUID")) ~> momRoute ~> check { assertResult(ResetContent)(status) } Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { assertResult(OK)(status) } } } - "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " + + "HornetQMomWebApi" should "respond InternalServerError with the corresponding error message when " + "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in { - Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { - assertResult(InternalServerError)(status) - } + ConfigSource.atomicConfig.configForBlock("shrine.messagequeue.hornetQWebApi.enabled", "true", "HornetQMomWebApiTest") { - Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { - assertResult(InternalServerError)(status) - } + Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { + assertResult(InternalServerError)(status) + } - // given timeout is 1 seconds - Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check { - assertResult(InternalServerError)(status) + Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { + assertResult(InternalServerError)(status) + } + + // given timeout is 1 seconds + Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check { + assertResult(InternalServerError)(status) + } } } } \ No newline at end of file