diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala index ae1f9c23e..7c6f0cd34 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala @@ -1,74 +1,81 @@ package net.shrine.metadata import com.typesafe.config.Config import net.shrine.config.ConfigExtensions import net.shrine.log.Log import net.shrine.messagequeueservice.{Message, MessageQueueService} +import net.shrine.problem.ProblemNotYetEncoded import net.shrine.protocol.{ResultOutputType, ResultOutputTypes, RunQueryResponse} import net.shrine.qep.querydb.QepQueryDb import net.shrine.source.ConfigSource import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal /** * Receives messages and writes the result to the QEP's cache * * @author david * @since 8/18/17 */ //todo in 1.24, look into a streaming API for messages object QepReceiver { val receiveMessageRunnable: Runnable = new Runnable { override def run(): Unit = { while(true) { //forever try { receiveAMessage() } catch { case NonFatal(x) => Log.error("Exception while receiving a message.",x)//todo new kind of problem //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. } } } } //create a daemon thread that long-polls for messages forever val pollingThread = new Thread(receiveMessageRunnable,"Receive message thread") pollingThread.setDaemon(true) //todo pollingThread.setUncaughtExceptionHandler() pollingThread.start() //todo maybe pass this in from outside and make the thing a case class - val queue = MessageQueueService.service.createQueueIfAbsent(ConfigSource.config.getString("shrine.humanReadableNodeName")) + val queue = MessageQueueService.service.createQueueIfAbsent(ConfigSource.config.getString("shrine.humanReadableNodeName")).get //todo better than get. handle errors val pollDuration = Duration("15 seconds") //todo from config lazy val config: Config = ConfigSource.config val shrineConfig = config.getConfig("shrine") val breakdownTypes: Set[ResultOutputType] = shrineConfig.getOptionConfigured("breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty) def receiveAMessage(): Unit = { - val message: Option[Message] = MessageQueueService.service.receive(queue, pollDuration) //todo make this configurable (and testable) - message.foreach(interpretAMessage) + val maybeMessage: Try[Option[Message]] = MessageQueueService.service.receive(queue, pollDuration) //todo make this configurable (and testable) + maybeMessage.transform({m => + m.foreach(interpretAMessage) + Success(m) + },{x => + ProblemNotYetEncoded("Something went wrong during receive",x) //todo create full-up problem + Failure(x) + }) } val unit = () def interpretAMessage(message: Message):Unit = { Log.debug(s"Received a message from $queue") val xmlString = message.contents val rqrt: Try[RunQueryResponse] = RunQueryResponse.fromXmlString(breakdownTypes)(xmlString) rqrt.transform({ rqr => QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.singleNodeResult) message.complete() Success(unit) },{ x => Failure(x) }) } //HornetQMomWebClient.send(rqr.toXml(),queue) } \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala index 57b953f7a..91231611a 100644 --- a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala +++ b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala @@ -1,156 +1,155 @@ package net.shrine.hornetqclient import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} import net.shrine.log.Loggable -import net.shrine.messagequeueservice.{Message, MessageQueueService, MessageSerializer, Queue} +import net.shrine.messagequeueservice.{Message, MessageQueueService, MessageSerializer, Queue, QueueSerializer} import net.shrine.source.ConfigSource import org.json4s.native.Serialization import org.json4s.native.Serialization.{read, write} import org.json4s.{Formats, NoTypeHints} import spray.http.{HttpEntity, HttpMethods, HttpRequest, HttpResponse} import scala.collection.immutable.Seq import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.util.Try import scala.util.control.NonFatal /** * A simple HornetQMomWebClient that uses HornetQMomWebApi to createQueue, * deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * @author yifan * @since 8/10/17 */ object HornetQMomWebClient extends MessageQueueService with Loggable { // we need an ActorSystem to host our application in implicit val system: ActorSystem = ActorSystem("momServer", ConfigSource.config) // TODO in SHRINE-2167: Extract and share a SHRINE actor system // the service actor replies to incoming HttpRequests // implicit val serviceActor: ActorRef = startServiceActor() // def startActorSystem(): ActorSystem = try { // val actorSystem: ActorSystem = ActorSystem("momServer", ConfigSource.config) // info(s"Starting ActorSystem: ${actorSystem.name} for HornetQMomWebClient at time: ${actorSystem.startTime}") // actorSystem // } catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } // // def startServiceActor(): ActorRef = try { // // the service actor replies to incoming HttpRequests // val actor: ActorRef = system.actorOf(Props[HornetQMomWebClientServiceActor]) // info(s"Starting ServiceActor: ${actor.toString()} for HornetQMomWebClient") // actor // } // catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } val momUrl: String = ConfigSource.config.getString("shrine.messagequeue.hornetq.serverUrl") - override def createQueueIfAbsent(queueName: String): Queue = { - val createQueueUrl = momUrl + s"/createQueue/$queueName" + override def createQueueIfAbsent(queueName: String): Try[Queue] = { + val proposedQueue: Queue = Queue(queueName) + val createQueueUrl = momUrl + s"/createQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, createQueueUrl) - val tryQueue: Try[Queue] = for { + for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) queue: Queue <- Try { val queueString = response.entity.asString - implicit val formats = Serialization.formats(NoTypeHints) + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer read[Queue](queueString)(formats, manifest[Queue]) } } yield queue - tryQueue.get } - override def deleteQueue(queueName: String): Unit = { - val deleteQueueUrl = momUrl + s"/deleteQueue/$queueName" - val request: HttpRequest = HttpRequest(HttpMethods.DELETE, deleteQueueUrl) - for { - response <- Try(HttpClient.webApiCall(request)) // StatusCodes.OK - } yield response + override def deleteQueue(queueName: String): Try[Unit] = { + val proposedQueue: Queue = Queue(queueName) + val deleteQueueUrl = momUrl + s"/deleteQueue/${proposedQueue.name}" + val request: HttpRequest = HttpRequest(HttpMethods.PUT, deleteQueueUrl) + Try(HttpClient.webApiCall(request)) // StatusCodes.OK } - override def queues: Seq[Queue] = { + override def queues: Try[Seq[Queue]] = { val getQueuesUrl = momUrl + s"/getQueues" val request: HttpRequest = HttpRequest(HttpMethods.GET, getQueuesUrl) - val tryQueues: Try[Seq[Queue]] = for { + for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) allQueues: Seq[Queue] <- Try { val allQueues: String = response.entity.asString - implicit val formats = Serialization.formats(NoTypeHints) + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer read[Seq[Queue]](allQueues)(formats, manifest[Seq[Queue]]) } } yield allQueues - tryQueues.get } - override def send(contents: String, to: Queue): Unit = { - val sendMessageUrl = momUrl + s"/sendMessage/$contents/${to.name}" - val request: HttpRequest = HttpRequest(HttpMethods.PUT, sendMessageUrl) + override def send(contents: String, to: Queue): Try[Unit] = { + val sendMessageUrl = momUrl + s"/sendMessage/${to.name}" + val request: HttpRequest = HttpRequest( + method = HttpMethods.PUT, + uri = sendMessageUrl, + entity = HttpEntity(contents) //todo set contents as XML or json + ) for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) } yield response } - override def receive(from: Queue, timeout: Duration): Option[Message] = { + override def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { val seconds = timeout.toSeconds val receiveMessageUrl = momUrl + s"/receiveMessage/${from.name}?timeOutSeconds=$seconds" val request: HttpRequest = HttpRequest(HttpMethods.GET, receiveMessageUrl) - val tryReceive: Try[Option[Message]] = for { + for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) - messageResponse: Option[Message] = { - val responseString: String = response.entity.asString - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - val messageResponse: Message = read[Message](responseString)(formats, manifest[Message]) - Option(messageResponse) - } + responseString: String <- Try { response.entity.asString } + formats <- Try { Serialization.formats(NoTypeHints) + new MessageSerializer} + messageResponse: Message <- Try { read[Message](responseString)(formats, manifest[Message]) } + messageResponse: Option[Message] <- Try { Option(messageResponse) } } yield messageResponse - tryReceive.get } - override def completeMessage(message: Message): Unit = { + override def completeMessage(message: Message): Try[Unit] = { implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer val messageString: String = write[Message](message)(formats) val entity: HttpEntity = HttpEntity(messageString) val completeMessageUrl: String = momUrl + s"/acknowledge" // HttpEntity val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl).withEntity(entity) for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) } yield response } } // TODO in SHRINE-2167: Extract and share a SHRINE actor system //class HornetQMomWebClientServiceActor extends Actor with MetaDataService { // // // the HttpService trait defines only one abstract member, which // // connects the services environment to the enclosing actor or test // def actorRefFactory: ActorRefFactory = context // // // this actor only runs our route, but you could add // // other things here, like request stream processing // // or timeout handling // def receive: Receive = runRoute(route) // // override implicit val ec: ExecutionContext = ExecutionContext.Implicits.global // // override def system: ActorSystem = context.system //} \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala b/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala index a50f6aa7e..c8c46d198 100644 --- a/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala +++ b/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala @@ -1,26 +1,28 @@ /** * A simple scala script that test HornetQMomWedClient * * Load the file in scala REPL to test HornetQMomWebClient * * Created by yifan on 8/14/17. */ // To Run the test, simply start the scala REPL and load the file // Enter the following commands using the command line // #1. Start scala REPL with 2.11.8 version // mvn -Dscala-version="2.11.8" scala:console // #2. Load file in REPL: // :load import net.shrine.hornetqclient.HornetQMomWebClient -val firstQueue = HornetQMomWebClient.createQueueIfAbsent("q1") +import net.shrine.messagequeueservice.{Message, Queue} +import scala.collection.immutable.Seq +val firstQueue: Queue = HornetQMomWebClient.createQueueIfAbsent("firstQueue").get HornetQMomWebClient.send("firstMessage", firstQueue) import scala.concurrent.duration.Duration -val firstDuration = Duration.create(1, "seconds") -val receivedMsg = HornetQMomWebClient.receive(firstQueue, firstDuration) -val msg = receivedMsg.get -val allQueues = HornetQMomWebClient.queues +val firstDuration: Duration = Duration.create(1, "seconds") +val receivedMsg: Option[Message] = HornetQMomWebClient.receive(firstQueue, firstDuration).get +val msg: Message = receivedMsg.get +val allQueues: Seq[Queue] = HornetQMomWebClient.queues.get HornetQMomWebClient.completeMessage(msg) HornetQMomWebClient.deleteQueue("q1") \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index 4d3385cef..8884288b4 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,112 +1,171 @@ package net.shrine.hornetqmom import akka.event.Logging import net.shrine.log.Loggable -import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue} +import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} +import net.shrine.problem.{AbstractProblem, ProblemSources} import org.json4s.native.Serialization import org.json4s.native.Serialization.{read, write} import org.json4s.{Formats, NoTypeHints} import spray.http.StatusCodes import spray.routing.directives.LogEntry import spray.routing.{HttpService, Route} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration -import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { def momRoute: Route = pathPrefix("mom") { - put { - createQueue ~ - sendMessage ~ - acknowledge - } ~ receiveMessage ~ getQueues ~ deleteQueue + put { + createQueue ~ + sendMessage ~ + acknowledge + } ~ receiveMessage ~ getQueues ~ deleteQueue } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { - val createdQueue: Queue = LocalHornetQMom.createQueueIfAbsent(queueName) - implicit val formats = Serialization.formats(NoTypeHints) - val response: String = write[Queue](createdQueue)(formats) - respondWithStatus(StatusCodes.Created) { - complete(response) + val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) + createdQueueTry match { + case Success(queue) => { + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + val response: String = write[Queue](queue)(formats) + respondWithStatus(StatusCodes.Created) { + complete(response) + } + } + case Failure(x) => { + internalServerErrorOccured(x, "createQueue") + } } } - } + } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => - delete { + put { detach() { - LocalHornetQMom.deleteQueue(queueName) - complete(StatusCodes.OK) + val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) + deleteQueueTry match { + case Success(v) => { + complete(StatusCodes.OK) + } + case Failure(x) => { + internalServerErrorOccured(x, "deleteQueue") + } + } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult - def sendMessage: Route = path("sendMessage" / Segment / Segment) { (messageContent, toQueue) => - detach() { - LocalHornetQMom.send(messageContent, Queue.apply(toQueue)) - complete(StatusCodes.Accepted) + def sendMessage: Route = path("sendMessage" / Segment) { toQueue => + requestInstance { request => + val messageContent = request.entity.asString + detach() { + val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) + sendTry match { + case Success(v) => { + complete(StatusCodes.Accepted) + } + case Failure(x) => { + internalServerErrorOccured(x, "sendMessage") + } + } + } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { - val response: Option[Message] = LocalHornetQMom.receive(Queue.apply(fromQueue), timeout) - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - response.fold(complete(StatusCodes.NotFound))(msg => complete(write(response)(formats))) + val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) + receiveTry match { + case Success(optMessage) => { + implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer + optMessage.fold(complete(StatusCodes.NotFound))(msg => complete(write(optMessage)(formats))) + } + case Failure(x) => { + internalServerErrorOccured(x, "receiveMessage") + } + } } } } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageJSON => implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer detach() { - try { - val msg: Message = read[Message](messageJSON)(formats, manifest[Message]) - LocalHornetQMom.completeMessage(msg) - complete(StatusCodes.ResetContent) - } catch { - case NonFatal(x) => { - LogEntry(s"\n Request: acknowledge/$messageJSON\n Response: $x", Logging.DebugLevel) - complete(StatusCodes.BadRequest) + val msg: Message = read[Message](messageJSON)(formats, manifest[Message]) + val acknowledgeTry: Try[Unit] = LocalHornetQMom.completeMessage(msg) + acknowledgeTry match { + case Success(v) => { + complete(StatusCodes.ResetContent) + } + case Failure(x) => { + internalServerErrorOccured(x, "acknowledge") } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { - implicit val formats = Serialization.formats(NoTypeHints) + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer respondWithStatus(StatusCodes.OK) { - complete(write[Seq[Queue]](LocalHornetQMom.queues)(formats)) + val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues + getQueuesTry match { + case Success(seqQueue) => { + complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) + } + case Failure(x) => { + internalServerErrorOccured(x, "getQueues") + } + } } } } } -} \ No newline at end of file + + def internalServerErrorOccured(x: Throwable, function: String): Route = { + respondWithStatus(StatusCodes.InternalServerError) { + val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) + debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") + complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage} from ${x.getClass}") + } + } + +} + + + +case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Adapter) { + + override val throwable = Some(x) + override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." + override val description: String = s"HornetQ throws an exception while trying to $function," + + s" the server's response is: ${x.getMessage} from ${x.getClass}." +} diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala index 19d6d8641..8a0d08319 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala @@ -1,145 +1,179 @@ package net.shrine.hornetqmom import com.typesafe.config.Config -import net.shrine.messagequeueservice.{Message, MessageQueueService, Queue} +import net.shrine.messagequeueservice.{Message, MessageQueueService, NoSuchQueueExistsInHornetQ, Queue} import net.shrine.source.ConfigSource -import org.hornetq.api.core.client.{ClientConsumer, ClientMessage, ClientSession, ClientSessionFactory, HornetQClient, ServerLocator} +import org.hornetq.api.core.TransportConfiguration +import org.hornetq.api.core.client.{ClientConsumer, ClientMessage, ClientProducer, ClientSession, ClientSessionFactory, HornetQClient, ServerLocator} import org.hornetq.api.core.management.HornetQServerControl -import org.hornetq.api.core.{HornetQQueueExistsException, TransportConfiguration} import org.hornetq.core.config.impl.ConfigurationImpl import org.hornetq.core.remoting.impl.invm.{InVMAcceptorFactory, InVMConnectorFactory} import org.hornetq.core.server.{HornetQServer, HornetQServers} +import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} import scala.collection.immutable.Seq import scala.concurrent.blocking import scala.concurrent.duration.Duration -import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} +import scala.util.Try /** * This object is the local version of the Message-Oriented Middleware API, which uses HornetQ service * @author david * @since 7/18/17 */ object LocalHornetQMom extends MessageQueueService { - val config:Config = ConfigSource.config.getConfig("shrine.messagequeue.hornetq") + val config: Config = ConfigSource.config.getConfig("shrine.messagequeue.hornetq") // todo use the config to set everything needed here that isn't hard-coded. val hornetQConfiguration = new ConfigurationImpl() // todo from config? What is the journal file about? If temporary, use a Java temp file. hornetQConfiguration.setJournalDirectory("target/data/journal") // todo want this. There are likely many other config bits hornetQConfiguration.setPersistenceEnabled(false) // todo maybe want this hornetQConfiguration.setSecurityEnabled(false) // todo probably just want the InVM version, but need to read up on options hornetQConfiguration.getAcceptorConfigurations.add(new TransportConfiguration(classOf[InVMAcceptorFactory].getName)) // Create and start the server val hornetQServer: HornetQServer = HornetQServers.newHornetQServer(hornetQConfiguration) hornetQServer.start() val serverLocator: ServerLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(classOf[InVMConnectorFactory].getName)) val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory() //arguments are boolean xa, boolean autoCommitSends, boolean autoCommitAcks . - val session: ClientSession = sessionFactory.createSession(false,true,true) + val session: ClientSession = sessionFactory.createSession(false, true, true) session.start() //keep a map of live queues to ClientConsumers to provide a path for completing messages - val queuesToConsumers:ConcurrentMap[Queue, ClientConsumer] = TrieMap.empty + val queuesToConsumers: ConcurrentMap[Queue, ClientConsumer] = TrieMap.empty val propName = "contents" /** * Use HornetQMomStopper to stop the hornetQServer without unintentially starting it */ // todo drop this into a try private[hornetqmom] def stop() = { queuesToConsumers.values.foreach(_.close()) session.close() sessionFactory.close() hornetQServer.stop() } //queue lifecycle - def createQueueIfAbsent(queueName:String):Queue = { - val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl - if(!queues.map(_.name).contains(queueName)) { - try serverControl.createQueue(queueName, queueName, true) - catch { - case alreadyExists: HornetQQueueExistsException => //Already have what we want. Something slipped in between checking queues for this queue and creating it. + def createQueueIfAbsent(queueName: String): Try[Queue] = { + val proposedQueue: Queue = Queue(queueName) + for { + serverControl: HornetQServerControl <- Try{ hornetQServer.getHornetQServerControl } + createQueueInHornetQ <- Try { + if (!this.queues.get.map(_.name).contains(proposedQueue.name)) { + serverControl.createQueue(proposedQueue.name, proposedQueue.name, true) + } } - } - val queue = Queue(queueName) - queuesToConsumers.getOrElseUpdate(queue,{session.createConsumer(queue.name)}) - queue + useNewQueueToUpdateMapTry: Queue <- Try { + queuesToConsumers.getOrElseUpdate(proposedQueue, { session.createConsumer(proposedQueue.name) }) + proposedQueue + } + } yield useNewQueueToUpdateMapTry } - def deleteQueue(queueName:String) = { - queuesToConsumers.remove(Queue(queueName)).foreach(_.close()) - - val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl - serverControl.destroyQueue(queueName) + def deleteQueue(queueName: String): Try[Unit] = { + val proposedQueue: Queue = Queue(queueName) + for { + deleteTry <- Try { + queuesToConsumers.remove(proposedQueue).foreach(_.close()) + val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl + serverControl.destroyQueue(proposedQueue.name) + } + } yield deleteTry } - override def queues:Seq[Queue] = { - val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl - val queueNames: Array[String] = serverControl.getQueueNames - queueNames.map(Queue(_)).to[Seq] + override def queues: Try[Seq[Queue]] = { + for { + hornetQTry: HornetQServerControl <- Try { + hornetQServer.getHornetQServerControl + } + getQueuesTry: Seq[Queue] <- Try { + val queueNames: Array[String] = hornetQTry.getQueueNames + queueNames.map(Queue(_)).to[Seq] + } + } yield getQueuesTry } //send a message - def send(contents:String,to:Queue):Unit = { - val producer = session.createProducer(to.name) - try { - val message = session.createMessage(true) - message.putStringProperty(propName, contents) - - producer.send(message) - } finally { - producer.close() - } + def send(contents: String, to: Queue): Try[Unit] = { + for { + sendTry <- Try { + // check if the queue exists first + if (!this.queues.get.map(_.name).contains(to.name)) { + throw NoSuchQueueExistsInHornetQ(to) + } + } + producer: ClientProducer <- Try{ session.createProducer(to.name) } + message <- Try{ session.createMessage(true) } + constructMessage <- Try { message.putStringProperty(propName, contents) } + sendMessage <- Try { + producer.send(message) + producer.close() + } + } yield sendMessage } //receive a message /** * Always do AWS SQS-style long polling. * Be sure your code can handle receiving the same message twice. * * @return Some message before the timeout, or None */ - def receive(from:Queue,timeout:Duration):Option[Message] = { - // todo check if queue exists, if not throw exception - val messageConsumer: ClientConsumer = queuesToConsumers(from) //todo handle the case where either stop or close has been called on something gracefully - blocking { - val messageReceived: Option[ClientMessage] = Option(messageConsumer.receive(timeout.toMillis)) - val message = messageReceived.map(Message(_)) - - message - } + def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { + for { + //todo handle the case where either stop or close has been called on something gracefully + messageConsumer: ClientConsumer <- Try { + if (!queuesToConsumers.contains(from)) { + throw new NoSuchElementException(s"Given Queue ${from.name} does not exist in HornetQ server! Please create the queue first!") + } + queuesToConsumers(from) + } + message: Option[Message] <- Try { + blocking { + val messageReceived: Option[ClientMessage] = Option(messageConsumer.receive(timeout.toMillis)) + messageReceived.map(Message(_)) + } + } + } yield message } //todo dead letter queue for all messages. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-dead-letter-queues.html //complete a message - //todo better here or on the message itself?? - override def completeMessage(message:Message):Unit = message.complete() + override def completeMessage(message: Message): Try[Unit] = { + for { + completeMessageTry <- Try { message.complete() } + } yield completeMessageTry + } + } /** * If the configuration is such that HornetQ should have been started use this object to stop it */ //todo is this a good way to write this code? object LocalHornetQMomStopper { - def stop() = { + def stop(): Unit = { //todo fill in as part of SHIRINE-2128 - val config: Config = ConfigSource.config.getConfig("shrine.messagequeue.hornetq") - + for { + config: Config <- Try { + ConfigSource.config.getConfig("shrine.messagequeue.hornetq") + } + } yield config LocalHornetQMom.stop() } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala index 222bcb32d..e9ebd6eef 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala @@ -1,76 +1,105 @@ package net.shrine.hornetqmom import akka.actor.ActorRefFactory -import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue} +import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.http.HttpEntity import spray.http.StatusCodes._ import spray.testkit.ScalatestRouteTest import scala.collection.immutable.Seq /** * Created by yifan on 7/27/17. */ @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system - private val queueName = "testQueue" - private val messageContent = "testContent" + private val proposedQueueName = "test Queue" + private val queue: Queue = Queue(proposedQueueName) + private val queueName: String = queue.name // "testQueue" + private val messageContent = "test Content" private var receivedMessage: String = "" "HornetQMomWebApi" should "create/delete the given queue, send/receive message, get queues" in { Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) - implicit val formats = Serialization.formats(NoTypeHints) + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) val responseQueueName = jsonToQueue.name assertResult(Created)(status) assertResult(queueName)(responseQueueName) } - Put(s"/mom/sendMessage/$messageContent/$queueName") ~> momRoute ~> check { + // should be OK to create a queue twice + Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) + val responseQueueName = jsonToQueue.name + assertResult(Created)(status) + assertResult(queueName)(responseQueueName) + } + + Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { assertResult(Accepted)(status) } Get(s"/mom/getQueues") ~> momRoute ~> check { - implicit val formats = Serialization.formats(NoTypeHints) + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val response: String = new String(body.data.toByteArray) val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]]) assertResult(OK)(status) assertResult(queueName)(jsonToSeq.head.name) } // given timeout is 2 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { val response = new String(body.data.toByteArray) receivedMessage = response implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer val responseToMessage: Message = read[Message](response)(formats, manifest[Message]) assertResult(OK)(status) assert(responseToMessage.isInstanceOf[Message]) } - Put("/mom/acknowledge", HttpEntity(s"""$receivedMessage""")) ~> + Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~> momRoute ~> check { implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer assertResult(ResetContent)(status) } - Delete(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { + Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { assertResult(OK)(status) } } + + "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " + + "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in { + + Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { + assertResult(InternalServerError)(status) + } + + Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { + assertResult(InternalServerError)(status) + } + + // given timeout is 1 seconds + Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check { + assertResult(InternalServerError)(status) + } + } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala index 813c167e7..5834fdfe6 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala @@ -1,108 +1,156 @@ package net.shrine.hornetqmom -import net.shrine.messagequeueservice.Message +import net.shrine.messagequeueservice.{Message, Queue} import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import scala.collection.immutable.Seq import scala.concurrent.duration._ import scala.language.postfixOps /** * Test create, delete queue, send, and receive message, getQueueNames, and acknoledge using HornetQ service */ @RunWith(classOf[JUnitRunner]) class LocalHornetQMomTest extends FlatSpec with BeforeAndAfterAll with ScalaFutures with Matchers { "HornetQ" should "be able to send and receive just one message" in { val queueName = "testQueue" - assert(LocalHornetQMom.queues.isEmpty) + assert(LocalHornetQMom.queues.get.isEmpty) - val queue = LocalHornetQMom.createQueueIfAbsent(queueName) + val queue = LocalHornetQMom.createQueueIfAbsent(queueName).get - assert(LocalHornetQMom.queues == Seq(queue)) + assert(LocalHornetQMom.queues.get == Seq(queue)) val testContents = "Test message" - LocalHornetQMom.send(testContents,queue) + val sendTry = LocalHornetQMom.send(testContents, queue) + assert(sendTry.isSuccess) - val message: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val message: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message.isDefined) assert(message.get.contents == testContents) LocalHornetQMom.completeMessage(message.get) - val shouldBeNoMessage: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val shouldBeNoMessage: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(shouldBeNoMessage.isEmpty) - LocalHornetQMom.deleteQueue(queueName) - assert(LocalHornetQMom.queues.isEmpty) + val deleteTry = LocalHornetQMom.deleteQueue(queueName) + assert(deleteTry.isSuccess) + assert(LocalHornetQMom.queues.get.isEmpty) } "HornetQ" should "be able to send and receive a few messages" in { val queueName = "testQueue" - assert(LocalHornetQMom.queues.isEmpty) + assert(LocalHornetQMom.queues.get.isEmpty) - val queue = LocalHornetQMom.createQueueIfAbsent(queueName) + val queue = LocalHornetQMom.createQueueIfAbsent(queueName).get - assert(LocalHornetQMom.queues == Seq(queue)) + assert(LocalHornetQMom.queues.get == Seq(queue)) val testContents1 = "Test message1" - LocalHornetQMom.send(testContents1,queue) + val sendTry = LocalHornetQMom.send(testContents1,queue) + assert(sendTry.isSuccess) - val message1: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val message1: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message1.isDefined) assert(message1.get.contents == testContents1) - LocalHornetQMom.completeMessage(message1.get) + val completeTry = LocalHornetQMom.completeMessage(message1.get) + assert(completeTry.isSuccess) - val shouldBeNoMessage1: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val shouldBeNoMessage1: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(shouldBeNoMessage1.isEmpty) val testContents2 = "Test message2" - LocalHornetQMom.send(testContents2,queue) + val sendTry2 = LocalHornetQMom.send(testContents2,queue) + assert(sendTry2.isSuccess) val testContents3 = "Test message3" - LocalHornetQMom.send(testContents3,queue) + val sendTry3 = LocalHornetQMom.send(testContents3,queue) + assert(sendTry3.isSuccess) - val message2: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val message2: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message2.isDefined) assert(message2.get.contents == testContents2) - LocalHornetQMom.completeMessage(message2.get) + val completeTry2 = LocalHornetQMom.completeMessage(message2.get) + assert(completeTry2.isSuccess) - val message3: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val message3: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message3.isDefined) assert(message3.get.contents == testContents3) - LocalHornetQMom.completeMessage(message3.get) + val completeTry3 = LocalHornetQMom.completeMessage(message3.get) + assert(completeTry3.isSuccess) - val shouldBeNoMessage4: Option[Message] = LocalHornetQMom.receive(queue,1 second) + val shouldBeNoMessage4: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(shouldBeNoMessage4.isEmpty) - LocalHornetQMom.deleteQueue(queueName) - assert(LocalHornetQMom.queues.isEmpty) + val deleteTry = LocalHornetQMom.deleteQueue(queueName) + assert(deleteTry.isSuccess) + assert(LocalHornetQMom.queues.get.isEmpty) } "HornetQ" should "be OK if asked to create the same queue twice " in { val queueName = "testQueue" - LocalHornetQMom.createQueueIfAbsent(queueName) - LocalHornetQMom.createQueueIfAbsent(queueName) + val queue = LocalHornetQMom.createQueueIfAbsent(queueName) + assert(queue.isSuccess) + val sameQueue = LocalHornetQMom.createQueueIfAbsent(queueName) + assert(sameQueue.isSuccess) + + assert(LocalHornetQMom.queues.get == Seq(sameQueue.get)) + val deleteTry = LocalHornetQMom.deleteQueue(queueName) + assert(deleteTry.isSuccess) + assert(LocalHornetQMom.queues.get.isEmpty) + } + + "HornetQ" should "return a failure if deleting a non-existing queue" in { + + val queueName = "testQueue" + val deleteQueue = LocalHornetQMom.deleteQueue(queueName) + assert(deleteQueue.isFailure) + } - LocalHornetQMom.deleteQueue(queueName) + "HornetQ" should "return a failure if sending message to a non-existing queue" in { + + val queueName = "non-existingQueue" + val sendTry = LocalHornetQMom.send("testContent", Queue(queueName)) + assert(sendTry.isFailure) } + "HornetQ" should "return a failure if receiving a message to a non-existing queue" in { + + val queueName = "non-existingQueue" + val receiveTry = LocalHornetQMom.receive(Queue(queueName), Duration(1, "second")) + assert(receiveTry.isFailure) + } + + "HornetQ" should "be able to filter the special characters in queue name" in { + + val queueName = "test# Qu%eueFilter" + + assert(LocalHornetQMom.queues.get.isEmpty) + + val queue = LocalHornetQMom.createQueueIfAbsent(queueName).get + + assert(LocalHornetQMom.queues.get == Seq(queue)) + LocalHornetQMom.deleteQueue(queue.name) + } + + override def afterAll() = LocalHornetQMomStopper.stop() } \ No newline at end of file diff --git a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala index a4cc9cbe4..f707d3fbe 100644 --- a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala +++ b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala @@ -1,77 +1,103 @@ package net.shrine.messagequeueservice import net.shrine.source.ConfigSource import net.shrine.spray.DefaultJsonSupport import org.hornetq.api.core.client.ClientMessage import org.hornetq.core.client.impl.ClientMessageImpl import org.json4s.JsonAST.{JField, JObject} import org.json4s.{CustomSerializer, DefaultFormats, Formats, _} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration +import scala.util.Try /** * This object mostly imitates AWS SQS' API via an embedded HornetQ. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs.html * * @author david * @since 7/18/17 */ //todo in 1.23 all but the server side will use the client RemoteHornetQ implementation (which will call to the server at the hub) //todo in 1.24, create an AwsSqs implementation of the trait trait MessageQueueService { - def createQueueIfAbsent(queueName:String):Queue - def deleteQueue(queueName:String) - def queues:Seq[Queue] - def send(contents:String,to:Queue):Unit - def receive(from:Queue,timeout:Duration):Option[Message] - def completeMessage(message:Message):Unit + def createQueueIfAbsent(queueName:String): Try[Queue] + def deleteQueue(queueName:String): Try[Unit] + def queues: Try[Seq[Queue]] + def send(contents:String,to:Queue): Try[Unit] + def receive(from:Queue,timeout:Duration): Try[Option[Message]] + def completeMessage(message:Message): Try[Unit] } object MessageQueueService { lazy val service:MessageQueueService = { import scala.reflect.runtime.universe.runtimeMirror val momClassName = ConfigSource.config.getString("shrine.messagequeue.implementation") val classLoaderMirror = runtimeMirror(getClass.getClassLoader) val module = classLoaderMirror.staticModule(momClassName) classLoaderMirror.reflectModule(module).instance.asInstanceOf[MessageQueueService] } } case class Message(hornetQMessage:ClientMessage) extends DefaultJsonSupport { override implicit def json4sFormats: Formats = DefaultFormats val propName = "contents" def getClientMessage = hornetQMessage def contents = hornetQMessage.getStringProperty(propName) def getMessageID = hornetQMessage.getMessageID def complete() = hornetQMessage.acknowledge() } -case class Queue(name:String) extends DefaultJsonSupport +case class Queue(var name:String) extends DefaultJsonSupport { + // filter all (Unicode) characters that are not letters + // filter neither letters nor (decimal) digits, replaceAll("[^\\p{L}]+", "") + name = name.filterNot(c => c.isWhitespace).replaceAll("[^\\p{L}\\p{Nd}]+", "") + if (name.length == 0) { + throw new IllegalArgumentException("ERROR: A valid Queue name must contain at least one letter!") + } +} + +class QueueSerializer extends CustomSerializer[Queue](format => ( + { + case JObject(JField("name", JString(s)) :: Nil) => Queue(s) + }, + { + case queue: Queue => + JObject(JField("name", JString(queue.name)) :: Nil) + } +)) class MessageSerializer extends CustomSerializer[Message](format => ( { //JObject(List((hornetQMessage,JObject(List((type,JInt(0)), (durable,JBool(false)), (expiration,JInt(0)), (timestamp,JInt(1502218873012)), (priority,JInt(4))))))) // type, durable, expiration, timestamp, priority, initialMessageBufferSize case JObject(JField("hornetQMessage", JObject(JField("type", JInt(s)) :: JField("durable", JBool(d)) :: JField("expiration", JInt(e)) :: JField("timestamp", JInt(t)) :: JField("priority", JInt(p)) :: Nil)) :: Nil) => new Message(new ClientMessageImpl(s.toByte, d, e.toLong, t.toLong, p.toByte, 0)) }, { case msg: Message => JObject(JField("hornetQMessage", JObject(JField("type", JLong(msg.getClientMessage.getType)) :: JField("durable", JBool(msg.getClientMessage.isDurable)) :: JField("expiration", JLong(msg.getClientMessage.getExpiration)) :: JField("timestamp", JLong(msg.getClientMessage.getTimestamp)) :: JField("priority", JLong(msg.getClientMessage.getPriority)) :: Nil)) :: Nil) } )) -// todo test MessageSerializer \ No newline at end of file +// todo test MessageSerializer + + +case class NoSuchQueueExistsInHornetQ(proposedQueue: Queue) extends Exception { + override def getMessage: String = { + s"Given Queue ${proposedQueue.name} does not exist in HornetQ server! Please create the queue first!" + } + +}