diff --git a/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala b/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala
index a7557cd80..95f190edd 100644
--- a/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala
+++ b/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala
@@ -1,156 +1,157 @@
package net.shrine.wiring
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.adapter.AdapterComponents
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.service.{AdapterRequestHandler, AdapterResource, AdapterService, I2b2AdminResource, I2b2AdminService}
import net.shrine.broadcaster.dao.HubDao
import net.shrine.broadcaster.dao.squeryl.SquerylHubDao
import net.shrine.broadcaster.service.HubComponents
import net.shrine.client.{EndpointConfig, JerseyHttpClient, OntClient, Poster, PosterOntClient}
import net.shrine.config.ConfigExtensions
import net.shrine.config.mappings.AdapterMappings
import net.shrine.crypto.{BouncyKeyStoreCollection, KeyStoreDescriptorParser, SignerVerifierAdapter, TrustParam}
import net.shrine.dao.squeryl.{DataSourceSquerylInitializer, SquerylDbAdapterSelecter, SquerylInitializer}
import net.shrine.log.Loggable
import net.shrine.ont.data.OntClientOntologyMetadata
import net.shrine.protocol.{HiveCredentials, NodeId, ResultOutputType, ResultOutputTypes}
import net.shrine.qep.{I2b2BroadcastResource, QueryEntryPointComponents, ShrineResource}
import net.shrine.slick.TestableDataSourceCreator
import net.shrine.source.ConfigSource
import net.shrine.status.StatusJaxrs
import org.squeryl.internals.DatabaseAdapter
/**
* @author clint
* @since Jan 14, 2014
*
* Application wiring for Shrine.
*/
object ShrineOrchestrator extends ShrineJaxrsResources with Loggable {
override def resources: Iterable[AnyRef] = {
Seq(statusJaxrs) ++
shrineResource ++
i2b2BroadcastResource ++
adapterResource ++
i2b2AdminResource ++
hubComponents.map(_.broadcasterMultiplexerResource)
}
//todo another pass to put things only used in one place into that place's apply(Config)
//Load config from file on the classpath called "shrine.conf"
lazy val config: Config = ConfigSource.config
+ //todo for SHRINE-2120 , can access the QEP's node name from config!
val shrineConfig = config.getConfig("shrine")
protected lazy val nodeId: NodeId = NodeId(shrineConfig.getString("humanReadableNodeName"))
//TODO: Don't assume keystore lives on the filesystem, could come from classpath, etc
lazy val keyStoreDescriptor = KeyStoreDescriptorParser(shrineConfig.getConfig("keystore"), shrineConfig.getConfigOrEmpty("hub"), shrineConfig.getConfigOrEmpty("queryEntryPoint"))
lazy val certCollection: BouncyKeyStoreCollection = BouncyKeyStoreCollection.fromFileRecoverWithClassPath(keyStoreDescriptor)
protected lazy val keystoreTrustParam: TrustParam = TrustParam.BouncyKeyStore(certCollection)
//todo used by the adapterService and happyShrineService, but not by the QEP. maybe each can have its own signerVerifier
lazy val signerVerifier = SignerVerifierAdapter(certCollection)
protected lazy val dataSource: DataSource = TestableDataSourceCreator.dataSource(shrineConfig.getConfig("squerylDataSource.database"))
protected lazy val squerylAdapter: DatabaseAdapter = SquerylDbAdapterSelecter.determineAdapter(shrineConfig.getString("shrineDatabaseType"))
protected lazy val squerylInitializer: SquerylInitializer = new DataSourceSquerylInitializer(dataSource, squerylAdapter)
//todo it'd be better for the adapter and qep to each have its own connection to the pm cell.
private lazy val pmEndpoint: EndpointConfig = shrineConfig.getConfigured("pmEndpoint", EndpointConfig(_))
lazy val pmPoster: Poster = Poster(certCollection,pmEndpoint)
protected lazy val breakdownTypes: Set[ResultOutputType] = shrineConfig.getOptionConfigured("breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty)
//todo why does the qep need a HubDao ?
protected lazy val hubDao: HubDao = new SquerylHubDao(squerylInitializer, new net.shrine.broadcaster.dao.squeryl.tables.Tables)
//todo really should be part of the adapter config, but is out in shrine's part of the name space
lazy val crcHiveCredentials: HiveCredentials = shrineConfig.getConfigured("hiveCredentials", HiveCredentials(_, HiveCredentials.CRC))
val adapterComponents:Option[AdapterComponents] = shrineConfig.getOptionConfiguredIf("adapter", AdapterComponents(
_,
certCollection,
squerylInitializer,
breakdownTypes,
crcHiveCredentials,
signerVerifier,
pmPoster,
nodeId
))
if (adapterComponents.isEmpty)
warn("Adapter Components is improperly configured, please check the adapter section in shrine.conf")
//todo maybe just break demeter too use this
lazy val adapterService: Option[AdapterService] = adapterComponents.map(_.adapterService)
//todo maybe just break demeter too use this
lazy val i2b2AdminService: Option[I2b2AdminService] = adapterComponents.map(_.i2b2AdminService)
//todo this is only used by happy
lazy val adapterDao: Option[AdapterDao] = adapterComponents.map(_.adapterDao)
//todo this is only used by happy
lazy val adapterMappings: Option[AdapterMappings] = adapterComponents.map(_.adapterMappings)
val shouldQuerySelf = "hub.shouldQuerySelf"
lazy val localAdapterServiceOption: Option[AdapterRequestHandler] = if(shrineConfig.getOption(shouldQuerySelf,_.getBoolean).getOrElse(false)) { //todo give this a default value (of false) in the reference.conf for the Hub, and make it part of the Hub's apply(config)
require(adapterService.isDefined, s"Self-querying requested because shrine.$shouldQuerySelf is true, but this node is not configured to have an adapter")
adapterService
}
else None //todo eventually make this just another downstream node accessed via loopback
lazy val hubComponents: Option[HubComponents] = shrineConfig.getOptionConfiguredIf("hub",HubComponents(_,
keystoreTrustParam,
nodeId,
localAdapterServiceOption,
breakdownTypes,
hubDao
))
//todo anything that requires qepConfig should be inside QueryEntryPointComponents's apply
protected lazy val qepConfig = shrineConfig.getConfig("queryEntryPoint")
lazy val queryEntryPointComponents:Option[QueryEntryPointComponents] = shrineConfig.getOptionConfiguredIf("queryEntryPoint", QueryEntryPointComponents(_,
certCollection,
breakdownTypes,
hubComponents.map(_.broadcastDestinations),
hubDao, //todo the QEP should not need the hub dao
squerylInitializer, //todo could really have its own
pmPoster //todo could really have its own
))
protected lazy val pmUrlString: String = pmEndpoint.url.toString
private[shrine] lazy val ontEndpoint: EndpointConfig = shrineConfig.getConfigured("ontEndpoint", EndpointConfig(_))
protected lazy val ontPoster: Poster = Poster(certCollection,ontEndpoint)
//todo only used by happy outside of here
lazy val ontologyMetadata: OntClientOntologyMetadata = {
import scala.concurrent.duration._
//TODO: XXX: Un-hard-code max wait time param
val ontClient: OntClient = new PosterOntClient(shrineConfig.getConfigured("hiveCredentials", HiveCredentials(_, HiveCredentials.ONT)), 1.minute, ontPoster)
new OntClientOntologyMetadata(ontClient)
}
protected lazy val statusJaxrs: StatusJaxrs = StatusJaxrs(config)
protected lazy val shrineResource: Option[ShrineResource] = queryEntryPointComponents.map(x => ShrineResource(x.shrineService))
protected lazy val i2b2BroadcastResource: Option[I2b2BroadcastResource] = queryEntryPointComponents.map(x => new I2b2BroadcastResource(x.i2b2Service,breakdownTypes))
protected lazy val adapterResource: Option[AdapterResource] = adapterService.map(AdapterResource(_))
protected lazy val i2b2AdminResource: Option[I2b2AdminResource] = i2b2AdminService.map(I2b2AdminResource(_, breakdownTypes))
def poster(keystoreCertCollection: BouncyKeyStoreCollection)(endpoint: EndpointConfig): Poster = {
val httpClient = JerseyHttpClient(keystoreCertCollection, endpoint)
Poster(endpoint.url.toString, httpClient)
}
}
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala
index 45220cee7..307bfe770 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala
@@ -1,17 +1,18 @@
package net.shrine.protocol
import scala.concurrent.duration.Duration
/**
* @author clint
* @since Feb 14, 2014
*/
trait BaseShrineRequest extends ShrineMessage {
def authn: AuthenticationInfo
def waitTime: Duration
def requestType: RequestType
+ //todo maybe add a request-originated-from optional field here?
}
object BaseShrineRequest extends XmlUnmarshallers.Chained(ShrineRequest.fromXml, NonI2b2ShrineRequest.fromXml)
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala
index e06478e03..a9fd96f4f 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala
@@ -1,189 +1,193 @@
package net.shrine.protocol
import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, OptionEnrichments}
import net.shrine.protocol.query.QueryDefinition
import scala.xml.NodeSeq
import scala.xml.Elem
import scala.concurrent.duration.Duration
import scala.util.Try
import net.shrine.serialization.I2b2UnmarshallingHelpers
/**
* @author Bill Simons
* @since 3/9/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class RunQueryRequest(
override val projectId: String,
override val waitTime: Duration,
override val authn: AuthenticationInfo,
networkQueryId: Long,
topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names.
topicName: Option[String], //data steward when required only
outputTypes: Set[ResultOutputType],
queryDefinition: QueryDefinition
+ //todo pick up SHRINE-2174 here nodeId: Option[NodeId] = None
) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[RunQueryRequest] with HandleableShrineRequest with HandleableI2b2Request {
override val requestType = RequestType.QueryDefinitionRequest
override def handle(handler: ShrineRequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast)
override def handleI2b2(handler: I2b2RequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast)
def elideAuthenticationInfo: RunQueryRequest = copy(authn = AuthenticationInfo.elided)
//NB: Sort ResultOutputTypes, for deterministic testing
private def sortedOutputTypes: Seq[ResultOutputType] = outputTypes.toSeq.sortBy(_.name)
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ headerFragment }
{ networkQueryId }
{ topicId.toXml() }
{ topicName.toXml() }
{ sortedOutputTypes.map(_.toXml) }
{ queryDefinition.toXml }
- }
+ } //todo put { nodeId.map(_.toXml) } at the end for SHRINE-2174
protected override def i2b2MessageBody: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ i2b2PsmHeaderWithDomain }
{ queryDefinition.toI2b2 }
{
for {
(outputType, i) <- sortedOutputTypes.zipWithIndex
priorityIndex = outputType.id.getOrElse(i + 1)
} yield {
}
}
{ topicId.toXml() }
{ topicName.toXml() }
}
override def withProject(proj: String) = this.copy(projectId = proj)
override def withAuthn(ai: AuthenticationInfo) = this.copy(authn = ai)
def withQueryDefinition(qDef: QueryDefinition) = this.copy(queryDefinition = qDef)
def mapQueryDefinition(f: QueryDefinition => QueryDefinition) = this.withQueryDefinition(f(queryDefinition))
def withNetworkQueryId(id: Long) = this.copy(networkQueryId = id)
}
object RunQueryRequest extends I2b2XmlUnmarshaller[RunQueryRequest] with ShrineXmlUnmarshaller[RunQueryRequest] with ShrineRequestUnmarshaller with I2b2UnmarshallingHelpers {
def apply(projectId: String,
waitTime: Duration,
authn: AuthenticationInfo,
topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names.
topicName: Option[String], //data steward when required only
outputTypes: Set[ResultOutputType],
queryDefinition: QueryDefinition
):RunQueryRequest = RunQueryRequest(
projectId,
waitTime,
authn,
BroadcastMessage.Ids.next,
topicId,
topicName,
outputTypes,
queryDefinition
)
val neededI2b2Namespace = "http://www.i2b2.org/xsd/cell/crc/psm/1.1/"
override def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = {
val queryDefNode = xml \ "message_body" \ "request" \ "query_definition"
val queryDefXml = queryDefNode.head match {
//NB: elem.scope.getPrefix(neededI2b2Namespace) will return null if elem isn't part of a larger XML chunk that has
//the http://www.i2b2.org/xsd/cell/crc/psm/1.1/ declared
case elem: Elem => elem.copy(elem.scope.getPrefix(neededI2b2Namespace))
case _ => throw new Exception("When unmarshalling a RunQueryRequest, encountered unexpected XML: '" + queryDefNode + "', might be missing.")
}
val attempt = for {
projectId <- i2b2ProjectId(xml)
waitTime <- i2b2WaitTime(xml)
authn <- i2b2AuthenticationInfo(xml)
topicId = (xml \ "message_body" \ "shrine" \ "queryTopicID").headOption.map(XmlUtil.trim)
topicName = (xml \ "message_body" \ "shrine" \ "queryTopicName").headOption.map(XmlUtil.trim)
outputTypes = determineI2b2OutputTypes(breakdownTypes)(xml \ "message_body" \ "request" \ "result_output_list")
queryDef <- QueryDefinition.fromI2b2(queryDefXml)
} yield {
RunQueryRequest(
projectId,
waitTime,
authn,
topicId,
topicName,
outputTypes,
- queryDef)
+ queryDef
+ //None //todo inject the QEP's nodeId here for SHRINE-2120
+ )
}
attempt.map(addPatientCountXmlIfNecessary)
}
private def determineI2b2OutputTypes(breakdownTypes: Set[ResultOutputType])(nodeSeq: NodeSeq): Set[ResultOutputType] = {
val sequence = (nodeSeq \ "result_output").flatMap { breakdownXml =>
val breakdownName = XmlUtil.trim(breakdownXml \ "@name")
ResultOutputType.valueOf(breakdownTypes)(breakdownName)
}
sequence.toSet
}
private def determineShrineOutputTypes(nodeSeq: NodeSeq): Set[ResultOutputType] = {
val attempts = (nodeSeq \ "resultType").map(ResultOutputType.fromXml)
Tries.sequence(attempts).map(_.toSet).get
}
private[protocol] def addPatientCountXmlIfNecessary(req: RunQueryRequest): RunQueryRequest = {
import ResultOutputType.PATIENT_COUNT_XML
if (req.outputTypes.contains(PATIENT_COUNT_XML)) { req }
else { req.copy(outputTypes = req.outputTypes + PATIENT_COUNT_XML) }
}
override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = {
import NodeSeqEnrichments.Strictness._
val attempt = for {
projectId <- shrineProjectId(xml)
waitTime <- shrineWaitTime(xml)
authn <- shrineAuthenticationInfo(xml)
queryId <- xml.withChild("queryId").map(XmlUtil.toLong)
topicId = (xml \ "topicId").headOption.map(XmlUtil.trim)
topicName = (xml \ "topicName").headOption.map(XmlUtil.trim)
outputTypes <- xml.withChild("outputTypes").map(determineShrineOutputTypes)
queryDef <- xml.withChild(QueryDefinition.rootTagName).flatMap(QueryDefinition.fromXml)
+// nodeId <- NodeId.fromXml(xml \ "nodeId")
} yield {
- RunQueryRequest(projectId, waitTime, authn, queryId, topicId, topicName, outputTypes, queryDef)
+ RunQueryRequest(projectId, waitTime, authn, queryId, topicId, topicName, outputTypes, queryDef)//, Some(nodeId))
}
attempt.map(addPatientCountXmlIfNecessary)
}
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
index 2a0e0999a..b4a84a870 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
@@ -1,80 +1,85 @@
package net.shrine.broadcaster
import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient}
import net.shrine.broadcaster.dao.HubDao
import net.shrine.client.TimeoutException
import net.shrine.log.Loggable
-import net.shrine.protocol.{BroadcastMessage, FailureResult, FailureResult$, RunQueryRequest, SingleNodeResult, Timeout}
+import net.shrine.protocol.{BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout}
import scala.concurrent.Future
import scala.util.control.NonFatal
/**
* @author clint
* @since Nov 15, 2013
*/
final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable {
logStartup()
import scala.concurrent.ExecutionContext.Implicits.global
override def broadcast(message: BroadcastMessage): Multiplexer = {
logOutboundIfNecessary(message)
val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId))
for {
nodeHandle <- destinations
+//todo for SHRINE-2120 shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle)
shrineResponse <- callAdapter(message, nodeHandle)
} {
- try { multiplexer.processResponse(shrineResponse) }
+ try {
+ //todo SHRINE-2120 send to the QEP queue here ... but how to tunnel in the additional piece ... where did the query come from ?
+ //message.request.
+ //todo send a shrineResponse.toXml .
+ multiplexer.processResponse(shrineResponse) }
finally { logResultsIfNecessary(message, shrineResponse) }
}
multiplexer
}
private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = {
val NodeHandle(nodeId, client) = nodeHandle
client.query(message).recover {
case e: TimeoutException =>
error(s"Broadcasting to $nodeId timed out")
Timeout(nodeId)
case NonFatal(e) =>
error(s"Broadcasting to $nodeId failed with ", e)
FailureResult(nodeId, e)
}
}
private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ =>
dao.logQueryResult(message.requestId, result)
}
private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq =>
dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition)
}
private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = {
message.request match {
case runQueryReq: RunQueryRequest => f(runQueryReq)
case _ => ()
}
}
private def logStartup(): Unit = {
def clientToString(client: AdapterClient): String = client match {
case r: RemoteAdapterClient => r.poster.url.toString
case _ => ""
}
info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:")
destinations.toSeq.sortBy(_.nodeId.name).foreach { handle =>
info(s" ${handle.nodeId}: ${clientToString(handle.client)}")
}
}
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala
index c394ffadb..7a4726d8d 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala
@@ -1,52 +1,56 @@
package net.shrine.broadcaster
import java.util.concurrent.atomic.AtomicInteger
import net.shrine.log.Loggable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.Duration
import scala.util.Try
import net.shrine.protocol.NodeId
import net.shrine.protocol.SingleNodeResult
/**
* @author clint
* @date Nov 15, 2013
*/
final class BufferingMultiplexer(broadcastTo: Set[NodeId]) extends Multiplexer with Loggable {
-
+
+ //todo rename for SHRINE-2120
+ //todo if this code survives, change to a BlockingQueue and get rid of this lock
private[this] val queue: Buffer[SingleNodeResult] = new ArrayBuffer
private[this] val heardFrom = new AtomicInteger
private[this] val promise = Promise[Iterable[SingleNodeResult]]
private[this] val lock = new AnyRef
private[this] def locked[T](f: => T): T = lock.synchronized { f }
def resultsSoFar: Iterable[SingleNodeResult] = locked { queue.toIndexedSeq }
def numHeardFrom: Int = heardFrom.get
override def responses: Future[Iterable[SingleNodeResult]] = promise.future
override def processResponse(response: SingleNodeResult): Unit = {
- val latestCount = locked {
+
+ val latestCount = locked {
queue += response
heardFrom.incrementAndGet
}
debug(s"Latest result count: $latestCount")//todo ; handled response" $response")
if(latestCount == broadcastTo.size) {
+ //todo change to trySuccess or tryComplete
promise.complete(Try(resultsSoFar))
}
}
}
\ No newline at end of file
diff --git a/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala b/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala
index 4727822d1..c1df746d9 100644
--- a/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala
+++ b/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala
@@ -1,20 +1,21 @@
package net.shrine.broadcaster.service
import net.shrine.protocol.BroadcastMessage
import scala.concurrent.duration.Duration
import net.shrine.broadcaster.{Broadcaster}
import net.shrine.protocol.SingleNodeResult
import scala.concurrent.Await
/**
* @author clint
* @since Feb 28, 2014
*/
//todo this class looks pretty thin and very suspicious
final case class BroadcasterMultiplexerService(broadcaster: Broadcaster, maxQueryWaitTime: Duration) extends BroadcasterMultiplexerRequestHandler {
override def broadcastAndMultiplex(message: BroadcastMessage): Iterable[SingleNodeResult] = {
val multiplexer = broadcaster.broadcast(message)
-
+
+ //todo here's one end of a race condition waiting on a response from the Adapter.
Await.result(multiplexer.responses, maxQueryWaitTime)
}
}
\ No newline at end of file