Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F60620427
MessageQueueService.scala
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Wed, May 1, 12:49
Size
3 KB
Mime Type
text/x-c++
Expires
Fri, May 3, 12:49 (1 d, 23 h)
Engine
blob
Format
Raw Data
Handle
17384988
Attached To
R2664 SHRINE MedCo Fork
MessageQueueService.scala
View Options
package net.shrine.messagequeueservice
import net.shrine.source.ConfigSource
import net.shrine.spray.DefaultJsonSupport
import org.hornetq.api.core.client.ClientMessage
import org.hornetq.core.client.impl.ClientMessageImpl
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.{CustomSerializer, DefaultFormats, Formats, _}
import scala.collection.immutable.Seq
import scala.concurrent.duration.Duration
import scala.util.Try
/**
 * Message-queue abstraction whose API mostly imitates AWS SQS; the original implementation runs on an
 * embedded HornetQ. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs.html
 *
 * Every operation returns its outcome wrapped in a [[scala.util.Try]].
 *
 * @author david
 * @since 7/18/17
 */
//todo in 1.23 all but the server side will use the client RemoteHornetQ implementation (which will call to the server at the hub)
//todo in 1.24, create an AwsSqs implementation of the trait
trait MessageQueueService {

  /**
   * @param queueName name of the queue to look up or create
   * @return the [[Queue]] for that name; presumably an already existing queue is returned as-is
   *         (inferred from the method name - confirm against the implementation)
   */
  def createQueueIfAbsent(queueName: String): Try[Queue]

  /** @param queueName name of the queue to delete */
  def deleteQueue(queueName: String): Try[Unit]

  /** @return the queues known to this service */
  def queues: Try[Seq[Queue]]

  /**
   * @param contents text payload of the message
   * @param to       destination queue
   */
  def send(contents: String, to: Queue): Try[Unit]

  /**
   * @param from    queue to read from
   * @param timeout presumably the longest time to wait for a message - confirm against the implementation
   * @return an optional [[Message]]; `None` presumably means nothing arrived within `timeout`
   */
  def receive(from: Queue, timeout: Duration): Try[Option[Message]]

  /** @param message a previously received message to mark as completed */
  def completeMessage(message: Message): Try[Unit]
}
/** Companion that resolves the configured [[MessageQueueService]] implementation. */
object MessageQueueService {

  /**
   * The service singleton, resolved on first access.
   *
   * The config key `shrine.messagequeue.implementation` must hold the fully qualified name of a Scala
   * `object`; that object is looked up reflectively and cast to [[MessageQueueService]].
   */
  lazy val service: MessageQueueService = {
    import scala.reflect.runtime.{universe => ru}

    val implementationName = ConfigSource.config.getString("shrine.messagequeue.implementation")
    val mirror = ru.runtimeMirror(getClass.getClassLoader)
    val implementationSymbol = mirror.staticModule(implementationName)
    val implementation = mirror.reflectModule(implementationSymbol).instance
    // Unchecked cast on purpose: a misconfigured name fails here with a ClassCastException.
    implementation.asInstanceOf[MessageQueueService]
  }
}
/**
 * Thin wrapper around a HornetQ [[ClientMessage]].
 *
 * @param hornetQMessage the underlying HornetQ message
 */
case class Message(hornetQMessage: ClientMessage) extends DefaultJsonSupport {
  override implicit def json4sFormats: Formats = DefaultFormats

  /** Name of the HornetQ string property that [[contents]] reads. */
  val propName: String = "contents"

  /** @return the wrapped HornetQ message */
  def getClientMessage: ClientMessage = hornetQMessage

  /** @return the value of the `contents` string property of the wrapped message */
  def contents: String = hornetQMessage.getStringProperty(propName)

  /** @return the message id reported by the wrapped message */
  def getMessageID: Long = hornetQMessage.getMessageID

  /** Acknowledges the wrapped message. */
  // Result type is left inferred so it keeps following whatever acknowledge() returns.
  def complete() = hornetQMessage.acknowledge()
}
/**
 * A queue identified by a sanitised name.
 *
 * During construction `name` is overwritten with a copy that keeps only Unicode letters and decimal
 * digits; whitespace and every other character are removed.
 *
 * @param name requested queue name; holds the sanitised value once construction has finished
 * @throws IllegalArgumentException if nothing is left of the name after sanitising
 */
