diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala index be6e8b49c..4132eb821 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala @@ -1,164 +1,196 @@ package net.shrine.broadcaster import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient} import net.shrine.aggregation.RunQueryAggregator import net.shrine.broadcaster.dao.HubDao import net.shrine.client.TimeoutException -import net.shrine.log.Loggable -import net.shrine.messagequeueservice.MessageQueueService +import net.shrine.log.{Log, Loggable} +import net.shrine.messagequeueservice.{CouldNotCompleteMomTaskButOKToRetryException, MessageQueueService, Queue} import net.shrine.messagequeueservice.protocol.Envelope import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.{AggregatedRunQueryResponse, BaseShrineResponse, BroadcastMessage, FailureResult, QueryResult, RunQueryRequest, SingleNodeResult, Timeout} import net.shrine.status.protocol.IncrementalQueryResult +import scala.collection.concurrent.TrieMap import scala.concurrent.Future +import scala.concurrent.duration.Duration import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} + + /** * @author clint * @since Nov 15, 2013 */ final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable { logStartup() import scala.concurrent.ExecutionContext.Implicits.global override def broadcast(message: BroadcastMessage): Multiplexer = { logOutboundIfNecessary(message) //send back json containing just enough to fill a QueryResultRow message.request match { case runQueryRequest: RunQueryRequest => debug(s"RunQueryRequest's nodeId is ${runQueryRequest.nodeId}") runQueryRequest.nodeId.fold { error(s"Did not send to queue because nodeId is None") } { nodeId => { val hubWillSubmitStatuses = destinations.map { nodeHandle => IncrementalQueryResult( runQueryRequest.networkQueryId, nodeHandle.nodeId.name, QueryResult.StatusType.HubWillSubmit.name, s"The hub is about to submit query ${runQueryRequest.networkQueryId} to ${nodeHandle.nodeId.name}" ) }.to[Seq] val envelope = Envelope(IncrementalQueryResult.incrementalQueryResultsEnvelopeContentsType,IncrementalQueryResult.seqToJson(hubWillSubmitStatuses)) sendToQep(envelope,nodeId.name,s"Status update - the hub will submit the query to the adapters for ${runQueryRequest.networkQueryId}") } } case _ => //don't care } val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId)) for { nodeHandle <- destinations shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle) } { try { message.request match { case runQueryRequest:RunQueryRequest => debug(s"RunQueryRequest's nodeId is ${runQueryRequest.nodeId}") runQueryRequest.nodeId.fold{ error(s"Did not send to queue because nodeId is None") }{ nodeId => // make an AggregateRunQueryResponse from the SingleNodeResult val aggregator = new RunQueryAggregator( //to convert the SingleNodeResult into an AggregateRunQueryResponse runQueryRequest.networkQueryId, runQueryRequest.authn.username, runQueryRequest.authn.domain, runQueryRequest.queryDefinition, false ) val response: BaseShrineResponse = aggregator.aggregate(Seq(shrineResponse),Seq.empty,message) response match { case runQueryResponse:AggregatedRunQueryResponse => val envelope = Envelope(AggregatedRunQueryResponse.getClass.getSimpleName,runQueryResponse.toXmlString) sendToQep(envelope,nodeId.name,s"Result from ${runQueryResponse.results.head.description.get}") case _ => error(s"response is not a AggregatedRunQueryResponse. It is a ${response.toString}") } } case _ => debug(s"Not a RunQueryRequest but a ${message.request.getClass.getSimpleName}.") } multiplexer.processResponse(shrineResponse) } finally { logResultsIfNecessary(message, shrineResponse) } } multiplexer } + //todo clean up this cut/paste -remove if not used + def createQueue(nodeName:String):Queue = { + + val pollDuration:Duration = Duration("10 seconds") + + //Either come back with the right exception to try again, or a Queue + def tryToCreateQueue():Try[Queue] = + MessageQueueService.service.createQueueIfAbsent(nodeName) + + def keepGoing(attempt:Try[Queue]):Try[Boolean] = attempt.transform({queue => Success(false)}, { + case okIsh: CouldNotCompleteMomTaskButOKToRetryException => Success(true) + case x => Failure(x) + }) + + //todo for fun figure out how to do this without the var. maybe a Stream ? SHRINE-2211 + var lastAttempt:Try[Queue] = tryToCreateQueue() + while(keepGoing(lastAttempt).get) { + Log.debug(s"Last attempt to create a queue resulted in $lastAttempt. Sleeping $pollDuration before next attempt") + Thread.sleep(pollDuration.toMillis) + lastAttempt = tryToCreateQueue() + } + Log.info(s"Finishing createQueue with $lastAttempt") + + lastAttempt.get + } + + val namesToQueues: TrieMap[String, Queue] = TrieMap[String,Queue]() + private def sendToQep(envelope: Envelope,queueName:String,logDescription:String):Unit = { - val s: Try[Unit] = for { - queue <- MessageQueueService.service.createQueueIfAbsent(queueName) - sent <- MessageQueueService.service.send(envelope.toJson, queue) - } yield sent - s.transform({itWorked => + def createThisQueue() = createQueue(queueName) + + val queue = namesToQueues.getOrElseUpdate(queueName,createThisQueue) + + MessageQueueService.service.send(envelope.toJson, queue).transform({itWorked => debug(s"$logDescription sent to queue") Success(itWorked) },{throwable: Throwable => throwable match { case NonFatal(x) =>ExceptionWhileSendingMessage(logDescription,queueName,x) case _ => throw throwable } Failure(throwable) }) } private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = { val NodeHandle(nodeId, client) = nodeHandle client.query(message).recover { case e: TimeoutException => error(s"Broadcasting to $nodeId timed out") Timeout(nodeId) case NonFatal(e) => error(s"Broadcasting to $nodeId failed with ", e) FailureResult(nodeId, e) } } private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ => dao.logQueryResult(message.requestId, result) } private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq => dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition) } private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = { message.request match { case runQueryReq: RunQueryRequest => f(runQueryReq) case _ => () } } private def logStartup(): Unit = { def clientToString(client: AdapterClient): String = client match { case r: RemoteAdapterClient => r.poster.url.toString case _ => "" } info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:") destinations.toSeq.sortBy(_.nodeId.name).foreach { handle => info(s" ${handle.nodeId}: ${clientToString(handle.client)}") } } } case class ExceptionWhileSendingMessage(logDescription:String,queueName:String, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override def summary: String = s"The Hub encountered an exception while trying to send a message to $queueName" override def description: String = s"The Hub encountered an exception while trying to send a message about $logDescription from $queueName : ${x.getMessage}" } \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/main/resources/reference.conf b/messagequeue/hornetqclient/src/main/resources/reference.conf index 601bf6ae2..2b5afed45 100644 --- a/messagequeue/hornetqclient/src/main/resources/reference.conf +++ b/messagequeue/hornetqclient/src/main/resources/reference.conf @@ -1,24 +1,27 @@ shrine { messagequeue { blockingq { serverUrl = "https://localhost:6443/shrine-metadata/mom" webClientTimeOutSecond = 10 seconds } httpClient { defaultTimeOut = 10 seconds timeOutWaitGap = 1 second } } } //todo typesafe config precedence seems to do the right thing, but I haven't found the rules that say this reference.conf should override others // todo go with the actor system, also check other apps, possiblly they use it too akka { loglevel = INFO // log-config-on-start = on loggers = ["akka.event.slf4j.Slf4jLogger"] // logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" // Toggles whether the threads created by this ActorSystem should be daemons or not daemonic = on -} \ No newline at end of file +} + +//todo not sure exactly what this does, but seems to help +spray.host-connector.pipelining = on diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index c9d696659..7ad81204c 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,332 +1,334 @@ package net.shrine.hornetqmom import java.util.UUID import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import net.shrine.config.ConfigExtensions import net.shrine.log.{Log, Loggable} import net.shrine.messagequeueservice.{Message, Queue} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource import org.json4s.native.Serialization import org.json4s.native.Serialization.write import org.json4s.{NoTypeHints, ShortTypeHints} import spray.http.StatusCodes import spray.routing.{HttpService, Route} import scala.collection.concurrent.TrieMap import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { val configPath = "shrine.messagequeue.blockingqWebApi" def webApiConfig = ConfigSource.config.getConfig(configPath) //if(!webClientConfig.getConfigOrEmpty("serverUrl").isEmpty) webClientConfig.getString("serverUrl") def enabled: Boolean = webApiConfig.getBoolean("enabled") val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.blockingqWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" if(!enabled) { debug(s"HornetQMomWebApi is not enabled.") } // keep a map of messages and ids private val idToMessages: TrieMap[UUID, (Message, Long)] = TrieMap.empty case class MapSentinelRunner(timeOutInMillis: Long) extends Runnable { // watches the map override def run(): Unit = { val currentTimeInMillis = System.currentTimeMillis() try { Log.debug("About to clean up outstanding messages.") idToMessages.retain({ (uuid, localHornetQMessageAndCreatedTime) => (currentTimeInMillis - localHornetQMessageAndCreatedTime._2) < timeOutInMillis }) Log.debug(s"Outstanding messages that exceed $timeOutInMillis milliseconds have been cleaned from the map.") } catch { case NonFatal(x) => ExceptionWhileCleaningUpMessageProblem(timeOutInMillis, x) //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while cleaning up outstanding messages", x) throw x } } } def momRoute: Route = pathPrefix("mom") { if (!enabled) { respondWithStatus(StatusCodes.NotFound) { complete(warningMessage) } } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { + debug(s"Start createqueue/$queueName") val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) + debug(s"createqueueTry is $createdQueueTry") createdQueueTry match { case Success(queue) => { implicit val formats = Serialization.formats(NoTypeHints) val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString debug(s"sendMessage to $toQueue '$messageContent'") detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { case Success(optionMessage) => { optionMessage.fold(complete(StatusCodes.NoContent)){localHornetQMessage => // add message in the map with an unique UUID val msgID = UUID.randomUUID() scheduleCleanupMessageMap(msgID, localHornetQMessage) complete(MessageContainer(msgID.toString, localHornetQMessage.contents).toJson) } } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } private def scheduleCleanupMessageMap(msgID: UUID, localHornetQMessage: Message) = { idToMessages.update(msgID, (localHornetQMessage, System.currentTimeMillis())) // a sentinel that monitors the hashmap of idToMessages, any message that has been outstanding for more than 3X or 10X // time-to-live need to get cleaned out of this map val messageTimeToLiveInMillis: Long = webApiConfig.get("messageTimeToLive", Duration(_)).toMillis val sentinelRunner: MapSentinelRunner = MapSentinelRunner(messageTimeToLiveInMillis) try { Log.debug(s"Starting the sentinel scheduler that cleans outstanding messages exceeds 3 times $messageTimeToLiveInMillis") MessageMapCleaningScheduler.schedule(sentinelRunner, messageTimeToLiveInMillis * 3, TimeUnit.MILLISECONDS) } catch { case NonFatal(x) => ExceptionWhileSchedulingSentinelProblem(messageTimeToLiveInMillis, x) //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while scheduling a sentinel for cleaning up outstanding messages", x) throw x } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageUUID => detach() { val id: UUID = UUID.fromString(messageUUID) // retrieve the localMessage from the concurrent hashmap val getMessageTry: Try[Option[(Message, Long)]] = Try { idToMessages.remove(id) }.transform({ messageAndTime => Success(messageAndTime) }, { throwable => Failure(MessageDoesNotExistException(id)) }) getMessageTry match { case Success(messageAndTimeOption) => { messageAndTimeOption.fold({ respondWithStatus(StatusCodes.NotFound) { val noMessageProblem = MessageDoesNotExistInMapProblem(id) complete(noMessageProblem.description) } }) { messageAndTime => messageAndTime._1.complete() complete(StatusCodes.ResetContent) } } case Failure(x) => { x match { case m: MessageDoesNotExistException => { respondWithStatus(StatusCodes.NotFound) { complete(m.getMessage) } } case _ => internalServerErrorOccured(x, "acknowledge") } } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { implicit val formats = Serialization.formats(NoTypeHints) respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" + s"Exception is from ${x.getClass}") } } } object MessageMapCleaningScheduler { private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1) def schedule(command: Runnable, delay: Long, unit: TimeUnit) = { scheduler.schedule(command, delay, unit) } def shutDown() = { scheduler.shutdownNow() } } case class MessageContainer(id: String, contents: String) { def toJson: String = { Serialization.write(this)(MessageContainer.messageFormats) } } object MessageContainer { val messageFormats = Serialization.formats(ShortTypeHints(List(classOf[MessageContainer]))) def fromJson(jsonString: String): MessageContainer = { implicit val formats = messageFormats Serialization.read[MessageContainer](jsonString) } } case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } //todo is this used anywhere? case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf." override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" } case class MessageDoesNotExistException(id: UUID) extends Exception(s"Cannot match given ${id.toString} to any Message in HornetQ server! Message does not exist!") case class MessageDoesNotExistInMapProblem(id: UUID) extends AbstractProblem(ProblemSources.Hub) { override def summary: String = s"The client expected message $id, but the server did not find it and could not complete() the message." override def description: String = s"The client expected message $id from , but the server did not find it and could not complete() the message." + s" Message either has never been received or already been completed!" } case class ExceptionWhileCleaningUpMessageProblem(timeOutInMillis: Long, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override def summary: String = s"The Hub encountered an exception while trying to " + s"cleanup messages that has been outstanding for more than $timeOutInMillis milliseconds" override def description: String = s"The Hub encountered an exception while trying to " + s"cleanup messages that has been received for more than $timeOutInMillis milliseconds " + s"on Thread ${Thread.currentThread().getName}: ${x.getMessage}" } case class ExceptionWhileSchedulingSentinelProblem(timeOutInMillis: Long, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override def summary: String = s"The Hub encountered an exception while trying to " + s"schedule a sentinel that cleans up outstanding messages exceed $timeOutInMillis milliseconds" override def description: String = s"The Hub encountered an exception while trying to " + s"schedule a sentinel that cleans up outstanding messages exceed $timeOutInMillis milliseconds " + s"on Thread ${Thread.currentThread().getName}: ${x.getMessage}" } \ No newline at end of file