diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala index dcdb9b2c0..4ff3c0880 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala @@ -1,112 +1,116 @@ package net.shrine.metadata import com.typesafe.config.Config import net.shrine.config.ConfigExtensions import net.shrine.hornetqclient.CouldNotCreateQueueButOKToRetryException import net.shrine.log.Log import net.shrine.messagequeueservice.{Message, MessageQueueService, Queue} import net.shrine.problem.ProblemNotYetEncoded import net.shrine.protocol.{ResultOutputType, ResultOutputTypes, RunQueryResponse} import net.shrine.qep.querydb.QepQueryDb import net.shrine.source.ConfigSource import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal /** * Receives messages and writes the result to the QEP's cache * * @author david * @since 8/18/17 */ object QepReceiver { val config: Config = ConfigSource.config val nodeName = config.getString("shrine.humanReadableNodeName") //create a daemon thread that long-polls for messages forever val pollingThread = new Thread(QepReceiverRunner(nodeName),s"${getClass.getSimpleName} poller") pollingThread.setDaemon(true) //todo pollingThread.setUncaughtExceptionHandler() SHRINE-2198 pollingThread.start() Log.debug(s"Started the QepReceiver thread for $nodeName") case class QepReceiverRunner(nodeName:String) extends Runnable { val pollDuration = Duration("15 seconds") //todo from config val breakdownTypes: Set[ResultOutputType] = ConfigSource.config.getOptionConfigured("shrine.breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty) override def run(): Unit = { val queue = createQueue(nodeName) while (true) { //forever try { //todo only ask to receive a message if there are incomplete queries SHRINE-2196 Log.debug("About to call receive.") receiveAMessage(queue) Log.debug("Successfully called receive.") } catch { case NonFatal(x) => Log.error("Exception while receiving a message.", x) //todo new kind of problem //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while receiving a message", x) throw x } } } def receiveAMessage(queue:Queue): Unit = { val maybeMessage: Try[Option[Message]] = MessageQueueService.service.receive(queue, pollDuration) //todo make this configurable (and testable) maybeMessage.transform({m => m.foreach(interpretAMessage(_,queue)) Success(m) },{x => ProblemNotYetEncoded(s"Something went wrong during receive from $queue",x) //todo create full-up problem Failure(x) }) } val unit = () - def interpretAMessage(message: Message,queue: Queue):Unit = { - Log.debug(s"Received a message from $queue") + def interpretAMessage(message: Message,queue: Queue): Unit = { + Log.debug(s"Received a message from $queue of $message") val xmlString = message.contents val rqrt: Try[RunQueryResponse] = RunQueryResponse.fromXmlString(breakdownTypes)(xmlString) rqrt.transform({ rqr => Log.debug(s"Inserting result from ${rqr.singleNodeResult.description} for query ${rqr.queryId}") QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.singleNodeResult) message.complete() Success(unit) },{ x => + x match { + case NonFatal(nfx) => Log.error(s"Could not decode message $xmlString ",x) + case _ => + } Failure(x) }).get } def createQueue(nodeName:String):Queue = { //Either come back with the right exception to try again, or a Queue def tryToCreateQueue():Try[Queue] = MessageQueueService.service.createQueueIfAbsent(nodeName) def keepGoing(attempt:Try[Queue]):Try[Boolean] = attempt.transform({queue => Success(false)}, { case okIsh: CouldNotCreateQueueButOKToRetryException => Success(true) case x => Failure(x) }) //todo for fun figure out how to do this without the var. maybe a Stream ? var lastAttempt:Try[Queue] = tryToCreateQueue() while(keepGoing(lastAttempt).get) { Log.debug(s"Last attempt to create a queue resulted in ${lastAttempt}. Sleeping $pollDuration before next attempt") Thread.sleep(pollDuration.toMillis) lastAttempt = tryToCreateQueue() } Log.info(s"Finishing createQueue with $lastAttempt") lastAttempt.get } } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala index 30116316c..5115bfdc2 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala @@ -1,117 +1,117 @@ package net.shrine.broadcaster import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient} import net.shrine.audit.NetworkQueryId import net.shrine.broadcaster.dao.HubDao import net.shrine.client.TimeoutException import net.shrine.log.Loggable import net.shrine.messagequeueservice.MessageQueueService import net.shrine.protocol.{BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout} import scala.concurrent.Future import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal /** * @author clint * @since Nov 15, 2013 */ final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable { logStartup() import scala.concurrent.ExecutionContext.Implicits.global override def broadcast(message: BroadcastMessage): Multiplexer = { logOutboundIfNecessary(message) val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId)) for { nodeHandle <- destinations //todo more status for SHRINE-2120 shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle) } { try { message.request match { case rqr:RunQueryRequest => debug(s"RunQueryRequest's nodeId is ${rqr.nodeId}") //todo SHRINE-2120 send to the QEP queue named nodeId //todo get to the point where there's always a nodeId and clean this up rqr.nodeId.fold{ debug(s"Did not send to queue because nodeId is None") }{ nodeId => sendToQep(shrineResponse,rqr.networkQueryId,nodeId.name) } case _ => debug(s"Not a RunQueryRequest but a ${message.request.getClass.getSimpleName}.") } multiplexer.processResponse(shrineResponse) } finally { logResultsIfNecessary(message, shrineResponse) } } multiplexer } private def sendToQep(shrineResponse: SingleNodeResult,networkQueryId: NetworkQueryId,queueName:String):Unit = { val s: Try[Unit] = for { queue <- MessageQueueService.service.createQueueIfAbsent(queueName) //todo use the json envelope when you get to SHRINE-2177 - sent <- MessageQueueService.service.send(shrineResponse.toXml.text, queue) + sent <- MessageQueueService.service.send(shrineResponse.toXmlString, queue) } yield sent s.transform({itWorked => debug(s"Result from ${shrineResponse.origin.name} sent to queue") Success(itWorked) },{throwable: Throwable => throwable match { case NonFatal(x) => error(s"Could not send result from hub to $queueName for $networkQueryId", x) //todo better error handling case _ => //no op } Failure(throwable) }) } private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = { val NodeHandle(nodeId, client) = nodeHandle client.query(message).recover { case e: TimeoutException => error(s"Broadcasting to $nodeId timed out") Timeout(nodeId) case NonFatal(e) => error(s"Broadcasting to $nodeId failed with ", e) FailureResult(nodeId, e) } } private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ => dao.logQueryResult(message.requestId, result) } private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq => dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition) } private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = { message.request match { case runQueryReq: RunQueryRequest => f(runQueryReq) case _ => () } } private def logStartup(): Unit = { def clientToString(client: AdapterClient): String = client match { case r: RemoteAdapterClient => r.poster.url.toString case _ => "" } info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:") destinations.toSeq.sortBy(_.nodeId.name).foreach { handle => info(s" ${handle.nodeId}: ${clientToString(handle.client)}") } } } \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala index ca24905d4..b44e3c814 100644 --- a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala +++ b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala @@ -1,194 +1,197 @@ package net.shrine.hornetqclient import akka.actor.{Actor, ActorRef, ActorRefFactory, ActorSystem, Props} import net.shrine.log.Loggable import net.shrine.messagequeueservice.{Message, MessageQueueService, MessageSerializer, Queue, QueueSerializer} import net.shrine.source.ConfigSource import org.json4s.native.Serialization import org.json4s.native.Serialization.{read, write} import org.json4s.{Formats, NoTypeHints} import spray.http.{HttpEntity, HttpMethods, HttpRequest, HttpResponse, StatusCode, StatusCodes} import scala.collection.immutable.Seq import scala.concurrent.ExecutionContext import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration} import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal /** * A simple HornetQMomWebClient that uses HornetQMomWebApi to createQueue, * deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * @author yifan * @since 8/10/17 */ object HornetQMomWebClient extends MessageQueueService with Loggable { // we need an ActorSystem to host our application in implicit val system: ActorSystem = ActorSystem("momServer", ConfigSource.config) // TODO in SHRINE-2167: Extract and share a SHRINE actor system // the service actor replies to incoming HttpRequests // implicit val serviceActor: ActorRef = startServiceActor() // def startActorSystem(): ActorSystem = try { // val actorSystem: ActorSystem = ActorSystem("momServer", ConfigSource.config) // info(s"Starting ActorSystem: ${actorSystem.name} for HornetQMomWebClient at time: ${actorSystem.startTime}") // actorSystem // } catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } // // def startServiceActor(): ActorRef = try { // // the service actor replies to incoming HttpRequests // val actor: ActorRef = system.actorOf(Props[HornetQMomWebClientServiceActor]) // info(s"Starting ServiceActor: ${actor.toString()} for HornetQMomWebClient") // actor // } // catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } val momUrl: String = ConfigSource.config.getString("shrine.messagequeue.hornetq.serverUrl") override def createQueueIfAbsent(queueName: String): Try[Queue] = { val proposedQueue: Queue = Queue(queueName) val createQueueUrl = momUrl + s"/createQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, createQueueUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) queue: Queue <- queueFromResponse(response) } yield queue } def queueFromResponse(response: HttpResponse):Try[Queue] = Try { if(response.status == StatusCodes.Created) { val queueString = response.entity.asString implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer read[Queue](queueString)(formats, manifest[Queue]) } else { if((response.status == StatusCodes.NotFound) || (response.status == StatusCodes.RequestTimeout))throw new CouldNotCreateQueueButOKToRetryException(response.status,response.entity.asString) else throw new IllegalStateException(s"Response status is ${response.status}, not Created. Cannot make a queue from this response: ${response.entity.asString}") //todo more specific custom exception } }.transform({ s => Success(s) },{throwable => throwable match { case NonFatal(x) => error(s"Unable to create a Queue from '${response.entity.asString}' due to exception",throwable) //todo probably want to wrap more information into a new Throwable here case _ => } Failure(throwable) }) override def deleteQueue(queueName: String): Try[Unit] = { val proposedQueue: Queue = Queue(queueName) val deleteQueueUrl = momUrl + s"/deleteQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, deleteQueueUrl) Try(HttpClient.webApiCall(request)) // StatusCodes.OK } override def queues: Try[Seq[Queue]] = { val getQueuesUrl = momUrl + s"/getQueues" val request: HttpRequest = HttpRequest(HttpMethods.GET, getQueuesUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) allQueues: Seq[Queue] <- Try { val allQueues: String = response.entity.asString implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer read[Seq[Queue]](allQueues)(formats, manifest[Seq[Queue]]) } } yield allQueues } override def send(contents: String, to: Queue): Try[Unit] = { + + debug(s"send to $to '$contents'") + val sendMessageUrl = momUrl + s"/sendMessage/${to.name}" val request: HttpRequest = HttpRequest( method = HttpMethods.PUT, uri = sendMessageUrl, entity = HttpEntity(contents) //todo set contents as XML or json ) for { response: HttpResponse <- Try(HttpClient.webApiCall(request,3L seconds)) //todo configurable } yield response } //todo test receiving no message override def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { val seconds = timeout.toSeconds val receiveMessageUrl = momUrl + s"/receiveMessage/${from.name}?timeOutSeconds=$seconds" val request: HttpRequest = HttpRequest(HttpMethods.GET, receiveMessageUrl) val httpRequestTimeout: FiniteDuration = (timeout.toSeconds + 1) second //todo configurable for { response: HttpResponse <- Try(HttpClient.webApiCall(request, httpRequestTimeout)) messageResponse: Option[Message] <- messageOptionFromResponse(response) } yield (messageResponse) } def messageOptionFromResponse(response: HttpResponse):Try[Option[Message]] = Try { if(response.status == StatusCodes.NotFound) None else if (response.status == StatusCodes.OK) Some { val responseString = response.entity.asString val formats = Serialization.formats(NoTypeHints) + new MessageSerializer read[Message](responseString)(formats, manifest[Message]) } else { throw new IllegalStateException(s"Response status is ${response.status}, not OK or NotFound. Cannot make a Message from this response: ${response.entity.asString}") } }.transform({ s => Success(s) },{throwable => throwable match { case NonFatal(x) => error(s"Unable to create a Message from '${response.entity.asString}' due to exception",throwable) //todo probably want to wrap more information into a new Throwable here case _ => } Failure(throwable) }) override def completeMessage(message: Message): Try[Unit] = { implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer val messageString: String = write[Message](message)(formats) val entity: HttpEntity = HttpEntity(messageString) val completeMessageUrl: String = momUrl + s"/acknowledge" // HttpEntity val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl).withEntity(entity) for { response: HttpResponse <- Try(HttpClient.webApiCall(request)) } yield response } } case class CouldNotCreateQueueButOKToRetryException(status:StatusCode,contents:String) extends Exception(s"Could not create a queue due to status code $status with message '$contents'") // TODO in SHRINE-2167: Extract and share a SHRINE actor system //class HornetQMomWebClientServiceActor extends Actor with MetaDataService { // // // the HttpService trait defines only one abstract member, which // // connects the services environment to the enclosing actor or test // def actorRefFactory: ActorRefFactory = context // // // this actor only runs our route, but you could add // // other things here, like request stream processing // // or timeout handling // def receive: Receive = runRoute(route) // // override implicit val ec: ExecutionContext = ExecutionContext.Implicits.global // // override def system: ActorSystem = context.system //} \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index af57973a4..a17f874f1 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,191 +1,198 @@ package net.shrine.hornetqmom import net.shrine.log.Loggable import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource import org.json4s.native.Serialization import org.json4s.native.Serialization.{read, write} import org.json4s.{Formats, NoTypeHints} import spray.http.StatusCodes import spray.routing.{HttpService, Route} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { - val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApi.enabled").toBoolean + def enabled: Boolean = ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled") val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" def momRoute: Route = pathPrefix("mom") { if (!enabled) { val configProblem: CannotUseHornetQMomWebApiProblem = CannotUseHornetQMomWebApiProblem(new UnsupportedOperationException) warn(s"HornetQMomWebApi is not available to use due to configProblem ${configProblem.description}!") respondWithStatus(StatusCodes.NotFound) { complete(warningMessage) } } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) createdQueueTry match { case Success(queue) => { implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString + + debug(s"sendMessage to $toQueue '$messageContent'") + detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { case Success(optMessage) => { - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - optMessage.fold(complete(StatusCodes.NotFound))(msg => complete(write(optMessage)(formats))) + optMessage.fold(complete(StatusCodes.NotFound)){msg => + implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer + val messageJson = write(msg) + debug(s"receiveMessage json is $messageJson") + complete(write(msg)(formats)) + } } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageJSON => implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer detach() { val msg: Message = read[Message](messageJSON)(formats, manifest[Message]) val acknowledgeTry: Try[Unit] = LocalHornetQMom.completeMessage(msg) acknowledgeTry match { case Success(v) => { complete(StatusCodes.ResetContent) } case Failure(x) => { internalServerErrorOccured(x, "acknowledge") } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" + s"Exception is from ${x.getClass}") } } } case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf." override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" } diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala index 7324e3f6c..ab57054f6 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala @@ -1,189 +1,190 @@ package net.shrine.hornetqmom import com.typesafe.config.Config import net.shrine.log.Log import net.shrine.messagequeueservice.{Message, MessageQueueService, NoSuchQueueExistsInHornetQ, Queue} import net.shrine.source.ConfigSource import org.hornetq.api.core.{HornetQQueueExistsException, TransportConfiguration} import org.hornetq.api.core.client.{ClientConsumer, ClientMessage, ClientProducer, ClientSession, ClientSessionFactory, HornetQClient, ServerLocator} import org.hornetq.api.core.management.HornetQServerControl import org.hornetq.core.config.impl.ConfigurationImpl import org.hornetq.core.remoting.impl.invm.{InVMAcceptorFactory, InVMConnectorFactory} import org.hornetq.core.server.{HornetQServer, HornetQServers} import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} import scala.collection.immutable.Seq import scala.concurrent.blocking import scala.concurrent.duration.Duration import scala.util.Try /** * This object is the local version of the Message-Oriented Middleware API, which uses HornetQ service * * @author david * @since 7/18/17 */ object LocalHornetQMom extends MessageQueueService { val config: Config = ConfigSource.config.getConfig("shrine.messagequeue.hornetq") // todo use the config to set everything needed here that isn't hard-coded. val hornetQConfiguration = new ConfigurationImpl() // todo from config? What is the journal file about? If temporary, use a Java temp file. hornetQConfiguration.setJournalDirectory("target/data/journal") // todo want this. There are likely many other config bits hornetQConfiguration.setPersistenceEnabled(false) // todo maybe want this hornetQConfiguration.setSecurityEnabled(false) // todo probably just want the InVM version, but need to read up on options hornetQConfiguration.getAcceptorConfigurations.add(new TransportConfiguration(classOf[InVMAcceptorFactory].getName)) // Create and start the server val hornetQServer: HornetQServer = HornetQServers.newHornetQServer(hornetQConfiguration) hornetQServer.start() val serverLocator: ServerLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(classOf[InVMConnectorFactory].getName)) val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory() //arguments are boolean xa, boolean autoCommitSends, boolean autoCommitAcks . val session: ClientSession = sessionFactory.createSession(false, true, true) session.start() //keep a map of live queues to ClientConsumers to provide a path for completing messages val queuesToConsumers: ConcurrentMap[Queue, ClientConsumer] = TrieMap.empty val propName = "contents" /** * Use HornetQMomStopper to stop the hornetQServer without unintentially starting it */ // todo drop this into a try private[hornetqmom] def stop() = { queuesToConsumers.values.foreach(_.close()) session.close() sessionFactory.close() hornetQServer.stop() } //queue lifecycle def createQueueIfAbsent(queueName: String): Try[Queue] = { val unit = () val proposedQueue: Queue = Queue(queueName) for { serverControl: HornetQServerControl <- Try{ hornetQServer.getHornetQServerControl } queuesSoFar <- queues queueToUse <- Try { queuesSoFar.find(_.name == proposedQueue.name).fold{ try { serverControl.createQueue(proposedQueue.name, proposedQueue.name, true) } catch { case hqqex:HornetQQueueExistsException => Log.debug(s"Caught and ignored a HornetQQueueExistsException in createQueueIfAbsent.",hqqex) } proposedQueue }{ queue => queue} } consumer <- Try { queuesToConsumers.getOrElseUpdate(proposedQueue, { session.createConsumer(proposedQueue.name) }) } } yield queueToUse } def deleteQueue(queueName: String): Try[Unit] = { val proposedQueue: Queue = Queue(queueName) for { deleteTry <- Try { queuesToConsumers.remove(proposedQueue).foreach(_.close()) val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl serverControl.destroyQueue(proposedQueue.name) } } yield deleteTry } override def queues: Try[Seq[Queue]] = { for { hornetQTry: HornetQServerControl <- Try { hornetQServer.getHornetQServerControl } getQueuesTry: Seq[Queue] <- Try { val queueNames: Array[String] = hornetQTry.getQueueNames queueNames.map(Queue(_)).to[Seq] } } yield getQueuesTry } //send a message def send(contents: String, to: Queue): Try[Unit] = { for { sendTry <- Try { // check if the queue exists first if (!this.queues.get.map(_.name).contains(to.name)) { throw NoSuchQueueExistsInHornetQ(to) } } producer: ClientProducer <- Try{ session.createProducer(to.name) } - message <- Try{ session.createMessage(true) } - constructMessage <- Try { message.putStringProperty(propName, contents) } + message <- Try{ session.createMessage(true).putStringProperty(propName, contents) } sendMessage <- Try { producer.send(message) + Log.debug(s"Message $message sent to $to in HornetQ") producer.close() } } yield sendMessage } //receive a message /** * Always do AWS SQS-style long polling. * Be sure your code can handle receiving the same message twice. * * @return Some message before the timeout, or None */ def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { for { //todo handle the case where either stop or close has been called on something gracefully messageConsumer: ClientConsumer <- Try { if (!queuesToConsumers.contains(from)) { throw new NoSuchElementException(s"Given Queue ${from.name} does not exist in HornetQ server! Please create the queue first!") } queuesToConsumers(from) } message: Option[Message] <- Try { blocking { val messageReceived: Option[ClientMessage] = Option(messageConsumer.receive(timeout.toMillis)) + messageReceived.foreach(m => Log.debug(s"Received ${m} from $from in HornetQ")) messageReceived.map(Message(_)) } } } yield message } //todo dead letter queue for all messages. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-dead-letter-queues.html //complete a message override def completeMessage(message: Message): Try[Unit] = { for { completeMessageTry <- Try { message.complete() } } yield completeMessageTry } } /** * If the configuration is such that HornetQ should have been started use this object to stop it */ //todo is this a good way to write this code? object LocalHornetQMomStopper { def stop(): Unit = { //todo fill in as part of SHIRINE-2128 for { config: Config <- Try { ConfigSource.config.getConfig("shrine.messagequeue.hornetq") } } yield config LocalHornetQMom.stop() } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala index 01bfe8485..5e023d6c1 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala @@ -1,181 +1,185 @@ package net.shrine.hornetqmom import akka.actor.ActorRefFactory import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} import net.shrine.source.ConfigSource import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.http.HttpEntity import spray.http.StatusCodes._ import spray.testkit.ScalatestRouteTest import scala.collection.immutable.Seq /** * Created by yifan on 7/27/17. */ @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system private val proposedQueueName = "test Queue" private val queue: Queue = Queue(proposedQueueName) private val queueName: String = queue.name // "testQueue" private val messageContent = "test Content" private var receivedMessage: String = "" "HornetQMomWebApi" should "create/delete the given queue, send/receive message, get queues" in { - Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer - println(response) - val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) - val responseQueueName = jsonToQueue.name - assertResult(Created)(status) - assertResult(queueName)(responseQueueName) + ConfigSource.atomicConfig.configForBlock("shrine.messagequeue.hornetQWebApi.enabled","true","HornetQMomWebApiTest") { + + Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + println(response) + val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) + val responseQueueName = jsonToQueue.name + assertResult(Created)(status) + assertResult(queueName)(responseQueueName) + } } - } - // should be OK to create a queue twice - Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer - val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) - val responseQueueName = jsonToQueue.name - assertResult(Created)(status) - assertResult(queueName)(responseQueueName) + // should be OK to create a queue twice + Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) + val responseQueueName = jsonToQueue.name + assertResult(Created)(status) + assertResult(queueName)(responseQueueName) + } } - } - Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(Accepted)(status) + Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + assertResult(Accepted)(status) + } } - } - Get(s"/mom/getQueues") ~> momRoute ~> check { - val response: String = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer - val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]]) - - assertResult(OK)(status) - assertResult(queueName)(jsonToSeq.head.name) + Get(s"/mom/getQueues") ~> momRoute ~> check { + val response: String = new String(body.data.toByteArray) + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]]) + + assertResult(OK)(status) + assertResult(queueName)(jsonToSeq.head.name) + } } - } - // given timeout is 2 seconds - Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - receivedMessage = response - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - val responseToMessage: Message = read[Message](response)(formats, manifest[Message]) - - assertResult(OK)(status) - assert(responseToMessage.isInstanceOf[Message]) + // given timeout is 2 seconds + Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + receivedMessage = response + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer + val responseToMessage: Message = read[Message](response)(formats, manifest[Message]) + + assertResult(OK)(status) + assert(responseToMessage.isInstanceOf[Message]) + //todo contents is always null assertResult(messageContent)(responseToMessage.contents) + } } - } - Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - assertResult(ResetContent)(status) + Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer + assertResult(ResetContent)(status) + } } - } - Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(OK)(status) + Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + if (!enabled) { + assertResult(NotFound)(status) + assertResult(warningMessage)(response) + } else { + assertResult(OK)(status) + } } } } "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " + "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in { Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) if (!enabled) { assertResult(NotFound)(status) assertResult(warningMessage)(response) } else { assertResult(InternalServerError)(status) } } Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { val response = new String(body.data.toByteArray) if (!enabled) { assertResult(NotFound)(status) assertResult(warningMessage)(response) } else { assertResult(InternalServerError)(status) } } // given timeout is 1 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check { val response = new String(body.data.toByteArray) if (!enabled) { assertResult(NotFound)(status) assertResult(warningMessage)(response) } else { assertResult(InternalServerError)(status) } } } } @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiConfigTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system private val queueName = "testQueue" override val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApiTest.enabled").toBoolean "HornetQMomWebApi" should "block user from using the API and return a 404 response" in { Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) assertResult(warningMessage)(response) assertResult(NotFound)(status) } } } \ No newline at end of file