diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index 1ff0117b5..a7d3356a2 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,254 +1,235 @@ package net.shrine.hornetqmom import java.util.UUID import net.shrine.log.Loggable import net.shrine.messagequeueservice.{Message, Queue} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource import org.json4s.JsonAST.{JField, JObject} import org.json4s.native.Serialization import org.json4s.native.Serialization.{read, write} import org.json4s.{CustomSerializer, Formats, JString, NoTypeHints} import spray.http.StatusCodes import spray.routing.{HttpService, Route} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { def enabled: Boolean = ConfigSource.config.getBoolean("shrine.messagequeue.hornetQWebApi.enabled") val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" def momRoute: Route = pathPrefix("mom") { if (!enabled) { val configProblem: CannotUseHornetQMomWebApiProblem = CannotUseHornetQMomWebApiProblem(new UnsupportedOperationException) warn(s"HornetQMomWebApi is not available to use due to configProblem ${configProblem.description}!") respondWithStatus(StatusCodes.NotFound) { complete(warningMessage) } } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) createdQueueTry match { case Success(queue) => { implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString debug(s"sendMessage to $toQueue '$messageContent'") detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { case Success(optMessage) => { optMessage.fold(complete(StatusCodes.NotFound)){msg => implicit val formats = Serialization.formats(NoTypeHints) + MessageSerializer val messageJson = write(msg) complete(messageJson) } } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageUUID => implicit val formats: Formats = Serialization.formats(NoTypeHints) + MessageSerializer detach() { val msg: Message = read[Message](messageUUID)(formats, manifest[Message]) val id: UUID = msg.messageUUID val acknowledgeTry: Try[Unit] = LocalHornetQMom.completeMessage(id) acknowledgeTry match { case Success(v) => { complete(StatusCodes.ResetContent) } case Failure(x) => { internalServerErrorOccured(x, "acknowledge") } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { implicit val formats = Serialization.formats(NoTypeHints) + QueueSerializer respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" + s"Exception is from ${x.getClass}") } } } object QueueSerializer extends CustomSerializer[Queue](format => ( { case JObject(JField("name", JString(s)) :: Nil) => Queue(s) }, { case queue: Queue => JObject(JField("name", JString(queue.name)) :: Nil) } )) -object MessageSerializer extends CustomSerializer[Message](format => ( - { - case JObject( - JField("hornetQClientMessage", - JObject( -// JField("type", JInt(msgType)) :: -// JField("messageID", JInt(id)) :: -// JField("durable", JBool(durable)) :: -// JField("expiration", JInt(expiration)) :: -// JField("timestamp", JInt(timestamp)) :: -// JField("priority", JInt(priority)) :: -// JField(Message.contentsKey, JString(contents)) :: - JField("uuid", JString(messageUUID)) :: - JField(Message.contentsKey, JString(contents)) - :: Nil)) - :: Nil) => { -// val hornetQClientMessage: ClientMessageImpl = new ClientMessageImpl(msgType.toByte, durable, expiration.toLong, timestamp.toLong, priority.toByte, 0) -// hornetQClientMessage.putStringProperty(Message.contentsKey, contents) - val id: UUID = UUID.fromString(messageUUID) - val message: Message = Message(id, contents) - message - } - }, { +object MessageSerializer extends CustomSerializer[Message](format => ( { + case JObject( + JField("hornetQClientMessage", + JObject( + JField("uuid", JString(messageUUID)) :: + JField(Message.contentsKey, JString(contents)) + :: Nil)) + :: Nil) => { + val id: UUID = UUID.fromString(messageUUID) + val message: Message = Message(id, contents) + message + } +}, { case msg: Message => JObject( JField("hornetQClientMessage", JObject( JField("uuid", JString(msg.messageUUID.toString)) :: JField(Message.contentsKey, JString(msg.contents)) - - // JField("type", JLong(msg.getClientMessage.getType)) :: -// JField("messageID", JLong(msg.getClientMessage.getMessageID)) :: -// JField("durable", JBool(msg.getClientMessage.isDurable)) :: -// JField("expiration", JLong(msg.getClientMessage.getExpiration)) :: -// JField("timestamp", JLong(msg.getClientMessage.getTimestamp)) :: -// JField("priority", JLong(msg.getClientMessage.getPriority)) :: -// JField(Message.contentsKey, JString(msg.contents)) :: -// JField("belongsToQueue", JString(msg.getBelongedQueueName)) :: Nil)) :: Nil) } )) case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf." override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " + "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." + " You do not want to do this unless you are the hub admin!" } diff --git a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala index af5b7d6a0..7936949d2 100644 --- a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala +++ b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala @@ -1,106 +1,70 @@ package net.shrine.messagequeueservice import java.util.UUID import net.shrine.source.ConfigSource import net.shrine.spray.DefaultJsonSupport import org.hornetq.api.core.SimpleString import org.hornetq.api.core.client.{ClientConsumer, ClientMessage} import org.hornetq.core.client.impl.ClientMessageImpl import org.json4s.JsonAST.{JField, JObject} import org.json4s.{CustomSerializer, DefaultFormats, Formats, _} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.Try /** * This object mostly imitates AWS SQS' API via an embedded HornetQ. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs.html * * @author david * @since 7/18/17 */ //todo in 1.23 all but the server side will use the client RemoteHornetQ implementation (which will call to the server at the hub) //todo in 1.24, create an AwsSqs implementation of the trait trait MessageQueueService { def createQueueIfAbsent(queueName:String): Try[Queue] def deleteQueue(queueName:String): Try[Unit] def queues: Try[Seq[Queue]] def send(contents:String,to:Queue): Try[Unit] def receive(from:Queue,timeout:Duration): Try[Option[Message]] def completeMessage(messageID:UUID): Try[Unit] } object MessageQueueService { lazy val service:MessageQueueService = { import scala.reflect.runtime.universe.runtimeMirror val momClassName = ConfigSource.config.getString("shrine.messagequeue.implementation") val classLoaderMirror = runtimeMirror(getClass.getClassLoader) val module = classLoaderMirror.staticModule(momClassName) classLoaderMirror.reflectModule(module).instance.asInstanceOf[MessageQueueService] } } case class Message(messageUUID: UUID, contents: String) extends DefaultJsonSupport { override implicit def json4sFormats: Formats = DefaultFormats -// object ClientMessageImpl { -// -// lazy val clientMessageImpl:ClientMessageImpl = { -// import scala.reflect.runtime.universe.runtimeMirror -// -// val clientMsgImplClassName = ConfigSource.config.getString("shrine.messagequeue.clientMessageImpl") -// val classLoaderMirror = runtimeMirror(getClass.getClassLoader) -// val module = classLoaderMirror.staticModule(clientMsgImplClassName) -// -// classLoaderMirror.reflectModule(module).instance.asInstanceOf[ClientMessageImpl] -// } -// } - -// def getClientMessage = hornetQMessage -// -// def contents = hornetQMessage.getStringProperty(Message.contentsKey) -// -// def getMessageID = hornetQMessage.getMessageID - - -// private def setMessageID(messageID: Long) = { -// val clientMessageImplClass = hornetQMessage.getClass -// println("all fields: " + clientMessageImplClass.getDeclaredFields.foreach(a => println("field: "+ a))) -// val messageIDField = clientMessageImplClass.getDeclaredField("messageID") -// messageIDField.setAccessible(true) -// messageIDField.setLong(hornetQMessage, messageID) -// println("messageID: " + hornetQMessage.getMessageID) -// } - -// private def setConsumer(consumer: ClientConsumer) = {} - -// def complete() = { -// this.setMessageID(this.messageId) -// -// hornetQMessage.acknowledge() -// } } object Message { val contentsKey = "contents" } case class Queue(var name:String) extends DefaultJsonSupport { // filter all (Unicode) characters that are not letters // filter neither letters nor (decimal) digits, replaceAll("[^\\p{L}]+", "") name = name.filterNot(c => c.isWhitespace).replaceAll("[^\\p{L}\\p{Nd}]+", "") if (name.length == 0) { throw new IllegalArgumentException("ERROR: A valid Queue name must contain at least one letter!") } } case class NoSuchQueueExistsInHornetQ(proposedQueue: Queue) extends Exception { override def getMessage: String = { s"Given Queue ${proposedQueue.name} does not exist in HornetQ server! Please create the queue first!" } }