case class Queue(var name: String) extends DefaultJsonSupport {
  name = {
    // Step 1: drop whitespace characters.
    val withoutWhitespace = name.filterNot(_.isWhitespace)
    // Step 2: drop everything that is neither a letter (\p{L}) nor a decimal digit (\p{Nd}).
    withoutWhitespace.replaceAll("[^\\p{L}\\p{Nd}]+", "")
  }

  // Note: a digits-only name passes this check even though the message only mentions letters.
  if (name.isEmpty) {
    throw new IllegalArgumentException("ERROR: A valid Queue name must contain at least one letter!")
  }
}
/**
 * json4s serializer for [[Queue]].
 *
 * A queue is represented as a JSON object with exactly one field, `name`, holding a string.
 */
class QueueSerializer extends CustomSerializer[Queue](_ => (
  {
    // JSON -> Queue: only an object whose single field is a string-valued "name" matches.
    case JObject(List(JField("name", JString(queueName)))) => Queue(queueName)
  },
  {
    // Queue -> JSON
    case Queue(queueName) => JObject(List(JField("name", JString(queueName))))
  }
))
/**
 * json4s serializer for [[Message]].
 *
 * The JSON form is an object with a single `hornetQMessage` field whose value is an object with the
 * fields `type`, `durable`, `expiration`, `timestamp` and `priority`, in exactly that order.
 *
 * Only those five header values are written. A deserialized message is a fresh `ClientMessageImpl`
 * built from them, so nothing else from the original message is carried over.
 *
 * Numeric fields are written as `JLong`. When reading, both `JInt` and `JLong` are accepted, so an
 * AST produced by this serializer can be read back directly and the result does not depend on which
 * integer node type the JSON parser is configured to produce.
 */
class MessageSerializer extends CustomSerializer[Message](_ => (
  {
    // Example of a parsed AST:
    //JObject(List((hornetQMessage,JObject(List((type,JInt(0)), (durable,JBool(false)), (expiration,JInt(0)), (timestamp,JInt(1502218873012)), (priority,JInt(4)))))))
    // ClientMessageImpl arguments: type, durable, expiration, timestamp, priority, initialMessageBufferSize
    case JObject(JField("hornetQMessage", JObject(
           JField("type", MessageSerializer.IntegralValue(messageType)) ::
           JField("durable", JBool(durable)) ::
           JField("expiration", MessageSerializer.IntegralValue(expiration)) ::
           JField("timestamp", MessageSerializer.IntegralValue(timestamp)) ::
           JField("priority", MessageSerializer.IntegralValue(priority)) :: Nil)) :: Nil) =>
      new Message(new ClientMessageImpl(messageType.toByte, durable, expiration, timestamp, priority.toByte, 0))
  },
  {
    case msg: Message =>
      val clientMessage = msg.getClientMessage
      JObject(JField("hornetQMessage",
        JObject(JField("type", JLong(clientMessage.getType)) ::
          JField("durable", JBool(clientMessage.isDurable)) ::
          JField("expiration", JLong(clientMessage.getExpiration)) ::
          JField("timestamp", JLong(clientMessage.getTimestamp)) ::
          JField("priority", JLong(clientMessage.getPriority)) :: Nil)) :: Nil)
  }
))

/** Helpers for [[MessageSerializer]]. */
object MessageSerializer {

  /** Extractor that reads an integral JSON number, whether it is a `JInt` or a `JLong`, as a `Long`. */
  private[messagequeueservice] object IntegralValue {
    def unapply(value: JValue): Option[Long] = value match {
      case JInt(number)  => Some(number.toLong)
      case JLong(number) => Some(number)
      case _             => None
    }
  }
}
/**
 * Exception whose message says that the given queue does not exist in the HornetQ server.
 *
 * @param proposedQueue the queue that was asked for
 */
case class NoSuchQueueExistsInHornetQ(proposedQueue: Queue) extends Exception {

  /** @return a message naming the missing queue and asking the caller to create it first */
  override def getMessage: String =
    s"Given Queue ${proposedQueue.name} does not exist in HornetQ server! Please create the queue first!"
}
Event Timeline
Log In to Comment