diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala index 4ebef810b..e69ea7b19 100644 --- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala +++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala @@ -1,175 +1,179 @@ package net.shrine.hornetqmom import net.shrine.log.Loggable import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.source.ConfigSource import org.json4s.native.Serialization import org.json4s.native.Serialization.{read, write} import org.json4s.{Formats, NoTypeHints} -import spray.http.{HttpEntity, HttpResponse, StatusCodes} +import spray.http.StatusCodes import spray.routing.{HttpService, Route} import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} /** * A web API that provides access to the internal HornetQMom library. * Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt * * Created by yifan on 7/24/17. */ trait HornetQMomWebApi extends HttpService with Loggable { val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApi.enabled").toBoolean def momRoute: Route = pathPrefix("mom") { -// if (!enabled) {respondWithStatus(StatusCodes.NotFound){complete{"HornetQWebApi needs to be enabled before use!"}}} -// else { + if (!enabled) { + respondWithStatus(StatusCodes.NotFound) { + complete("HornetQWebApi needs to be enabled before use!") + } + } else { put { createQueue ~ sendMessage ~ acknowledge } ~ receiveMessage ~ getQueues ~ deleteQueue + } } // SQS returns CreateQueueResult, which contains queueUrl: String def createQueue: Route = path("createQueue" / Segment) { queueName => detach() { val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName) createdQueueTry match { case Success(queue) => { implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val response: String = write[Queue](queue)(formats) respondWithStatus(StatusCodes.Created) { complete(response) } } case Failure(x) => { internalServerErrorOccured(x, "createQueue") } } } } // SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String // returns a DeleteMessageResult, toString for debugging def deleteQueue: Route = path("deleteQueue" / Segment) { queueName => put { detach() { val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName) deleteQueueTry match { case Success(v) => { complete(StatusCodes.OK) } case Failure(x) => { internalServerErrorOccured(x, "deleteQueue") } } } } } // SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult def sendMessage: Route = path("sendMessage" / Segment) { toQueue => requestInstance { request => val messageContent = request.entity.asString detach() { val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue)) sendTry match { case Success(v) => { complete(StatusCodes.Accepted) } case Failure(x) => { internalServerErrorOccured(x, "sendMessage") } } } } } // SQS ReceiveMessageResult receiveMessage(String queueUrl) def receiveMessage: Route = get { path("receiveMessage" / Segment) { fromQueue => parameter('timeOutSeconds ? 20) { timeOutSeconds => val timeout: Duration = Duration.create(timeOutSeconds, "seconds") detach() { val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout) receiveTry match { case Success(optMessage) => { implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer optMessage.fold(complete(StatusCodes.NotFound))(msg => complete(write(optMessage)(formats))) } case Failure(x) => { internalServerErrorOccured(x, "receiveMessage") } } } } } } // SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) def acknowledge: Route = path("acknowledge") { entity(as[String]) { messageJSON => implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer detach() { val msg: Message = read[Message](messageJSON)(formats, manifest[Message]) val acknowledgeTry: Try[Unit] = LocalHornetQMom.completeMessage(msg) acknowledgeTry match { case Success(v) => { complete(StatusCodes.ResetContent) } case Failure(x) => { internalServerErrorOccured(x, "acknowledge") } } } } } // Returns the names of the queues created on this server. Seq[Any] def getQueues: Route = path("getQueues") { get { detach() { implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer respondWithStatus(StatusCodes.OK) { val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues getQueuesTry match { case Success(seqQueue) => { complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats)) } case Failure(x) => { internalServerErrorOccured(x, "getQueues") } } } } } } def internalServerErrorOccured(x: Throwable, function: String): Route = { respondWithStatus(StatusCodes.InternalServerError) { val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function) debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem") complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage} from ${x.getClass}") } } } case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ." override val description: String = s"HornetQ throws an exception while trying to $function," + s" the server's response is: ${x.getMessage} from ${x.getClass}." } diff --git a/messagequeue/hornetqmom/src/test/resources/shrine.conf b/messagequeue/hornetqmom/src/test/resources/shrine.conf index e69de29bb..ee3c51fd7 100644 --- a/messagequeue/hornetqmom/src/test/resources/shrine.conf +++ b/messagequeue/hornetqmom/src/test/resources/shrine.conf @@ -0,0 +1,7 @@ +shrine { + messagequeue { + hornetQWebApiTest { + enabled = false + } + } +} \ No newline at end of file diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala index e9ebd6eef..e923ed7bb 100644 --- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala +++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala @@ -1,105 +1,126 @@ package net.shrine.hornetqmom import akka.actor.ActorRefFactory import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer} +import net.shrine.source.ConfigSource import org.json4s.NoTypeHints import org.json4s.native.Serialization import org.json4s.native.Serialization.read import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.http.HttpEntity import spray.http.StatusCodes._ import spray.testkit.ScalatestRouteTest import scala.collection.immutable.Seq /** * Created by yifan on 7/27/17. */ @RunWith(classOf[JUnitRunner]) class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { override def actorRefFactory: ActorRefFactory = system private val proposedQueueName = "test Queue" private val queue: Queue = Queue(proposedQueueName) private val queueName: String = queue.name // "testQueue" private val messageContent = "test Content" private var receivedMessage: String = "" "HornetQMomWebApi" should "create/delete the given queue, send/receive message, get queues" in { Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) val responseQueueName = jsonToQueue.name assertResult(Created)(status) assertResult(queueName)(responseQueueName) } // should be OK to create a queue twice Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { val response = new String(body.data.toByteArray) implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val jsonToQueue = read[Queue](response)(formats, manifest[Queue]) val responseQueueName = jsonToQueue.name assertResult(Created)(status) assertResult(queueName)(responseQueueName) } Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { assertResult(Accepted)(status) } Get(s"/mom/getQueues") ~> momRoute ~> check { implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer val response: String = new String(body.data.toByteArray) val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]]) assertResult(OK)(status) assertResult(queueName)(jsonToSeq.head.name) } // given timeout is 2 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check { val response = new String(body.data.toByteArray) receivedMessage = response implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer val responseToMessage: Message = read[Message](response)(formats, manifest[Message]) assertResult(OK)(status) assert(responseToMessage.isInstanceOf[Message]) } Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~> momRoute ~> check { implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer assertResult(ResetContent)(status) } Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { assertResult(OK)(status) } } "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " + "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in { Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check { assertResult(InternalServerError)(status) } Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check { assertResult(InternalServerError)(status) } // given timeout is 1 seconds Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check { assertResult(InternalServerError)(status) } } +} + +@RunWith(classOf[JUnitRunner]) +class HornetQMomWebApiConfigTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi { + override def actorRefFactory: ActorRefFactory = system + + private val queueName = "testQueue" + + + override val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApiTest.enabled").toBoolean + + "HornetQMomWebApi" should "block user from using the API and return a 404 response" in { + + Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check { + val response = new String(body.data.toByteArray) + + assertResult("HornetQWebApi needs to be enabled before use!")(response) + assertResult(NotFound)(status) + } + } } \ No newline at end of file