diff --git a/messagequeue/hornetqclient/src/main/resources/reference.conf b/messagequeue/hornetqclient/src/main/resources/reference.conf index 1d6ba53d4..1147a8d80 100644 --- a/messagequeue/hornetqclient/src/main/resources/reference.conf +++ b/messagequeue/hornetqclient/src/main/resources/reference.conf @@ -1,24 +1,24 @@ shrine { messagequeue { hornetq { serverUrl = "https://localhost:6443/shrine-metadata/mom" webClientTimeOutSecond = 10 seconds } httpClient { - defaultTimeOutSecond = 10 seconds + defaultTimeOut = 10 seconds timeOutWaitGap = 1 second } } } //todo typesafe config precedence seems to do the right thing, but I haven't found the rules that say this reference.conf should override others // todo go with the actor system, also check other apps, possiblly they use it too akka { loglevel = INFO // log-config-on-start = on loggers = ["akka.event.slf4j.Slf4jLogger"] // logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" // Toggles whether the threads created by this ActorSystem should be daemons or not daemonic = on } \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala index 4495da2c1..4d6e97f03 100644 --- a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala +++ b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HornetQMomWebClient.scala @@ -1,197 +1,207 @@ package net.shrine.hornetqclient +import java.util.UUID import akka.actor.ActorSystem import net.shrine.config.ConfigExtensions +import net.shrine.hornetqmom.MessageContainer import net.shrine.log.Loggable -import net.shrine.messagequeueservice.{Message, MessageQueueService, MessageSerializer, Queue, QueueSerializer} +import net.shrine.messagequeueservice.{Message, MessageQueueService, Queue} import net.shrine.source.ConfigSource +import org.json4s.NoTypeHints import org.json4s.native.Serialization -import org.json4s.native.Serialization.{read, write} -import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.Serialization.read import spray.http.{HttpEntity, HttpMethods, HttpRequest, HttpResponse, StatusCode, StatusCodes} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.language.postfixOps import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * A simple HornetQMomWebClient that uses HornetQMomWebApi to createQueue, * deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * @author yifan * @since 8/10/17 */ object HornetQMomWebClient extends MessageQueueService with Loggable { // we need an ActorSystem to host our application in implicit val system: ActorSystem = ActorSystem("momServer", ConfigSource.config) val webClientTimeOutSecond: Duration = ConfigSource.config.get("shrine.messagequeue.hornetq.webClientTimeOutSecond", Duration(_)) // TODO in SHRINE-2167: Extract and share a SHRINE actor system // the service actor replies to incoming HttpRequests // implicit val serviceActor: ActorRef = startServiceActor() // def startActorSystem(): ActorSystem = try { // val actorSystem: ActorSystem = ActorSystem("momServer", ConfigSource.config) // info(s"Starting ActorSystem: ${actorSystem.name} for HornetQMomWebClient at time: ${actorSystem.startTime}") // actorSystem // } catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ActorSystem for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } // // def startServiceActor(): ActorRef = try { // // the service actor replies to incoming HttpRequests // val actor: ActorRef = system.actorOf(Props[HornetQMomWebClientServiceActor]) // info(s"Starting ServiceActor: ${actor.toString()} for HornetQMomWebClient") // actor // } // catch { // case NonFatal(x) => { // debug(s"NonFatalException thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // case x: ExceptionInInitializerError => { // debug(s"ExceptionInInitializerError thrown while starting ServiceActor for HornetQMomWebClient: ${x.getMessage}") // throw x // } // } val momUrl: String = ConfigSource.config.getString("shrine.messagequeue.hornetq.serverUrl") override def createQueueIfAbsent(queueName: String): Try[Queue] = { val proposedQueue: Queue = Queue(queueName) val createQueueUrl = momUrl + s"/createQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, createQueueUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOutSecond)) queue: Queue <- queueFromResponse(response) } yield queue } def queueFromResponse(response: HttpResponse):Try[Queue] = Try { if(response.status == StatusCodes.Created) { val queueString = response.entity.asString - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + implicit val formats = Serialization.formats(NoTypeHints) read[Queue](queueString)(formats, manifest[Queue]) } else { if((response.status == StatusCodes.NotFound) || (response.status == StatusCodes.RequestTimeout))throw new CouldNotCreateQueueButOKToRetryException(response.status,response.entity.asString) else throw new IllegalStateException(s"Response status is ${response.status}, not Created. Cannot make a queue from this response: ${response.entity.asString}") //todo more specific custom exception SHRINE-2213 } }.transform({ s => Success(s) },{throwable => throwable match { case NonFatal(x) => error(s"Unable to create a Queue from '${response.entity.asString}' due to exception",throwable) //todo probably want to wrap more information into a new Throwable here SHRINE-2213 case _ => } Failure(throwable) }) override def deleteQueue(queueName: String): Try[Unit] = { val proposedQueue: Queue = Queue(queueName) val deleteQueueUrl = momUrl + s"/deleteQueue/${proposedQueue.name}" val request: HttpRequest = HttpRequest(HttpMethods.PUT, deleteQueueUrl) Try(HttpClient.webApiCall(request, webClientTimeOutSecond)) // StatusCodes.OK } override def queues: Try[Seq[Queue]] = { val getQueuesUrl = momUrl + s"/getQueues" val request: HttpRequest = HttpRequest(HttpMethods.GET, getQueuesUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOutSecond)) allQueues: Seq[Queue] <- Try { val allQueues: String = response.entity.asString - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + implicit val formats = Serialization.formats(NoTypeHints) read[Seq[Queue]](allQueues)(formats, manifest[Seq[Queue]]) } } yield allQueues } override def send(contents: String, to: Queue): Try[Unit] = { debug(s"send to $to '$contents'") val sendMessageUrl = momUrl + s"/sendMessage/${to.name}" val request: HttpRequest = HttpRequest( method = HttpMethods.PUT, uri = sendMessageUrl, entity = HttpEntity(contents) //todo set contents as XML or json SHRINE-2215 ) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOutSecond)) } yield response } //todo test receiving no message SHRINE-2213 override def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { val seconds = timeout.toSeconds val receiveMessageUrl = momUrl + s"/receiveMessage/${from.name}?timeOutSeconds=$seconds" val request: HttpRequest = HttpRequest(HttpMethods.GET, receiveMessageUrl) for { response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOutSecond)) messageResponse: Option[Message] <- messageOptionFromResponse(response) } yield messageResponse } def messageOptionFromResponse(response: HttpResponse):Try[Option[Message]] = Try { if(response.status == StatusCodes.NotFound) None else if (response.status == StatusCodes.OK) Some { val responseString = response.entity.asString - val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - read[Message](responseString)(formats, manifest[Message]) + MessageContainer.fromJson(responseString) } else { throw new IllegalStateException(s"Response status is ${response.status}, not OK or NotFound. Cannot make a Message from this response: ${response.entity.asString}") } }.transform({ s => - Success(s) + val hornetQMessage = s.map(msg => HornetQClientMessage(UUID.fromString(msg.id), msg.contents)) + Success(hornetQMessage) },{throwable => throwable match { case NonFatal(x) => error(s"Unable to create a Message from '${response.entity.asString}' due to exception",throwable) //todo probably want to report a Problem here SHRINE-2216 case _ => } Failure(throwable) }) - - override def completeMessage(message: Message): Try[Unit] = { - implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer - val messageString: String = write[Message](message)(formats) - - val entity: HttpEntity = HttpEntity(messageString) - val completeMessageUrl: String = momUrl + s"/acknowledge" // HttpEntity - val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl).withEntity(entity) - for { - response: HttpResponse <- Try(HttpClient.webApiCall(request, webClientTimeOutSecond)) - } yield response + case class HornetQClientMessage private(messageID: UUID, messageContent: String) extends Message { + + override def contents: String = messageContent + + override def complete(): Try[Unit] = { + val entity: HttpEntity = HttpEntity(messageID.toString) + val completeMessageUrl: String = s"$momUrl/acknowledge" + val request: HttpRequest = HttpRequest(HttpMethods.PUT, completeMessageUrl).withEntity(entity) + for { + response: HttpResponse <- Try(HttpClient.webApiCall(request)).transform({r => + info(s"Message ${this.messageID} completed with ${r.status}") + Success(r) + }, { throwable => + debug(s"Message ${this.messageID} failed in its complete process due to ${throwable.getMessage}") + Failure(throwable) + }) + } yield response + } } + } case class CouldNotCreateQueueButOKToRetryException(status:StatusCode,contents:String) extends Exception(s"Could not create a queue due to status code $status with message '$contents'") // TODO in SHRINE-2167: Extract and share a SHRINE actor system //class HornetQMomWebClientServiceActor extends Actor with MetaDataService { // // // the HttpService trait defines only one abstract member, which // // connects the services environment to the enclosing actor or test // def actorRefFactory: ActorRefFactory = context // // // this actor only runs our route, but you could add // // other things here, like request stream processing // // or timeout handling // def receive: Receive = runRoute(route) // // override implicit val ec: ExecutionContext = ExecutionContext.Implicits.global // // override def system: ActorSystem = context.system //} \ No newline at end of file diff --git a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HttpClient.scala b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HttpClient.scala index 0ffa574ac..a9c0d5823 100644 --- a/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HttpClient.scala +++ b/messagequeue/hornetqclient/src/main/scala/net/shrine/hornetqclient/HttpClient.scala @@ -1,102 +1,102 @@ package net.shrine.hornetqclient import java.security.cert.X509Certificate import javax.net.ssl.{SSLContext, X509TrustManager} import akka.actor.{ActorRef, ActorSystem} import akka.io.IO import akka.pattern.ask import net.shrine.log.Loggable import net.shrine.source.ConfigSource import net.shrine.config.ConfigExtensions import spray.can.Http import spray.can.Http.{ConnectionAttemptFailedException, HostConnectorSetup} import spray.http.{HttpEntity, HttpRequest, HttpResponse, StatusCodes} import spray.io.ClientSSLEngineProvider import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration.{Duration, DurationInt, DurationLong} import scala.concurrent.{Await, Future, TimeoutException} import scala.language.postfixOps import scala.util.control.NonFatal /** * A simple HttpClient to use inside the HttpDirectives */ object HttpClient extends Loggable { //todo hand back a Try, Failures with custom exceptions instead of a crappy response //todo Really a Future would be even better def webApiCall(request:HttpRequest, - timeout:Duration = ConfigSource.config.get("shrine.messagequeue.httpClient.defaultTimeOutSecond", Duration(_))) + timeout:Duration = ConfigSource.config.get("shrine.messagequeue.httpClient.defaultTimeOut", Duration(_))) (implicit system: ActorSystem): HttpResponse = { val deadline = System.currentTimeMillis() + timeout.toMillis val transport: ActorRef = IO(Http)(system) debug(s"Requesting $request uri is ${request.uri} path is ${request.uri.path}") val future: Future[HttpResponse] = for { Http.HostConnectorInfo(connector, _) <- transport.ask(createConnector(request))(deadline - System.currentTimeMillis() milliseconds) response <- connector.ask(request)(deadline - System.currentTimeMillis() milliseconds).mapTo[HttpResponse] } yield response try { //wait a second longer than the deadline before timing out the Await, to let the actors timeout val timeOutWaitGap = ConfigSource.config.get("shrine.messagequeue.httpClient.timeOutWaitGap", Duration(_)).toMillis Await.result(future, deadline + timeOutWaitGap - System.currentTimeMillis() milliseconds) } catch { //todo definitely need the Try instead of this sloppy replacement of the HttpResponse. case x: TimeoutException => { debug(s"${request.uri} failed with ${x.getMessage}", x) HttpResponse(status = StatusCodes.RequestTimeout, entity = HttpEntity(s"${request.uri} timed out after ${timeout}. ${x.getMessage}")) } //todo is there a better message? What comes up in real life? case x: ConnectionAttemptFailedException => { //no web service is there to respond debug(s"${request.uri} failed with ${x.getMessage}", x) HttpResponse(status = StatusCodes.NotFound, entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}")) } case NonFatal(x) => { debug(s"${request.uri} failed with ${x.getMessage}", x) HttpResponse(status = StatusCodes.InternalServerError, entity = HttpEntity(s"${request.uri} failed with ${x.getMessage}")) } } } //from https://github.com/TimothyKlim/spray-ssl-poc/blob/master/src/main/scala/Main.scala //trust all SSL contexts. We just want encrypted comms. implicit val trustfulSslContext: SSLContext = { class IgnoreX509TrustManager extends X509TrustManager { def checkClientTrusted(chain: Array[X509Certificate], authType: String) {} def checkServerTrusted(chain: Array[X509Certificate], authType: String) {} def getAcceptedIssuers = null } val context = SSLContext.getInstance("TLS") context.init(null, Array(new IgnoreX509TrustManager), null) info("trustfulSslContex initialized") context } implicit val clientSSLEngineProvider = //todo lookup this constructor ClientSSLEngineProvider { _ => val engine = trustfulSslContext.createSSLEngine() engine.setUseClientMode(true) engine } def createConnector(request: HttpRequest) = { val connector = new HostConnectorSetup(host = request.uri.authority.host.toString, port = request.uri.effectivePort, sslEncryption = request.uri.scheme == "https", defaultHeaders = request.headers) connector } } diff --git a/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala b/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala index c8c46d198..bb1bd15fd 100644 --- a/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala +++ b/messagequeue/hornetqclient/src/test/hornetQMomClientTest/HornetQMomWebClientTest.scala @@ -1,28 +1,28 @@ /** * A simple scala script that test HornetQMomWedClient * * Load the file in scala REPL to test HornetQMomWebClient * * Created by yifan on 8/14/17. */ // To Run the test, simply start the scala REPL and load the file // Enter the following commands using the command line // #1. Start scala REPL with 2.11.8 version // mvn -Dscala-version="2.11.8" scala:console // #2. Load file in REPL: // :load import net.shrine.hornetqclient.HornetQMomWebClient import net.shrine.messagequeueservice.{Message, Queue} import scala.collection.immutable.Seq val firstQueue: Queue = HornetQMomWebClient.createQueueIfAbsent("firstQueue").get HornetQMomWebClient.send("firstMessage", firstQueue) import scala.concurrent.duration.Duration val firstDuration: Duration = Duration.create(1, "seconds") val receivedMsg: Option[Message] = HornetQMomWebClient.receive(firstQueue, firstDuration).get val msg: Message = receivedMsg.get val allQueues: Seq[Queue] = HornetQMomWebClient.queues.get -HornetQMomWebClient.completeMessage(msg) +msg.complete() HornetQMomWebClient.deleteQueue("q1") \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/resources/reference.conf b/messagequeue/hornetqmom/src/main/resources/reference.conf index f6c2ac407..216a2d455 100644 --- a/messagequeue/hornetqmom/src/main/resources/reference.conf +++ b/messagequeue/hornetqmom/src/main/resources/reference.conf @@ -1,9 +1,10 @@ shrine { messagequeue { hornetq { } hornetQWebApi { enabled = false + messageTimeOutSeconds = 10 seconds } } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index fd7805849..98840ca69 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,197 +1,320 @@ package net.shrine.hornetqmom -import net.shrine.log.Loggable -import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} +import java.util.UUID + +import net.shrine.config.ConfigExtensions +import net.shrine.hornetqmom.LocalHornetQMom.LocalHornetQMessage +import net.shrine.log.{Log, Loggable} +import net.shrine.messagequeueservice.{Message, Queue} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource import org.json4s.native.Serialization -import org.json4s.native.Serialization.{read, write} -import org.json4s.{Formats, NoTypeHints} +import org.json4s.native.Serialization.write +import org.json4s.{NoTypeHints, ShortTypeHints} import spray.http.StatusCodes import spray.routing.{HttpService, Route} +import scala.collection.concurrent.TrieMap import scala.collection.immutable.Seq import scala.concurrent.duration.Duration +import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { def enabled: Boolean = ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled") val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" + // keep a map of messages and ids + private val idToMessages: TrieMap[UUID, (Message, Long)] = TrieMap.empty + + case class MapSentinelRunner(timeOutInMillis: Long) extends Runnable { + // watches the map + override def run(): Unit = { + val currentTimeInMillis = System.currentTimeMillis() + try { + Log.debug("About to clean up outstanding messages.") + idToMessages.retain({ (uuid, localHornetQMessageAndCreatedTime) => + (currentTimeInMillis - localHornetQMessageAndCreatedTime._2) < timeOutInMillis + }) + Log.debug(s"Outstanding messages that exceed $timeOutInMillis milliseconds have been cleaned from the map.") + } catch { + case NonFatal(x) => ExceptionWhileCleaningUpMessageProblem(timeOutInMillis, x) + //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. + case x => Log.error("Fatal exception while cleaning up outstanding messages", x) + throw x + } + } + } + def momRoute: Route = pathPrefix("mom") { if (!enabled) { val configProblem: CannotUseHornetQMomWebApiProblem = CannotUseHornetQMomWebApiProblem(new UnsupportedOperationException) warn(s"HornetQMomWebApi is not available to use due to configProblem ${configProblem.description}!") respondWithStatus(StatusCodes.NotFound) { complete(warningMessage) } } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) createdQueueTry match { case Success(queue) => { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + implicit val formats = Serialization.formats(NoTypeHints) val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString debug(s"sendMessage to $toQueue '$messageContent'") detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { - case Success(optMessage) => { - optMessage.fold(complete(StatusCodes.NotFound)){msg => - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - val messageJson = write(msg) - complete(messageJson) + case Success(optionMessage) => { + optionMessage.fold(complete(StatusCodes.NotFound)){localHornetQMessage => + // add message in the map with an unique UUID + val msgID = UUID.randomUUID() + scheduleCleanupMessageMap(msgID, localHornetQMessage) + complete(MessageContainer(msgID.toString, localHornetQMessage.contents).toJson) } } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } + private def scheduleCleanupMessageMap(msgID: UUID, localHornetQMessage: Message) = { + import java.util.concurrent.Executors + import java.util.concurrent.ScheduledExecutorService + import java.util.concurrent.TimeUnit + + idToMessages.update(msgID, (localHornetQMessage, System.currentTimeMillis())) + // a sentinel that monitors the hashmap of idToMessages, any message that has been outstanding for more than 3X or 10X + // time-to-live need to get cleaned out of this map + val messageTimeOutInMillis: Long = ConfigSource.config.get("shrine.messagequeue.hornetQWebApi.messageTimeOutSeconds", Duration(_)).toMillis + val sentinelRunner: MapSentinelRunner = MapSentinelRunner(messageTimeOutInMillis) + try { + Log.debug(s"Starting the sentinel scheduler that cleans outstanding messages exceeds 3 times $messageTimeOutInMillis") + val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1) + scheduler.schedule(sentinelRunner, messageTimeOutInMillis * 3, TimeUnit.MILLISECONDS) + } catch { + case NonFatal(x) => ExceptionWhileSchedulingSentinelProblem(messageTimeOutInMillis, x) + //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. + case x => Log.error("Fatal exception while scheduling a sentinel for cleaning up outstanding messages", x) + throw x + } + } + // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { - entity(as[String]) { messageJSON => - implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer + entity(as[String]) { messageUUID => detach() { - val msg: Message = read[Message](messageJSON)(formats, manifest[Message]) - val acknowledgeTry: Try[Unit] = LocalHornetQMom.completeMessage(msg) - acknowledgeTry match { - case Success(v) => { - complete(StatusCodes.ResetContent) + val id: UUID = UUID.fromString(messageUUID) + // retrieve the localMessage from the concurrent hashmap + val getMessageTry: Try[Option[(Message, Long)]] = Try { + idToMessages.remove(id) + }.transform({ messageAndTime => + Success(messageAndTime) + }, { throwable => + Failure(MessageDoesNotExistException(id)) + }) + + getMessageTry match { + case Success(messageAndTimeOption) => { + messageAndTimeOption.fold({ + respondWithStatus(StatusCodes.NotFound) { + val noMessageProblem = MessageDoesNotExistInMapProblem(id) + complete(noMessageProblem.description) + } + }) { messageAndTime => + messageAndTime._1.complete() + complete(StatusCodes.ResetContent) + } } case Failure(x) => { - internalServerErrorOccured(x, "acknowledge") + x match { + case m: MessageDoesNotExistException => { + respondWithStatus(StatusCodes.NotFound) { + complete(m.getMessage) + } + } + case _ => internalServerErrorOccured(x, "acknowledge") + } } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer + implicit val formats = Serialization.formats(NoTypeHints) respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" + s"Exception is from ${x.getClass}") } } } + +case class MessageContainer(id: String, contents: String) { + def toJson: String = { + Serialization.write(this)(MessageContainer.messageFormats) + } +} + +object MessageContainer { + val messageFormats = Serialization.formats(ShortTypeHints(List(classOf[MessageContainer]))) + + def fromJson(jsonString: String): MessageContainer = { + implicit val formats = messageFormats + Serialization.read[MessageContainer](jsonString) + } +} + + case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf." override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" } + +case class MessageDoesNotExistException(id: UUID) extends Exception(s"Cannot match given ${id.toString} to any Message in HornetQ server! Message does not exist!") + +case class MessageDoesNotExistInMapProblem(id: UUID) extends AbstractProblem(ProblemSources.Hub) { + + override def summary: String = s"The client expected message $id, but the server did not find it and could not complete() the message." + + override def description: String = s"The client expected message $id, but the server did not find it and could not complete() the message." + + s" Message either has never been received or already been completed!" +} + +case class ExceptionWhileCleaningUpMessageProblem(timeOutInMillis: Long, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { + + override val throwable = Some(x) + + override def summary: String = s"The Hub encountered an exception while trying to " + + s"cleanup messages that has been outstanding for more than $timeOutInMillis milliseconds" + + override def description: String = s"The Hub encountered an exception while trying to " + + s"cleanup messages that has been received for more than $timeOutInMillis milliseconds " + + s"on Thread ${Thread.currentThread().getName}: ${x.getMessage}" +} + +case class ExceptionWhileSchedulingSentinelProblem(timeOutInMillis: Long, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { + override val throwable = Some(x) + + override def summary: String = s"The Hub encountered an exception while trying to " + + s"schedule a sentinel that cleans up outstanding messages exceed $timeOutInMillis milliseconds" + + override def description: String = s"The Hub encountered an exception while trying to " + + s"schedule a sentinel that cleans up outstanding messages exceed $timeOutInMillis milliseconds " + + s"on Thread ${Thread.currentThread().getName}: ${x.getMessage}" +} \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala index 6e9a2160e..0fcbfab50 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/LocalHornetQMom.scala @@ -1,185 +1,197 @@ package net.shrine.hornetqmom import com.typesafe.config.Config import net.shrine.log.Log import net.shrine.messagequeueservice.{Message, MessageQueueService, NoSuchQueueExistsInHornetQ, Queue} import net.shrine.source.ConfigSource import org.hornetq.api.core.{HornetQQueueExistsException, TransportConfiguration} import org.hornetq.api.core.client.{ClientConsumer, ClientMessage, ClientProducer, ClientSession, ClientSessionFactory, HornetQClient, ServerLocator} import org.hornetq.api.core.management.HornetQServerControl import org.hornetq.core.config.impl.ConfigurationImpl import org.hornetq.core.remoting.impl.invm.{InVMAcceptorFactory, InVMConnectorFactory} import org.hornetq.core.server.{HornetQServer, HornetQServers} import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} import scala.collection.immutable.Seq import scala.concurrent.blocking import scala.concurrent.duration.Duration import scala.util.Try /** * This object is the local version of the Message-Oriented Middleware API, which uses HornetQ service * * @author david * @since 7/18/17 */ object LocalHornetQMom extends MessageQueueService { val config: Config = ConfigSource.config.getConfig("shrine.messagequeue.hornetq") // todo use the config to set everything needed here that isn't hard-coded. val hornetQConfiguration = new ConfigurationImpl() // todo from config? What is the journal file about? If temporary, use a Java temp file. hornetQConfiguration.setJournalDirectory("target/data/journal") // todo want this. There are likely many other config bits hornetQConfiguration.setPersistenceEnabled(false) // todo maybe want this hornetQConfiguration.setSecurityEnabled(false) // todo probably just want the InVM version, but need to read up on options hornetQConfiguration.getAcceptorConfigurations.add(new TransportConfiguration(classOf[InVMAcceptorFactory].getName)) // Create and start the server val hornetQServer: HornetQServer = HornetQServers.newHornetQServer(hornetQConfiguration) hornetQServer.start() val serverLocator: ServerLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(classOf[InVMConnectorFactory].getName)) val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory() //arguments are boolean xa, boolean autoCommitSends, boolean autoCommitAcks . val session: ClientSession = sessionFactory.createSession(false, true, true) session.start() //keep a map of live queues to ClientConsumers to provide a path for completing messages val queuesToConsumers: ConcurrentMap[Queue, ClientConsumer] = TrieMap.empty - val propName = "contents" - /** * Use HornetQMomStopper to stop the hornetQServer without unintentially starting it */ // todo drop this into a try private[hornetqmom] def stop() = { queuesToConsumers.values.foreach(_.close()) session.close() sessionFactory.close() hornetQServer.stop() } //queue lifecycle def createQueueIfAbsent(queueName: String): Try[Queue] = { val unit = () val proposedQueue: Queue = Queue(queueName) for { serverControl: HornetQServerControl <- Try{ hornetQServer.getHornetQServerControl } queuesSoFar <- queues queueToUse <- Try { queuesSoFar.find(_.name == proposedQueue.name).fold{ try { serverControl.createQueue(proposedQueue.name, proposedQueue.name, true) } catch { case hqqex:HornetQQueueExistsException => Log.debug(s"Caught and ignored a HornetQQueueExistsException in createQueueIfAbsent.",hqqex) } proposedQueue }{ queue => queue} } consumer <- Try { queuesToConsumers.getOrElseUpdate(proposedQueue, { session.createConsumer(proposedQueue.name) }) } } yield queueToUse } def deleteQueue(queueName: String): Try[Unit] = { val proposedQueue: Queue = Queue(queueName) for { deleteTry <- Try { queuesToConsumers.remove(proposedQueue).foreach(_.close()) val serverControl: HornetQServerControl = hornetQServer.getHornetQServerControl serverControl.destroyQueue(proposedQueue.name) } } yield deleteTry } override def queues: Try[Seq[Queue]] = { for { hornetQTry: HornetQServerControl <- Try { hornetQServer.getHornetQServerControl } getQueuesTry: Seq[Queue] <- Try { val queueNames: Array[String] = hornetQTry.getQueueNames queueNames.map(Queue(_)).to[Seq] } } yield getQueuesTry } //send a message def send(contents: String, to: Queue): Try[Unit] = { for { sendTry <- Try { // check if the queue exists first if (!this.queues.get.map(_.name).contains(to.name)) { throw NoSuchQueueExistsInHornetQ(to) } } producer: ClientProducer <- Try{ session.createProducer(to.name) } - message <- Try{ session.createMessage(true).putStringProperty(propName, contents) } + message <- Try{ + session.createMessage(true).putStringProperty(Message.contentsKey, contents) + } sendMessage <- Try { producer.send(message) Log.debug(s"Message $message sent to $to in HornetQ") producer.close() } } yield sendMessage } //receive a message /** * Always do AWS SQS-style long polling. * Be sure your code can handle receiving the same message twice. * * @return Some message before the timeout, or None */ def receive(from: Queue, timeout: Duration): Try[Option[Message]] = { for { //todo handle the case where either stop or close has been called on something gracefully messageConsumer: ClientConsumer <- Try { if (!queuesToConsumers.contains(from)) { throw new NoSuchElementException(s"Given Queue ${from.name} does not exist in HornetQ server! Please create the queue first!") } queuesToConsumers(from) } - message: Option[Message] <- Try { + + message: Option[LocalHornetQMessage] <- Try { blocking { val messageReceived: Option[ClientMessage] = Option(messageConsumer.receive(timeout.toMillis)) - messageReceived.foreach(m => Log.debug(s"Received ${m} from $from in HornetQ")) - messageReceived.map(Message(_)) + messageReceived.foreach(m => Log.debug(s"Received $m from $from in HornetQ")) + messageReceived.map(clientMsg => LocalHornetQMessage(clientMsg)) } } } yield message } - //todo dead letter queue for all messages. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs-dead-letter-queues.html - - //complete a message - override def completeMessage(message: Message): Try[Unit] = { + def getQueueConsumer(queue: Queue): Try[ClientConsumer] = { for { - completeMessageTry <- Try { message.complete() } - } yield completeMessageTry + messageConsumer: ClientConsumer <- Try { + if (!queuesToConsumers.contains(queue)) { + throw new NoSuchElementException(s"Given Queue ${queue.name} does not exist in HornetQ server! Please create the queue first!") + } + queuesToConsumers(queue) + } + } yield messageConsumer } + //todo dead letter queue for all messages SHRINE-2261 + + case class LocalHornetQMessage private(clientMessage: ClientMessage) extends Message { + + override def contents: String = clientMessage.getStringProperty(Message.contentsKey) + + //complete a message + override def complete(): Try[Unit] = Try { clientMessage.acknowledge() } + } } /** * If the configuration is such that HornetQ should have been started use this object to stop it */ object LocalHornetQMomStopper { def stop(): Unit = { if(ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled")) { LocalHornetQMom.stop() } } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiConfigTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiConfigTest.scala new file mode 100644 index 000000000..f9e930750 --- /dev/null +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiConfigTest.scala @@ -0,0 +1,31 @@ +package net.shrine.hornetqmom + +import akka.actor.ActorRefFactory +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner +import spray.http.StatusCodes.NotFound +import spray.testkit.ScalatestRouteTest + +/** + * Test to make sure hornetQMomWebApi is only available when correctly configured + * Created by yifan on 9/8/17. + */ + + +@RunWith(classOf[JUnitRunner]) +class HornetQMomWebApiConfigTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { + override def actorRefFactory: ActorRefFactory = system + + private val queueName = "testQueue" + + "HornetQMomWebApi" should "block user from using the API and return a 404 response" in { + + Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + + assertResult(warningMessage)(response) + assertResult(NotFound)(status) + } + } +} \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala index e38cc2fe4..87de86273 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala @@ -1,186 +1,114 @@ package net.shrine.hornetqmom - +import java.util.UUID import akka.actor.ActorRefFactory -import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} +import net.shrine.messagequeueservice.Queue import net.shrine.source.ConfigSource import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.http.HttpEntity import spray.http.StatusCodes._ import spray.testkit.ScalatestRouteTest import scala.collection.immutable.Seq /** + * Test basic functions of HornetQMomWebApi * Created by yifan on 7/27/17. */ @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system private val proposedQueueName = "test Queue" private val queue: Queue = Queue(proposedQueueName) private val queueName: String = queue.name // "testQueue" private val messageContent = "test Content" "HornetQMomWebApi" should "create/delete the given queue, send/receive message, get queues" in { - ConfigSource.atomicConfig.configForBlock("shrine.messagequeue.hornetQWebApi.enabled","true","HornetQMomWebApiTest") { + ConfigSource.atomicConfig.configForBlock("shrine.messagequeue.hornetQWebApi.enabled", "true", "HornetQMomWebApiTest") { Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer - println(response) - val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) - val responseQueueName = jsonToQueue.name - assertResult(Created)(status) - assertResult(queueName)(responseQueueName) - } + implicit val formats = Serialization.formats(NoTypeHints) + val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) + val responseQueueName = jsonToQueue.name + assertResult(Created)(status) + assertResult(queueName)(responseQueueName) } // should be OK to create a queue twice Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer - val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) - val responseQueueName = jsonToQueue.name - assertResult(Created)(status) - assertResult(queueName)(responseQueueName) - } + implicit val formats = Serialization.formats(NoTypeHints) + val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) + val responseQueueName = jsonToQueue.name + assertResult(Created)(status) + assertResult(queueName)(responseQueueName) } Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(Accepted)(status) - } + assertResult(Accepted)(status) } Get(s"/mom/getQueues") ~> momRoute ~> check { val response: String = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer - val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]]) - - assertResult(OK)(status) - assertResult(queueName)(jsonToSeq.head.name) - } - } + implicit val formats = Serialization.formats(NoTypeHints) + val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response)(formats, manifest[Seq[Queue]]) - var receivedMessage: String = "" + assertResult(OK)(status) + assertResult(queueName)(jsonToSeq.head.name) + } + var messageUUID: String = "" // given timeout is 2 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { val response = new String(body.data.toByteArray) - receivedMessage = response - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - val responseToMessage: Message = read[Message](response)(formats, manifest[Message]) - - assertResult(OK)(status) - assert(responseToMessage.isInstanceOf[Message]) - assertResult(messageContent)(responseToMessage.contents) - } + assertResult(OK)(status) + val responseMsg = MessageContainer.fromJson(response) + messageUUID = responseMsg.id + assertResult(responseMsg.contents)(messageContent) + } + + Put("/mom/acknowledge", HttpEntity(s"$messageUUID")) ~> momRoute ~> check { + assertResult(ResetContent)(status) } - Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~> momRoute ~> check { + val nonExistingUUID = UUID.randomUUID() + Put("/mom/acknowledge", HttpEntity(s"$nonExistingUUID")) ~> momRoute ~> check { val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer - assertResult(ResetContent)(status) - } + assertResult(NotFound)(status) + assertResult(MessageDoesNotExistInMapProblem(nonExistingUUID).description)(response) } Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(OK)(status) - } + assertResult(OK)(status) } } } - "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " + - "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in { + "HornetQMomWebApi" should "respond InternalServerError with the corresponding error message when " + + "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in { + + ConfigSource.atomicConfig.configForBlock("shrine.messagequeue.hornetQWebApi.enabled", "true", "HornetQMomWebApiTest") { Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(InternalServerError)(status) - } + assertResult(InternalServerError)(status) } Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(InternalServerError)(status) - } + assertResult(InternalServerError)(status) } // given timeout is 1 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - if (!enabled) { - assertResult(NotFound)(status) - assertResult(warningMessage)(response) - } else { - assertResult(InternalServerError)(status) - } + assertResult(InternalServerError)(status) } } - -} - -@RunWith(classOf[JUnitRunner]) -class HornetQMomWebApiConfigTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { - override def actorRefFactory: ActorRefFactory = system - - private val queueName = "testQueue" - - - override val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApiTest.enabled").toBoolean - - "HornetQMomWebApi" should "block user from using the API and return a 404 response" in { - - Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { - val response = new String(body.data.toByteArray) - - assertResult(warningMessage)(response) - assertResult(NotFound)(status) - } } } \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala index 5834fdfe6..bdfeeb62b 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/LocalHornetQMomTest.scala @@ -1,156 +1,158 @@ package net.shrine.hornetqmom import net.shrine.messagequeueservice.{Message, Queue} import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import scala.collection.immutable.Seq import scala.concurrent.duration._ import scala.language.postfixOps +import scala.util.Try /** * Test create, delete queue, send, and receive message, getQueueNames, and acknoledge using HornetQ service */ @RunWith(classOf[JUnitRunner]) class LocalHornetQMomTest extends FlatSpec with BeforeAndAfterAll with ScalaFutures with Matchers { "HornetQ" should "be able to send and receive just one message" in { val queueName = "testQueue" assert(LocalHornetQMom.queues.get.isEmpty) val queue = LocalHornetQMom.createQueueIfAbsent(queueName).get assert(LocalHornetQMom.queues.get == Seq(queue)) val testContents = "Test message" val sendTry = LocalHornetQMom.send(testContents, queue) assert(sendTry.isSuccess) val message: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message.isDefined) assert(message.get.contents == testContents) - LocalHornetQMom.completeMessage(message.get) + val completeTry: Try[Unit] = message.get.complete() + assert(completeTry.isSuccess) val shouldBeNoMessage: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(shouldBeNoMessage.isEmpty) val deleteTry = LocalHornetQMom.deleteQueue(queueName) assert(deleteTry.isSuccess) assert(LocalHornetQMom.queues.get.isEmpty) } "HornetQ" should "be able to send and receive a few messages" in { val queueName = "testQueue" assert(LocalHornetQMom.queues.get.isEmpty) val queue = LocalHornetQMom.createQueueIfAbsent(queueName).get assert(LocalHornetQMom.queues.get == Seq(queue)) val testContents1 = "Test message1" val sendTry = LocalHornetQMom.send(testContents1,queue) assert(sendTry.isSuccess) val message1: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message1.isDefined) assert(message1.get.contents == testContents1) - val completeTry = LocalHornetQMom.completeMessage(message1.get) + val completeTry = message1.get.complete() assert(completeTry.isSuccess) val shouldBeNoMessage1: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(shouldBeNoMessage1.isEmpty) val testContents2 = "Test message2" val sendTry2 = LocalHornetQMom.send(testContents2,queue) assert(sendTry2.isSuccess) val testContents3 = "Test message3" val sendTry3 = LocalHornetQMom.send(testContents3,queue) assert(sendTry3.isSuccess) val message2: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message2.isDefined) assert(message2.get.contents == testContents2) - val completeTry2 = LocalHornetQMom.completeMessage(message2.get) + val completeTry2 = message2.get.complete() assert(completeTry2.isSuccess) val message3: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(message3.isDefined) assert(message3.get.contents == testContents3) - val completeTry3 = LocalHornetQMom.completeMessage(message3.get) + val completeTry3 = message3.get.complete() assert(completeTry3.isSuccess) val shouldBeNoMessage4: Option[Message] = LocalHornetQMom.receive(queue,1 second).get assert(shouldBeNoMessage4.isEmpty) val deleteTry = LocalHornetQMom.deleteQueue(queueName) assert(deleteTry.isSuccess) assert(LocalHornetQMom.queues.get.isEmpty) } "HornetQ" should "be OK if asked to create the same queue twice " in { val queueName = "testQueue" val queue = LocalHornetQMom.createQueueIfAbsent(queueName) assert(queue.isSuccess) val sameQueue = LocalHornetQMom.createQueueIfAbsent(queueName) assert(sameQueue.isSuccess) assert(LocalHornetQMom.queues.get == Seq(sameQueue.get)) val deleteTry = LocalHornetQMom.deleteQueue(queueName) assert(deleteTry.isSuccess) assert(LocalHornetQMom.queues.get.isEmpty) } "HornetQ" should "return a failure if deleting a non-existing queue" in { val queueName = "testQueue" val deleteQueue = LocalHornetQMom.deleteQueue(queueName) assert(deleteQueue.isFailure) } "HornetQ" should "return a failure if sending message to a non-existing queue" in { val queueName = "non-existingQueue" val sendTry = LocalHornetQMom.send("testContent", Queue(queueName)) assert(sendTry.isFailure) } "HornetQ" should "return a failure if receiving a message to a non-existing queue" in { val queueName = "non-existingQueue" val receiveTry = LocalHornetQMom.receive(Queue(queueName), Duration(1, "second")) assert(receiveTry.isFailure) } "HornetQ" should "be able to filter the special characters in queue name" in { val queueName = "test# Qu%eueFilter" assert(LocalHornetQMom.queues.get.isEmpty) val queue = LocalHornetQMom.createQueueIfAbsent(queueName).get assert(LocalHornetQMom.queues.get == Seq(queue)) LocalHornetQMom.deleteQueue(queue.name) } override def afterAll() = LocalHornetQMomStopper.stop() } \ No newline at end of file diff --git a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/Message.scala b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/Message.scala new file mode 100644 index 000000000..fddaf3cce --- /dev/null +++ b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/Message.scala @@ -0,0 +1,17 @@ +package net.shrine.messagequeueservice + +import scala.util.Try + +/** + * A Message Trait that is implemented by LocalHornetQMessage and HornetQClientMessage + * Created by yifan on 9/8/17. + */ + +trait Message { + def complete(): Try[Unit] + def contents: String +} + +object Message { + val contentsKey = "contents" +} \ No newline at end of file diff --git a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala index 325ae3bef..52a5722bc 100644 --- a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala +++ b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala @@ -1,123 +1,54 @@ package net.shrine.messagequeueservice import net.shrine.source.ConfigSource import net.shrine.spray.DefaultJsonSupport -import org.hornetq.api.core.client.ClientMessage -import org.hornetq.core.client.impl.ClientMessageImpl -import org.json4s.JsonAST.{JField, JObject} -import org.json4s.{CustomSerializer, DefaultFormats, Formats, _} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.Try /** * This object mostly imitates AWS SQS' API via an embedded HornetQ. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs.html * * @author david * @since 7/18/17 */ //todo in 1.23 all but the server side will use the client RemoteHornetQ implementation (which will call to the server at the hub) //todo in 1.24, create an AwsSqs implementation of the trait trait MessageQueueService { def createQueueIfAbsent(queueName:String): Try[Queue] def deleteQueue(queueName:String): Try[Unit] def queues: Try[Seq[Queue]] def send(contents:String,to:Queue): Try[Unit] def receive(from:Queue,timeout:Duration): Try[Option[Message]] - def completeMessage(message:Message): Try[Unit] - } object MessageQueueService { lazy val service:MessageQueueService = { import scala.reflect.runtime.universe.runtimeMirror val momClassName = ConfigSource.config.getString("shrine.messagequeue.implementation") val classLoaderMirror = runtimeMirror(getClass.getClassLoader) val module = classLoaderMirror.staticModule(momClassName) classLoaderMirror.reflectModule(module).instance.asInstanceOf[MessageQueueService] } } -case class Message(hornetQMessage:ClientMessage) extends DefaultJsonSupport { - override implicit def json4sFormats: Formats = DefaultFormats - - def getClientMessage = hornetQMessage - - def contents = hornetQMessage.getStringProperty(Message.contentsKey) - - def getMessageID = hornetQMessage.getMessageID - - def complete() = hornetQMessage.acknowledge() -} - -object Message { - val contentsKey = "contents" -} case class Queue(var name:String) extends DefaultJsonSupport { // filter all (Unicode) characters that are not letters // filter neither letters nor (decimal) digits, replaceAll("[^\\p{L}]+", "") name = name.filterNot(c => c.isWhitespace).replaceAll("[^\\p{L}\\p{Nd}]+", "") if (name.length == 0) { throw new IllegalArgumentException("ERROR: A valid Queue name must contain at least one letter!") } } -class QueueSerializer extends CustomSerializer[Queue](format => ( - { - case JObject(JField("name", JString(s)) :: Nil) => Queue(s) - }, - { - case queue: Queue => - JObject(JField("name", JString(queue.name)) :: Nil) - } -)) - -//todo make this an object SHRINE-2218 -//todo move to HornetQMomWebApi SHRINE-2218 -class MessageSerializer extends CustomSerializer[Message](format => ( - { - - //JObject(List((hornetQMessage,JObject(List((type,JInt(0)), (durable,JBool(true)), (expiration,JInt(0)), (timestamp,JInt(1504275142323)), (priority,JInt(4)), (contents,JString(test Content))))))) - case JObject( - JField("hornetQMessage", JObject( - JField("type", JInt(s)) :: - JField("durable", JBool(d)) :: - JField("expiration", JInt(e)) :: - JField("timestamp", JInt(t)) :: - JField("priority", JInt(p)) :: - JField(Message.contentsKey, JString(contents)) :: - Nil) - ) :: Nil ) => { - val hornetQClientMessage = new ClientMessageImpl(s.toByte, d, e.toLong, t.toLong, p.toByte, 0) - hornetQClientMessage.putStringProperty(Message.contentsKey,contents) - - Message(hornetQClientMessage) - } - }, - { - case msg: Message => - JObject( - JField("hornetQMessage", - JObject( - JField("type", JLong(msg.getClientMessage.getType)) :: - JField("durable", JBool(msg.getClientMessage.isDurable)) :: - JField("expiration", JLong(msg.getClientMessage.getExpiration)) :: - JField("timestamp", JLong(msg.getClientMessage.getTimestamp)) :: - JField("priority", JLong(msg.getClientMessage.getPriority)) :: - JField(Message.contentsKey, JString(msg.contents)) :: - Nil)) :: Nil) - } -)) - - case class NoSuchQueueExistsInHornetQ(proposedQueue: Queue) extends Exception { override def getMessage: String = { s"Given Queue ${proposedQueue.name} does not exist in HornetQ server! Please create the queue first!" } } diff --git a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/protocol/Envelope.scala b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/protocol/Envelope.scala index 7b708efaa..8e93c217e 100644 --- a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/protocol/Envelope.scala +++ b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/protocol/Envelope.scala @@ -1,46 +1,45 @@ package net.shrine.messagequeueservice.protocol import net.shrine.util.Versions import org.json4s.ShortTypeHints import org.json4s.native.Serialization import scala.util.{Failure, Success, Try} -import org.json4s.native.Serialization /** * A json-friendly container for unpacking different messages of known types based on metadata. * * @author david * @since 9/7/17 */ //if you need to add fields to this case class they must be Options with default values of None to support serializing to and from JSON case class Envelope( contentsType:String, contents:String, shrineVersion:String = Versions.version) { def decode[T](decoder:String => T):Try[T] = Try{ decoder(contents) } def toJson:String = { Serialization.write(this)(Envelope.envelopeFormats) } def checkVersionExactMatch: Try[Envelope] = { if(shrineVersion == Versions.version) Success(this) else Failure(VersionMismatchException(shrineVersion)) } } case class VersionMismatchException(badVersion:String) extends Exception(s"Cannot use an Envelope with version $badVersion in ${Versions.version}") object Envelope { val envelopeFormats = Serialization.formats(ShortTypeHints(List(classOf[Envelope]))) def fromJson(jsonString:String):Try[Envelope] = Try{ implicit val formats = envelopeFormats Serialization.read[Envelope](jsonString) } } \ No newline at end of file diff --git a/shrine-setup/src/main/resources/shrine.conf b/shrine-setup/src/main/resources/shrine.conf index 2dc95a555..3e2237709 100644 --- a/shrine-setup/src/main/resources/shrine.conf +++ b/shrine-setup/src/main/resources/shrine.conf @@ -1,361 +1,367 @@ shrine { metaData { ping = "pong" } pmEndpoint { // url = "http://shrine-dev1.catalyst/i2b2/services/PMService/getServices" //use your i2b2 pm url } ontEndpoint { // url = "http://shrine-dev1.catalyst/i2b2/rest/OntologyService/" //use your i2b2 ontology url } hiveCredentials { //use your i2b2 hive credentials // domain = "i2b2demo" // username = "demo" // password = "examplePassword" // crcProjectId = "Demo" // ontProjectId = "SHRINE" } messagequeue { hornetq { serverUrl = "https://localhost:6443/shrine-metadata/mom" } // If you intend for your node to serve as this SHRINE network's messaging hub, // then set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf. // You do not want to do this unless you are the hub admin hornetQWebApi { - // enabled = false + // enabled = false + // messageTimeOutSeconds = 10 seconds + // webClientTimeOutSecond = 10 seconds + } + httpClient { + defaultTimeOut = 10 seconds + timeOutWaitGap = 1 second } messagequeue { // receiveWaitTime = "15 seconds" } } breakdownResultOutputTypes { //use breakdown values appropriate for your shrine network // PATIENT_AGE_COUNT_XML { // description = "Age patient breakdown" // } // PATIENT_RACE_COUNT_XML { // description = "Race patient breakdown" // } // PATIENT_VITALSTATUS_COUNT_XML { // description = "Vital Status patient breakdown" // } // PATIENT_GENDER_COUNT_XML { // description = "Gender patient breakdown" // } } queryEntryPoint { // create = true //false for no qep // audit { // collectQepAudit = true //false to not use the 1.20 audit db tables // database { // dataSourceFrom = "JNDI" //Can be JNDI or testDataSource . Use testDataSource for tests, JNDI everywhere else // jndiDataSourceName = "java:comp/env/jdbc/qepAuditDB" //or leave out for tests // slickProfileClassName = "slick.driver.MySQLDriver$" // Can be // slick.driver.H2Driver$ // slick.driver.MySQLDriver$ // slick.driver.PostgresDriver$ // slick.driver.SQLServerDriver$ // slick.driver.JdbcDriver$ // freeslick.OracleProfile$ // freeslick.MSSQLServerProfile$ // // (Yes, with the $ on the end) // For testing without JNDI // testDataSource { //typical test settings for unit tests //driverClassName = "org.h2.Driver" //url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1" //H2 embedded in-memory for unit tests //url = "jdbc:h2:~/stewardTest.h2" //H2 embedded on disk at ~/test // } // timeout = 30 //time to wait before db gives up, in seconds. // createTablesOnStart = false //for testing with H2 in memory, when not running unit tests. Set to false normally // } // } // trustModelIsHub = true // true by default, false for P2P networks. // authenticationType = "pm" //can be none, pm, or ecommons // authorizationType = "shrine-steward" //can be none, shrine-steward, or hms-steward //hms-steward config // sheriffEndpoint { // url = "http://localhost:8080/shrine-hms-authorization/queryAuthorization" // timeout { // seconds = 1 // } // } // sheriffCredentials { // username = "sheriffUsername" // password = "sheriffPassword" // } //shrine-steward config // shrineSteward { // qepUserName = "qep" // qepPassword = "trustme" // stewardBaseUrl = "https://localhost:6443" // } // includeAggregateResults = false // // maxQueryWaitTime { // minutes = 5 //must be longer than the hub's maxQueryWaitTime // } // broadcasterServiceEndpoint { // url = "http://example.com/shrine/rest/broadcaster/broadcast" //url for the hub // acceptAllCerts = true // timeout { // seconds = 1 // } // } } hub { // create = false //change to true to start a hub maxQueryWaitTime { // minutes = 4.5 //Needs to be longer than the adapter's maxQueryWaitTime, but shorter than the qep's } // downstreamNodes { //Add your downstream nodes here // shrine-dev2 = "https://shrine-dev2.catalyst:6443/shrine/rest/adapter/requests" // } shouldQuerySelf = false //true if there is an adapter at the hub , or just add a loopback address to downstreamNodes } adapter { // create = true by default. False to not create an adapter. // audit { // collectAdapterAudit = true by default. False to not fill in the audit database // database { // dataSourceFrom = "JNDI" //Can be JNDI or testDataSource . Use testDataSource for tests, JNDI everywhere else // jndiDataSourceName = "java:comp/env/jdbc/adapterAuditDB" //or leave out for tests // slickProfileClassName = "slick.driver.MySQLDriver$" // Can be // slick.driver.H2Driver$ // slick.driver.MySQLDriver$ // slick.driver.PostgresDriver$ // slick.driver.SQLServerDriver$ // slick.driver.JdbcDriver$ // freeslick.OracleProfile$ // freeslick.MSSQLServerProfile$ // // (Yes, with the $ on the end) // For testing without JNDI // testDataSource { //typical test settings for unit tests //driverClassName = "org.h2.Driver" //url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1" //H2 embedded in-memory for unit tests //url = "jdbc:h2:~/stewardTest.h2" //H2 embedded on disk at ~/test // } // createTablesOnStart = false //for testing with H2 in memory, when not running unit tests. Set to false normally // } // obfuscation { // binSize = 5 by default //Round to the nearest binSize. Use 1 for no effect (to match SHRINE 1.21 and earlier). // sigma = 6.5 by default //Noise to inject. Use 0 for no effect. (Use 1.33 to match SHRINE 1.21 and earlier). // clamp = 10 by default //Maximum ammount of noise to inject. (Use 3 to match SHRINE 1.21 and earlier). // } // adapterLockoutAttemptsThreshold = 0 by default // Number of allowed queries with the same actual result that can exist before a researcher is locked out of the adapter. In 1.24 the lockout code and this config value will be removed // botDefense { // countsAndMilliseconds = [ //to turn off, use an empty json list // {count = 10, milliseconds = 60000}, //allow up to 10 queries in one minute by default // {count = 200, milliseconds = 36000000} //allow up to 4 queries in 10 hours by default // ] // } crcEndpoint { //must be filled in url = "http://shrine-dev1.catalyst/i2b2/services/QueryToolService/" } setSizeObfuscation = true //must be set. false turns off obfuscation adapterMappingsFileName = "AdapterMappings.xml" maxSignatureAge { minutes = 5 //must be longer than the hub's maxQueryWaitTime } immediatelyRunIncomingQueries = true //false to queue them //delayResponse = "0 seconds" //time to delay before responding to a query. Should be 0 except for testing in shrine-qa } networkStatusQuery = "\\\\SHRINE\\SHRINE\\Demographics\\Gender\\Male\\" humanReadableNodeName = "shrine-dev1" shrineDatabaseType = "mysql" keystore { file = "/opt/shrine/shrine.keystore" password = "changeit" privateKeyAlias = "shrine-dev1.catalyst" keyStoreType = "JKS" caCertAliases = [ "shrine-dev-ca" ] } problem { // problemHandler = "net.shrine.problem.LogAndDatabaseProblemHandler$" Can be other specialized problemHandler implementations // database { // dataSourceFrom = "JNDI" //Can be JNDI or testDataSource . Use testDataSource for tests, JNDI everywhere else // jndiDataSourceName = "java:comp/env/jdbc/problemDB" // slickProfileClassName = "slick.driver.MySQLDriver$" // Can be // slick.driver.H2Driver$ // slick.driver.MySQLDriver$ // slick.driver.PostgresDriver$ // slick.driver.SQLServerDriver$ // slick.driver.JdbcDriver$ // freeslick.OracleProfile$ // freeslick.MSSQLServerProfile$ // // (Yes, with the $ on the end) // For testing without JNDI // testDataSource { //typical test settings for unit tests //driverClassName = "org.h2.Driver" //url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1" //H2 embedded in-memory for unit tests //url = "jdbc:h2:~/stewardTest.h2" //H2 embedded on disk at ~/test // } // createTablesOnStart = false //for testing with H2 in memory, when not running unit tests. Set to false normally // } } dashboard { // gruntWatch = false //false for production, true for mvn tomcat7:run . Allows the client javascript and html files to be loaded via gruntWatch . // happyBaseUrl = "https://localhost:6443/shrine/rest/happy" If the shine servlet is running on a different machime from the dashboard, change this URL to match // statusBaseUrl = "https://localhost:6443/shrine/rest/internalstatus" If the shine servlet is running on a different machime from the dashboard, change this URL to match // } // status { //permittedHostOfOrigin = "localhost" //If absent then get the host name via java.net.InetAddress.getLocalHost.getHostName . Override to control } //Get the older squerl-basd databases through JNDI (inside of tomcant, using tomcat's db connection pool) or directly via a db config here (for testing squerylDataSource { // database { // dataSourceFrom = "JNDI" //Can be JNDI or testDataSource . Use testDataSource for tests, JNDI everywhere else // jndiDataSourceName = "java:comp/env/jdbc/shrineDB" //or leave out for tests // } } authenticate { // realm = "SHRINE Researcher API" //todo figure out what this means. SHRINE-1978 usersource { // domain = "i2b2demo" //you must provide your own domain } } steward { // createTopicsMode = Pending //Can be Pending, Approved, or TopcisIgnoredJustLog. Pending by default //Pending - new topics start in the Pending state; researchers must wait for the Steward to approve them //Approved - new topics start in the Approved state; researchers can use them immediately //TopicsIgnoredJustLog - all queries are logged and approved; researchers don't need to create topics emailDataSteward { // sendAuditEmails = true //false to turn off the whole works of emailing the data steward // interval = "1 day" //Audit researchers daily // timeAfterMidnight = "6 hours" //Audit researchers at 6 am. If the interval is less than 1 day then this delay is ignored. // maxQueryCountBetweenAudits = 30 //If a researcher runs more than this many queries since the last audit audit her // minTimeBetweenAudits = "30 days" //If a researcher runs at least one query, audit those queries if this much time has passed //You must provide the email address of the shrine node system admin, to handle bounces and invalid addresses //from = "shrine-admin@example.com" //You must provide the email address of the data steward //to = "shrine-steward@example.com" // subject = "Audit SHRINE researchers" //The baseUrl for the data steward to be substituted in to email text. Must be supplied if it is used in the email text. //stewardBaseUrl = "https://example.com:8443/steward/" //Text to use for the email audit. // AUDIT_LINES will be replaced by a researcherLine for each researcher to audit. // STEWARD_BASE_URL will be replaced by the value in stewardBaseUrl if available. // emailBody = """Please audit the following users at STEWARD_BASE_URL at your earliest convinience: // //AUDIT_LINES""" //note that this can be a multiline message //Text to use per researcher to audit. //FULLNAME, USERNAME, COUNT and LAST_AUDIT_DATE will be replaced with appropriate text. // researcherLine = "FULLNAME (USERNAME) has run COUNT queries since LAST_AUDIT_DATE." } // database { // dataSourceFrom = "JNDI" //Can be JNDI or testDataSource . Use testDataSource for tests, JNDI everywhere else // jndiDataSourceName = "java:comp/env/jdbc/stewardDB" //or leave out for tests // slickProfileClassName = "slick.driver.MySQLDriver$" // Can be // slick.driver.H2Driver$ // slick.driver.MySQLDriver$ // slick.driver.PostgresDriver$ // slick.driver.SQLServerDriver$ // slick.driver.JdbcDriver$ // freeslick.OracleProfile$ // freeslick.MSSQLServerProfile$ // // (Yes, with the $ on the end) // For testing without JNDI // testDataSource { //typical test settings for unit tests //driverClassName = "org.h2.Driver" //url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1" //H2 embedded in-memory for unit tests //url = "jdbc:h2:~/stewardTest.h2" //H2 embedded on disk at ~/test // } // createTablesOnStart = false // true for testing with H2 in memory, when not running unit tests. Set to false normally // } // gruntWatch = false //false for production, true for mvn tomcat7:run . Allows the client javascript and html files to be loaded via gruntWatch . } email { //add javax mail properties from https://www.tutorialspoint.com/javamail_api/javamail_api_smtp_servers.htm here // javaxmail { // mail { // smtp { //for postfix on localhost // host = localhost // port = 25 //for AWS SES - See http://docs.aws.amazon.com/ses/latest/DeveloperGuide/send-using-smtp-java.html // host = email-smtp.us-east-1.amazonaws.com // port = 25 // transport.protocol = smtps // auth = true // starttls.enable = true // starttls.required = true // } // } // } //Must be set for AWS SES. See http://docs.aws.amazon.com/ses/latest/DeveloperGuide/send-using-smtp-java.html // authenticator { // username = yourUsername // password = yourPassword // } } } //Default settings for akka //akka { // loglevel = INFO // log-config-on-start = on // loggers = ["akka.event.slf4j.Slf4jLogger"] // Toggles whether the threads created by this ActorSystem should be daemons or not. Use daemonic inside of tomcat to support shutdown // daemonic = on //} //You'll see these settings for spray, baked into the .war files. //spray.servlet { // boot-class = "net.shrine.dashboard.net.shrine.metadata.Boot" //Don't change this one. It'll start the wrong (or no) application if you change it. // request-timeout = 30s //}