diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala index 9bb003173..cc806c7e0 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala @@ -1,241 +1,243 @@ package net.shrine.metadata import java.util.concurrent.atomic.AtomicBoolean import javax.servlet.{ServletContextEvent, ServletContextListener} import com.typesafe.config.Config import net.shrine.broadcaster.{IdAndUrl, NodeListParser} import net.shrine.config.ConfigExtensions import net.shrine.log.Log import net.shrine.messagequeueservice.protocol.Envelope import net.shrine.messagequeueservice.{CouldNotCompleteMomTaskButOKToRetryException, Message, MessageQueueService, Queue} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.{AggregatedRunQueryResponse, QueryResult, ResultOutputType, ResultOutputTypes} import net.shrine.qep.querydb.{QepQueryDb, QepQueryDbChangeNotifier, QueryResultRow} import net.shrine.source.ConfigSource import net.shrine.status.protocol.IncrementalQueryResult import scala.concurrent.duration.Duration import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * Receives messages and writes the result to the QEP's cache * * @author david * @since 8/18/17 */ object QepReceiver { val config: Config = ConfigSource.config val nodeName = config.getString("shrine.humanReadableNodeName") val pollDuration = config.get("shrine.messagequeue.receiveWaitTime",Duration(_)) //create a daemon thread that long-polls for messages forever val runner = QepReceiverRunner(nodeName,pollDuration) val pollingThread = new Thread(runner,s"${getClass.getSimpleName} poller") pollingThread.setDaemon(true) pollingThread.setUncaughtExceptionHandler(QepReceiverUncaughtExceptionHandler) def start(): Unit = { pollingThread.start() Log.debug(s"Started the QepReceiver thread for $nodeName") } def stop(): Unit = { runner.stop() } case class QepReceiverRunner(nodeName:String,pollDuration:Duration) extends Runnable { val keepGoing = new AtomicBoolean(true) def stop(): Unit = { keepGoing.set(false) Log.debug(s"${this.getClass.getSimpleName} keepGoing set to ${keepGoing.get()}. Will stop asking for messages after the current request.") } val breakdownTypes: Set[ResultOutputType] = ConfigSource.config.getOptionConfigured("shrine.breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty) override def run(): Unit = { //if hub, create all the queues if(config.getBoolean("shrine.hub.create")) { val otherNodes: List[IdAndUrl] = config.getOptionConfigured("shrine.hub.downstreamNodes", NodeListParser(_)).getOrElse(Nil).to[List] val thisNode:Option[String] = if (config.getBoolean("shrine.hub.shouldQuerySelf")) Some(nodeName) else None val nodeNames = ( thisNode :: otherNodes.map(n => Some(n.nodeId.name)) ).flatten nodeNames.foreach(createQueue) } val queue = createQueue(nodeName) while (keepGoing.get()) { //forever try { Log.debug("About to call receive.") receiveAMessage(queue) Log.debug("Called receive.") } catch { case NonFatal(x) => ExceptionWhileReceivingMessage(queue,x) //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while receiving a message", x) throw x } } Log.debug(s"QepReceiverRunner will stop. keepGoing is ${keepGoing.get()}") } def receiveAMessage(queue:Queue): Unit = { val maybeMessage: Try[Option[Message]] = MessageQueueService.service.receive(queue, pollDuration) + Log.debug(s"$maybeMessage") + maybeMessage.transform({m => m.map(interpretAMessage(_,queue)).getOrElse(Success()) },{x => x match { case cncmtbotrx:CouldNotCompleteMomTaskButOKToRetryException => { Log.debug(s"Last attempt to receive resulted in ${cncmtbotrx.getMessage}. Sleeping $pollDuration before next attempt") Thread.sleep(pollDuration.toMillis) } case NonFatal(nfx) => ExceptionWhileReceivingMessage(queue,x) //todo start here. Look in at the logs to see what's what - case _ => //pass through + case fatal => throw fatal } Failure(x) }) } def interpretAMessage(message: Message,queue: Queue): Try[Unit] = { val unit = () Log.debug(s"Received a message from $queue of $message") val envelopeJson = message.contents Envelope.fromJson(envelopeJson). flatMap{ case e:Envelope => e.checkVersionExactMatch case notE => Failure(new IllegalArgumentException(s"Not an expected message Envelope but a ${notE.getClass}")) }. flatMap { case Envelope(contentsType, contents, shrineVersion) if contentsType == AggregatedRunQueryResponse.getClass.getSimpleName => AggregatedRunQueryResponse.fromXmlString(breakdownTypes)(contents).flatMap{ rqr => QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.results.head) Log.debug(s"Inserted result from ${rqr.results.head.description} for query ${rqr.queryId}") QepQueryDbChangeNotifier.triggerDataChangeFor(rqr.queryId) Success(unit) } case Envelope(contentsType, contents, shrineVersion) if contentsType == IncrementalQueryResult.incrementalQueryResultsEnvelopeContentsType => val changeDate = System.currentTimeMillis() IncrementalQueryResult.seqFromJson(contents).flatMap { iqrs: Seq[IncrementalQueryResult] => val rows = iqrs.map(iqr => QueryResultRow( resultId = -1L, networkQueryId = iqr.networkQueryId, instanceId = -1L, adapterNode = iqr.adapterNodeName, resultType = None, size = 0L, startDate = None, endDate = None, status = QueryResult.StatusType.valueOf(iqr.statusTypeName).get, statusMessage = Some(iqr.statusMessage), changeDate = changeDate )) QepQueryDb.db.insertQueryResultRows(rows) Log.debug(s"Inserted incremental results $iqrs") rows.headOption.foreach(row => QepQueryDbChangeNotifier.triggerDataChangeFor(row.networkQueryId)) Success(unit) } case e:Envelope => Failure(UnexpectedMessageContentsTypeException(e,queue)) case _ => Failure(new IllegalArgumentException(s"Received something other than an envelope from this queue: $envelopeJson")) }.transform({ s => message.complete() Success(unit) },{ x => x match { case NonFatal(nfx) => QepReceiverCouldNotDecodeMessage(envelopeJson,queue,x) case throwable => throw throwable//blow something up } message.complete() //complete anyway. Can't be interpreted, so we don't want to see it again Failure(x) }) } def createQueue(nodeName:String):Queue = { //Either come back with the right exception to try again, or a Queue def tryToCreateQueue():Try[Queue] = MessageQueueService.service.createQueueIfAbsent(nodeName) def keepGoing(attempt:Try[Queue]):Try[Boolean] = attempt.transform({queue => Success(false)}, { case okIsh: CouldNotCompleteMomTaskButOKToRetryException => Success(true) case x => Failure(x) }) //todo for fun figure out how to do this without the var. maybe a Stream ? SHRINE-2211 var lastAttempt:Try[Queue] = tryToCreateQueue() while(keepGoing(lastAttempt).get) { Log.debug(s"Last attempt to create a queue resulted in $lastAttempt. Sleeping $pollDuration before next attempt") Thread.sleep(pollDuration.toMillis) lastAttempt = tryToCreateQueue() } Log.info(s"Finishing createQueue with $lastAttempt") lastAttempt.get } } } object QepReceiverUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler { override def uncaughtException(thread: Thread, throwable: Throwable): Unit = QepReceiverThreadEndedByThrowable(thread,throwable) } class QueueReceiverContextListener extends ServletContextListener { override def contextInitialized(servletContextEvent: ServletContextEvent): Unit = { QepReceiver.start() } override def contextDestroyed(servletContextEvent: ServletContextEvent): Unit = { QepReceiver.stop() QepQueryDbChangeNotifier.scheduler.shutdown() } } case class UnexpectedMessageContentsTypeException(envelope: Envelope, queue: Queue) extends Exception(s"Could not interpret message with contents type of ${envelope.contentsType} from queue ${queue.name} from shrine version ${envelope.shrineVersion}") case class ExceptionWhileReceivingMessage(queue:Queue, x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override def summary: String = s"The QEP encountered an exception while trying to receive a message from $queue" override def description: String = s"The QEP encountered an exception while trying to receive a message from $queue on ${Thread.currentThread().getName}: ${x.getMessage}" } case class QepReceiverCouldNotDecodeMessage(messageString:String,queue:Queue, x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override def summary: String = s"The QEP could not decode a message from $queue" override def description: String = s"""The QEP encountered an exception while trying to decode a message from $queue on ${Thread.currentThread().getName}: |${x.getMessage} |$messageString""".stripMargin } case class QepReceiverThreadEndedByThrowable(thread:Thread, x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override def summary: String = s"The Qep Receiver's thread stopped because of an uncaught exception." override def description: String = s"""The Qep Receiver's thread ${thread.getName} stopped because of an uncaught exception""" } \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala index 8e39fa0ed..8ec3292b7 100644 --- a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala +++ b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala @@ -1,218 +1,216 @@ package net.shrine.hornetqclient import java.util.UUID import akka.actor.ActorSystem import net.shrine.config.ConfigExtensions import net.shrine.hornetqmom.MessageContainer import net.shrine.log.Loggable import net.shrine.messagequeueservice.{CouldNotCompleteMomTaskButOKToRetryException, Message, MessageQueueService, Queue} import net.shrine.source.ConfigSource import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import spray.http.{HttpEntity, HttpMethods, HttpRequest, HttpResponse, StatusCodes} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.language.postfixOps import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * A simple HornetQMomWebClient that uses HornetQMomWebApi to createQueue, * deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * @author yifan * @since 8/10/17 */ object HornetQMomWebClient extends MessageQueueService with Loggable { // we need an ActorSystem to host our application in implicit val system: ActorSystem = ActorSystem("momServer", ConfigSource.config) val configPath = "shrine.messagequeue.blockingq" def webClientConfig = ConfigSource.config.getConfig("shrine.messagequeue.blockingq") //todo Yifan's work changes the name to webClientTimeOut val webClientTimeOut: Duration = webClientConfig.get("webClientTimeOutSecond", Duration(_)) // TODO in SHRINE-2167: Extract and share a SHRINE actor system // the service actor replies to incoming HttpRequests // implicit val serviceActor: ActorRef = startServiceActor() // def startActorSystem(): ActorSystem = try { // val actorSystem: ActorSystem = ActorSystem("momServer", ConfigSource.config) // info(s"Starting ActorSystem: ${actorSystem.name} for HornetQMomWebClient at time: ${actorSystem.startTime}") // actorSystem // } catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } // // def startServiceActor(): ActorRef = try { // // the service actor replies to incoming HttpRequests // val actor: ActorRef = system.actorOf(Props[HornetQMomWebClientServiceActor]) // info(s"Starting ServiceActor: ${actor.toString()} for HornetQMomWebClient") // actor // } // catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } val momUrl: String = webClientConfig.getString("serverUrl") override def createQueueIfAbsent(queueName: String): Try[Queue] = { val proposedQueue: Queue = Queue(queueName) val createQueueUrl = momUrl + s"/createQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, createQueueUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOut)) queue: Queue <- queueFromResponse(response,queueName) } yield queue } def queueFromResponse(response: HttpResponse,queueName:String):Try[Queue] = Try { if(response.status == StatusCodes.Created) { val queueString = response.entity.asString implicit val formats = Serialization.formats(NoTypeHints) read[Queue](queueString)(formats, manifest[Queue]) } else { if((response.status == StatusCodes.NotFound) || (response.status == StatusCodes.RequestTimeout)) throw new CouldNotCompleteMomTaskButOKToRetryException(s"create a queue named $queueName",Some(response.status),Some(response.entity.asString)) else throw new IllegalStateException(s"Response status is ${response.status}, not Created. Cannot make a queue from this response: ${response.entity.asString}") //todo more specific custom exception SHRINE-2213 } }.transform({ s => Success(s) },{throwable => throwable match { case NonFatal(x) => error(s"Unable to create a Queue from '${response.entity.asString}' due to exception",throwable) //todo probably want to wrap more information into a new Throwable here SHRINE-2213 case _ => } Failure(throwable) }) override def deleteQueue(queueName: String): Try[Unit] = { val proposedQueue: Queue = Queue(queueName) val deleteQueueUrl = momUrl + s"/deleteQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, deleteQueueUrl) Try(HttpClient.webApiCall(request, webClientTimeOut)) // StatusCodes.OK } override def queues: Try[Seq[Queue]] = { val getQueuesUrl = momUrl + s"/getQueues" val request: HttpRequest = HttpRequest(HttpMethods.GET, getQueuesUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOut)) allQueues: Seq[Queue] <- Try { val allQueues: String = response.entity.asString implicit val formats = Serialization.formats(NoTypeHints) read[Seq[Queue]](allQueues)(formats, manifest[Seq[Queue]]) } } yield allQueues } override def send(contents: String, to: Queue): Try[Unit] = { debug(s"send to $to '$contents'") val sendMessageUrl = momUrl + s"/sendMessage/${to.name}" val request: HttpRequest = HttpRequest( method = HttpMethods.PUT, uri = sendMessageUrl, entity = HttpEntity(contents) //todo set contents as XML or json SHRINE-2215 ) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOut)) } yield response } //todo test receiving no message SHRINE-2213 override def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { val seconds = timeout.toSeconds val receiveMessageUrl = momUrl + s"/receiveMessage/${from.name}?timeOutSeconds=$seconds" val request: HttpRequest = HttpRequest(HttpMethods.GET, receiveMessageUrl) for { //use the time to make the API call plus the timeout for the long poll response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOut + timeout)) messageResponse: Option[Message] <- messageOptionFromResponse(response,from) } yield messageResponse } def messageOptionFromResponse(response: HttpResponse,from:Queue):Try[Option[Message]] = Try { if(response.status == StatusCodes.NotFound) None else if (response.status == StatusCodes.RequestTimeout) { - //todo wait a bit before trying again - None - } - else if (response.status == StatusCodes.OK) Some { + throw new CouldNotCompleteMomTaskButOKToRetryException(s"receive a message from ${from.name}",Some(response.status),Some(response.entity.asString)) + } else if (response.status == StatusCodes.OK) Some { //todo move to top SHRINE-2216 val responseString = response.entity.asString MessageContainer.fromJson(responseString) } else if(response.status == StatusCodes.InternalServerError) { throw new CouldNotCompleteMomTaskButOKToRetryException(s"receive a message from ${from.name}",Some(response.status),Some(response.entity.asString)) } else { throw new IllegalStateException(s"Response status is ${response.status}, not OK or NotFound. Cannot make a Message from this response: ${response.entity.asString}") } }.transform({ s => val hornetQMessage = s.map(msg => HornetQClientMessage(UUID.fromString(msg.id), msg.contents)) Success(hornetQMessage) },{throwable => throwable match { case NonFatal(x) => error(s"Unable to create a Message from '${response.entity.asString}' due to exception",throwable) //todo probably want to report a Problem here SHRINE-2216 case _ => } Failure(throwable) }) case class HornetQClientMessage private(messageID: UUID, messageContent: String) extends Message { override def contents: String = messageContent override def complete(): Try[Unit] = { val entity: HttpEntity = HttpEntity(messageID.toString) val completeMessageUrl: String = s"$momUrl/acknowledge" val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl).withEntity(entity) for { response: HttpResponse <- Try(HttpClient.webApiCall(request)).transform({r => info(s"Message ${this.messageID} completed with ${r.status}") Success(r) }, { throwable => debug(s"Message ${this.messageID} failed in its complete process due to ${throwable.getMessage}") Failure(throwable) }) } yield response } } } // TODO in SHRINE-2167: Extract and share a SHRINE actor system //class HornetQMomWebClientServiceActor extends Actor with MetaDataService { // // // the HttpService trait defines only one abstract member, which // // connects the services environment to the enclosing actor or test // def actorRefFactory: ActorRefFactory = context // // // this actor only runs our route, but you could add // // other things here, like request stream processing // // or timeout handling // def receive: Receive = runRoute(route) // // override implicit val ec: ExecutionContext = ExecutionContext.Implicits.global // // override def system: ActorSystem = context.system //} \ No newline at end of file