diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index 761e45e9c..686780294 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,320 +1,321 @@ package net.shrine.hornetqmom import java.util.UUID import net.shrine.config.ConfigExtensions import net.shrine.log.{Log, Loggable} import net.shrine.messagequeueservice.{Message, Queue} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource import org.json4s.native.Serialization import org.json4s.native.Serialization.write import org.json4s.{NoTypeHints, ShortTypeHints} import spray.http.StatusCodes import spray.routing.{HttpService, Route} import scala.collection.concurrent.TrieMap import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { def enabled: Boolean = ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled") val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" if(!enabled) { //todo this is where the enabled check goes // this is not a problem for any downstream nodes. Only a problem for hubs in 1.23, and it will be obvious // val configProblem: CannotUseHornetQMomWebApiProblem = CannotUseHornetQMomWebApiProblem(new UnsupportedOperationException) debug(s"HornetQMomWebApi is not enabled.") } // keep a map of messages and ids private val idToMessages: TrieMap[UUID, (Message, Long)] = TrieMap.empty case class MapSentinelRunner(timeOutInMillis: Long) extends Runnable { // watches the map override def run(): Unit = { val currentTimeInMillis = System.currentTimeMillis() try { Log.debug("About to clean up outstanding messages.") idToMessages.retain({ (uuid, localHornetQMessageAndCreatedTime) => (currentTimeInMillis - localHornetQMessageAndCreatedTime._2) < timeOutInMillis }) Log.debug(s"Outstanding messages that exceed $timeOutInMillis milliseconds have been cleaned from the map.") } catch { case NonFatal(x) => ExceptionWhileCleaningUpMessageProblem(timeOutInMillis, x) //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while cleaning up outstanding messages", x) throw x } } } def momRoute: Route = pathPrefix("mom") { if (!enabled) { respondWithStatus(StatusCodes.NotFound) { complete(warningMessage) } } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) createdQueueTry match { case Success(queue) => { implicit val formats = Serialization.formats(NoTypeHints) val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString debug(s"sendMessage to $toQueue '$messageContent'") detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { case Success(optionMessage) => { optionMessage.fold(complete(StatusCodes.NotFound)){localHornetQMessage => // add message in the map with an unique UUID val msgID = UUID.randomUUID() scheduleCleanupMessageMap(msgID, localHornetQMessage) complete(MessageContainer(msgID.toString, localHornetQMessage.contents).toJson) } } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } private def scheduleCleanupMessageMap(msgID: UUID, localHornetQMessage: Message) = { import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} idToMessages.update(msgID, (localHornetQMessage, System.currentTimeMillis())) // a sentinel that monitors the hashmap of idToMessages, any message that has been outstanding for more than 3X or 10X // time-to-live need to get cleaned out of this map val messageTimeOutInMillis: Long = ConfigSource.config.get("shrine.messagequeue.hornetQWebApi.messageTimeOutSeconds", Duration(_)).toMillis val sentinelRunner: MapSentinelRunner = MapSentinelRunner(messageTimeOutInMillis) try { Log.debug(s"Starting the sentinel scheduler that cleans outstanding messages exceeds 3 times $messageTimeOutInMillis") val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1) scheduler.schedule(sentinelRunner, messageTimeOutInMillis * 3, TimeUnit.MILLISECONDS) + scheduler.shutdown() } catch { case NonFatal(x) => ExceptionWhileSchedulingSentinelProblem(messageTimeOutInMillis, x) //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while scheduling a sentinel for cleaning up outstanding messages", x) throw x } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageUUID => detach() { val id: UUID = UUID.fromString(messageUUID) // retrieve the localMessage from the concurrent hashmap val getMessageTry: Try[Option[(Message, Long)]] = Try { idToMessages.remove(id) }.transform({ messageAndTime => Success(messageAndTime) }, { throwable => Failure(MessageDoesNotExistException(id)) }) getMessageTry match { case Success(messageAndTimeOption) => { messageAndTimeOption.fold({ respondWithStatus(StatusCodes.NotFound) { val noMessageProblem = MessageDoesNotExistInMapProblem(id) complete(noMessageProblem.description) } }) { messageAndTime => messageAndTime._1.complete() complete(StatusCodes.ResetContent) } } case Failure(x) => { x match { case m: MessageDoesNotExistException => { respondWithStatus(StatusCodes.NotFound) { complete(m.getMessage) } } case _ => internalServerErrorOccured(x, "acknowledge") } } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { implicit val formats = Serialization.formats(NoTypeHints) respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" + s"Exception is from ${x.getClass}") } } } case class MessageContainer(id: String, contents: String) { def toJson: String = { Serialization.write(this)(MessageContainer.messageFormats) } } object MessageContainer { val messageFormats = Serialization.formats(ShortTypeHints(List(classOf[MessageContainer]))) def fromJson(jsonString: String): MessageContainer = { implicit val formats = messageFormats Serialization.read[MessageContainer](jsonString) } } case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf." override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" } case class MessageDoesNotExistException(id: UUID) extends Exception(s"Cannot match given ${id.toString} to any Message in HornetQ server! Message does not exist!") case class MessageDoesNotExistInMapProblem(id: UUID) extends AbstractProblem(ProblemSources.Hub) { override def summary: String = s"The client expected message $id, but the server did not find it and could not complete() the message." override def description: String = s"The client expected message $id, but the server did not find it and could not complete() the message." + s" Message either has never been received or already been completed!" } case class ExceptionWhileCleaningUpMessageProblem(timeOutInMillis: Long, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override def summary: String = s"The Hub encountered an exception while trying to " + s"cleanup messages that has been outstanding for more than $timeOutInMillis milliseconds" override def description: String = s"The Hub encountered an exception while trying to " + s"cleanup messages that has been received for more than $timeOutInMillis milliseconds " + s"on Thread ${Thread.currentThread().getName}: ${x.getMessage}" } case class ExceptionWhileSchedulingSentinelProblem(timeOutInMillis: Long, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override def summary: String = s"The Hub encountered an exception while trying to " + s"schedule a sentinel that cleans up outstanding messages exceed $timeOutInMillis milliseconds" override def description: String = s"The Hub encountered an exception while trying to " + s"schedule a sentinel that cleans up outstanding messages exceed $timeOutInMillis milliseconds " + s"on Thread ${Thread.currentThread().getName}: ${x.getMessage}" } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala index d8791ad30..ad77cceaf 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala @@ -1,197 +1,201 @@ package net.shrine.hornetqmom import com.typesafe.config.Config +import net.shrine.config.ConfigExtensions import net.shrine.log.Log import net.shrine.messagequeueservice.{Message, MessageQueueService, NoSuchQueueExistsInHornetQ, Queue} import net.shrine.source.ConfigSource import org.hornetq.api.core.{HornetQQueueExistsException, TransportConfiguration} import org.hornetq.api.core.client.{ClientConsumer, ClientMessage, ClientProducer, ClientSession, ClientSessionFactory, HornetQClient, ServerLocator} import org.hornetq.api.core.management.HornetQServerControl import org.hornetq.core.config.impl.ConfigurationImpl import org.hornetq.core.remoting.impl.invm.{InVMAcceptorFactory, InVMConnectorFactory} import org.hornetq.core.server.{HornetQServer, HornetQServers} import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} import scala.collection.immutable.Seq import scala.concurrent.blocking import scala.concurrent.duration.Duration import scala.util.Try /** * This object is the local version of the Message-Oriented Middleware API, which uses HornetQ service * * @author david * @since 7/18/17 */ object LocalHornetQMom extends MessageQueueService { val config: Config = ConfigSource.config.getConfig("shrine.messagequeue.hornetq") // todo use the config to set everything needed here that isn't hard-coded. val hornetQConfiguration = new ConfigurationImpl() // todo from config? What is the journal file about? If temporary, use a Java temp file. hornetQConfiguration.setJournalDirectory("target/data/journal") // todo want this. There are likely many other config bits hornetQConfiguration.setPersistenceEnabled(false) // todo maybe want this hornetQConfiguration.setSecurityEnabled(false) // todo probably just want the InVM version, but need to read up on options hornetQConfiguration.getAcceptorConfigurations.add(new TransportConfiguration(classOf[InVMAcceptorFactory].getName)) // Create and start the server val hornetQServer: HornetQServer = HornetQServers.newHornetQServer(hornetQConfiguration) hornetQServer.start() val serverLocator: ServerLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(classOf[InVMConnectorFactory].getName)) val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory() //arguments are boolean xa, boolean autoCommitSends, boolean autoCommitAcks . val session: ClientSession = sessionFactory.createSession(false, true, true) session.start() //keep a map of live queues to ClientConsumers to provide a path for completing messages val queuesToConsumers: ConcurrentMap[Queue, ClientConsumer] = TrieMap.empty /** * Use HornetQMomStopper to stop the hornetQServer without unintentially starting it */ // todo drop this into a try private[hornetqmom] def stop() = { queuesToConsumers.values.foreach(_.close()) session.close() sessionFactory.close() hornetQServer.stop() } //queue lifecycle def createQueueIfAbsent(queueName: String): Try[Queue] = { val unit = () val proposedQueue: Queue = Queue(queueName) for { serverControl: HornetQServerControl <- Try{ hornetQServer.getHornetQServerControl } queuesSoFar <- queues queueToUse <- Try { queuesSoFar.find(_.name == proposedQueue.name).fold{ try { serverControl.createQueue(proposedQueue.name, proposedQueue.name, true) } catch { case hqqex:HornetQQueueExistsException => Log.debug(s"Ignored a HornetQQueueExistsException in createQueueIfAbsent because ${proposedQueue} already exists.") } proposedQueue }{ queue => queue} } consumer <- Try { queuesToConsumers.getOrElseUpdate(proposedQueue, { session.createConsumer(proposedQueue.name) }) } } yield queueToUse } def deleteQueue(queueName: String): Try[Unit] = { val proposedQueue: Queue = Queue(queueName) for { deleteTry <- Try { queuesToConsumers.remove(proposedQueue).foreach(_.close()) val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl serverControl.destroyQueue(proposedQueue.name) } } yield deleteTry } override def queues: Try[Seq[Queue]] = { for { hornetQTry: HornetQServerControl <- Try { hornetQServer.getHornetQServerControl } getQueuesTry: Seq[Queue] <- Try { val queueNames: Array[String] = hornetQTry.getQueueNames queueNames.map(Queue(_)).to[Seq] } } yield getQueuesTry } //send a message def send(contents: String, to: Queue): Try[Unit] = { for { sendTry <- Try { // check if the queue exists first if (!this.queues.get.map(_.name).contains(to.name)) { throw NoSuchQueueExistsInHornetQ(to) } } producer: ClientProducer <- Try{ session.createProducer(to.name) } message <- Try{ - session.createMessage(true).putStringProperty(Message.contentsKey, contents) + val msg = session.createMessage(true).putStringProperty(Message.contentsKey, contents) + val messageTimeOutInMillis: Long = ConfigSource.config.get("shrine.messagequeue.hornetQWebApi.messageTimeOutSeconds", Duration(_)).toMillis + msg.setExpiration(System.currentTimeMillis() + messageTimeOutInMillis) + msg } sendMessage <- Try { producer.send(message) Log.debug(s"Message $message sent to $to in HornetQ") producer.close() } } yield sendMessage } //receive a message /** * Always do AWS SQS-style long polling. * Be sure your code can handle receiving the same message twice. * * @return Some message before the timeout, or None */ def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { for { //todo handle the case where either stop or close has been called on something gracefully messageConsumer: ClientConsumer <- Try { if (!queuesToConsumers.contains(from)) { throw new NoSuchElementException(s"Given Queue ${from.name} does not exist in HornetQ server! Please create the queue first!") } queuesToConsumers(from) } message: Option[LocalHornetQMessage] <- Try { blocking { val messageReceived: Option[ClientMessage] = Option(messageConsumer.receive(timeout.toMillis)) messageReceived.foreach(m => Log.debug(s"Received $m from $from in HornetQ")) messageReceived.map(clientMsg => LocalHornetQMessage(clientMsg)) } } } yield message } def getQueueConsumer(queue: Queue): Try[ClientConsumer] = { for { messageConsumer: ClientConsumer <- Try { if (!queuesToConsumers.contains(queue)) { throw new NoSuchElementException(s"Given Queue ${queue.name} does not exist in HornetQ server! Please create the queue first!") } queuesToConsumers(queue) } } yield messageConsumer } //todo dead letter queue for all messages SHRINE-2261 case class LocalHornetQMessage private(clientMessage: ClientMessage) extends Message { override def contents: String = clientMessage.getStringProperty(Message.contentsKey) //complete a message override def complete(): Try[Unit] = Try { clientMessage.acknowledge() } } } /** * If the configuration is such that HornetQ should have been started use this object to stop it */ object LocalHornetQMomStopper { def stop(): Unit = { if(ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled")) { LocalHornetQMom.stop() } } } \ No newline at end of file