diff --git a/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala b/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala index a7557cd80..95f190edd 100644 --- a/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala +++ b/apps/shrine-app/src/main/scala/net/shrine/wiring/ShrineOrchestrator.scala @@ -1,156 +1,157 @@ package net.shrine.wiring import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.adapter.AdapterComponents import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.service.{AdapterRequestHandler, AdapterResource, AdapterService, I2b2AdminResource, I2b2AdminService} import net.shrine.broadcaster.dao.HubDao import net.shrine.broadcaster.dao.squeryl.SquerylHubDao import net.shrine.broadcaster.service.HubComponents import net.shrine.client.{EndpointConfig, JerseyHttpClient, OntClient, Poster, PosterOntClient} import net.shrine.config.ConfigExtensions import net.shrine.config.mappings.AdapterMappings import net.shrine.crypto.{BouncyKeyStoreCollection, KeyStoreDescriptorParser, SignerVerifierAdapter, TrustParam} import net.shrine.dao.squeryl.{DataSourceSquerylInitializer, SquerylDbAdapterSelecter, SquerylInitializer} import net.shrine.log.Loggable import net.shrine.ont.data.OntClientOntologyMetadata import net.shrine.protocol.{HiveCredentials, NodeId, ResultOutputType, ResultOutputTypes} import net.shrine.qep.{I2b2BroadcastResource, QueryEntryPointComponents, ShrineResource} import net.shrine.slick.TestableDataSourceCreator import net.shrine.source.ConfigSource import net.shrine.status.StatusJaxrs import org.squeryl.internals.DatabaseAdapter /** * @author clint * @since Jan 14, 2014 * * Application wiring for Shrine. */ object ShrineOrchestrator extends ShrineJaxrsResources with Loggable { override def resources: Iterable[AnyRef] = { Seq(statusJaxrs) ++ shrineResource ++ i2b2BroadcastResource ++ adapterResource ++ i2b2AdminResource ++ hubComponents.map(_.broadcasterMultiplexerResource) } //todo another pass to put things only used in one place into that place's apply(Config) //Load config from file on the classpath called "shrine.conf" lazy val config: Config = ConfigSource.config + //todo for SHRINE-2120 , can access the QEP's node name from config! val shrineConfig = config.getConfig("shrine") protected lazy val nodeId: NodeId = NodeId(shrineConfig.getString("humanReadableNodeName")) //TODO: Don't assume keystore lives on the filesystem, could come from classpath, etc lazy val keyStoreDescriptor = KeyStoreDescriptorParser(shrineConfig.getConfig("keystore"), shrineConfig.getConfigOrEmpty("hub"), shrineConfig.getConfigOrEmpty("queryEntryPoint")) lazy val certCollection: BouncyKeyStoreCollection = BouncyKeyStoreCollection.fromFileRecoverWithClassPath(keyStoreDescriptor) protected lazy val keystoreTrustParam: TrustParam = TrustParam.BouncyKeyStore(certCollection) //todo used by the adapterService and happyShrineService, but not by the QEP. maybe each can have its own signerVerifier lazy val signerVerifier = SignerVerifierAdapter(certCollection) protected lazy val dataSource: DataSource = TestableDataSourceCreator.dataSource(shrineConfig.getConfig("squerylDataSource.database")) protected lazy val squerylAdapter: DatabaseAdapter = SquerylDbAdapterSelecter.determineAdapter(shrineConfig.getString("shrineDatabaseType")) protected lazy val squerylInitializer: SquerylInitializer = new DataSourceSquerylInitializer(dataSource, squerylAdapter) //todo it'd be better for the adapter and qep to each have its own connection to the pm cell. private lazy val pmEndpoint: EndpointConfig = shrineConfig.getConfigured("pmEndpoint", EndpointConfig(_)) lazy val pmPoster: Poster = Poster(certCollection,pmEndpoint) protected lazy val breakdownTypes: Set[ResultOutputType] = shrineConfig.getOptionConfigured("breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty) //todo why does the qep need a HubDao ? protected lazy val hubDao: HubDao = new SquerylHubDao(squerylInitializer, new net.shrine.broadcaster.dao.squeryl.tables.Tables) //todo really should be part of the adapter config, but is out in shrine's part of the name space lazy val crcHiveCredentials: HiveCredentials = shrineConfig.getConfigured("hiveCredentials", HiveCredentials(_, HiveCredentials.CRC)) val adapterComponents:Option[AdapterComponents] = shrineConfig.getOptionConfiguredIf("adapter", AdapterComponents( _, certCollection, squerylInitializer, breakdownTypes, crcHiveCredentials, signerVerifier, pmPoster, nodeId )) if (adapterComponents.isEmpty) warn("Adapter Components is improperly configured, please check the adapter section in shrine.conf") //todo maybe just break demeter too use this lazy val adapterService: Option[AdapterService] = adapterComponents.map(_.adapterService) //todo maybe just break demeter too use this lazy val i2b2AdminService: Option[I2b2AdminService] = adapterComponents.map(_.i2b2AdminService) //todo this is only used by happy lazy val adapterDao: Option[AdapterDao] = adapterComponents.map(_.adapterDao) //todo this is only used by happy lazy val adapterMappings: Option[AdapterMappings] = adapterComponents.map(_.adapterMappings) val shouldQuerySelf = "hub.shouldQuerySelf" lazy val localAdapterServiceOption: Option[AdapterRequestHandler] = if(shrineConfig.getOption(shouldQuerySelf,_.getBoolean).getOrElse(false)) { //todo give this a default value (of false) in the reference.conf for the Hub, and make it part of the Hub's apply(config) require(adapterService.isDefined, s"Self-querying requested because shrine.$shouldQuerySelf is true, but this node is not configured to have an adapter") adapterService } else None //todo eventually make this just another downstream node accessed via loopback lazy val hubComponents: Option[HubComponents] = shrineConfig.getOptionConfiguredIf("hub",HubComponents(_, keystoreTrustParam, nodeId, localAdapterServiceOption, breakdownTypes, hubDao )) //todo anything that requires qepConfig should be inside QueryEntryPointComponents's apply protected lazy val qepConfig = shrineConfig.getConfig("queryEntryPoint") lazy val queryEntryPointComponents:Option[QueryEntryPointComponents] = shrineConfig.getOptionConfiguredIf("queryEntryPoint", QueryEntryPointComponents(_, certCollection, breakdownTypes, hubComponents.map(_.broadcastDestinations), hubDao, //todo the QEP should not need the hub dao squerylInitializer, //todo could really have its own pmPoster //todo could really have its own )) protected lazy val pmUrlString: String = pmEndpoint.url.toString private[shrine] lazy val ontEndpoint: EndpointConfig = shrineConfig.getConfigured("ontEndpoint", EndpointConfig(_)) protected lazy val ontPoster: Poster = Poster(certCollection,ontEndpoint) //todo only used by happy outside of here lazy val ontologyMetadata: OntClientOntologyMetadata = { import scala.concurrent.duration._ //TODO: XXX: Un-hard-code max wait time param val ontClient: OntClient = new PosterOntClient(shrineConfig.getConfigured("hiveCredentials", HiveCredentials(_, HiveCredentials.ONT)), 1.minute, ontPoster) new OntClientOntologyMetadata(ontClient) } protected lazy val statusJaxrs: StatusJaxrs = StatusJaxrs(config) protected lazy val shrineResource: Option[ShrineResource] = queryEntryPointComponents.map(x => ShrineResource(x.shrineService)) protected lazy val i2b2BroadcastResource: Option[I2b2BroadcastResource] = queryEntryPointComponents.map(x => new I2b2BroadcastResource(x.i2b2Service,breakdownTypes)) protected lazy val adapterResource: Option[AdapterResource] = adapterService.map(AdapterResource(_)) protected lazy val i2b2AdminResource: Option[I2b2AdminResource] = i2b2AdminService.map(I2b2AdminResource(_, breakdownTypes)) def poster(keystoreCertCollection: BouncyKeyStoreCollection)(endpoint: EndpointConfig): Poster = { val httpClient = JerseyHttpClient(keystoreCertCollection, endpoint) Poster(endpoint.url.toString, httpClient) } } diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala index 45220cee7..307bfe770 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/BaseShrineRequest.scala @@ -1,17 +1,18 @@ package net.shrine.protocol import scala.concurrent.duration.Duration /** * @author clint * @since Feb 14, 2014 */ trait BaseShrineRequest extends ShrineMessage { def authn: AuthenticationInfo def waitTime: Duration def requestType: RequestType + //todo maybe add a request-originated-from optional field here? } object BaseShrineRequest extends XmlUnmarshallers.Chained(ShrineRequest.fromXml, NonI2b2ShrineRequest.fromXml) diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala index e06478e03..a9fd96f4f 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala @@ -1,189 +1,193 @@ package net.shrine.protocol import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, OptionEnrichments} import net.shrine.protocol.query.QueryDefinition import scala.xml.NodeSeq import scala.xml.Elem import scala.concurrent.duration.Duration import scala.util.Try import net.shrine.serialization.I2b2UnmarshallingHelpers /** * @author Bill Simons * @since 3/9/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class RunQueryRequest( override val projectId: String, override val waitTime: Duration, override val authn: AuthenticationInfo, networkQueryId: Long, topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names. topicName: Option[String], //data steward when required only outputTypes: Set[ResultOutputType], queryDefinition: QueryDefinition + //todo pick up SHRINE-2174 here nodeId: Option[NodeId] = None ) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[RunQueryRequest] with HandleableShrineRequest with HandleableI2b2Request { override val requestType = RequestType.QueryDefinitionRequest override def handle(handler: ShrineRequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast) override def handleI2b2(handler: I2b2RequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast) def elideAuthenticationInfo: RunQueryRequest = copy(authn = AuthenticationInfo.elided) //NB: Sort ResultOutputTypes, for deterministic testing private def sortedOutputTypes: Seq[ResultOutputType] = outputTypes.toSeq.sortBy(_.name) override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { headerFragment } { networkQueryId } { topicId.toXml() } { topicName.toXml() } { sortedOutputTypes.map(_.toXml) } { queryDefinition.toXml } - } + } //todo put { nodeId.map(_.toXml) } at the end for SHRINE-2174 protected override def i2b2MessageBody: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { i2b2PsmHeaderWithDomain } { queryDefinition.toI2b2 } { for { (outputType, i) <- sortedOutputTypes.zipWithIndex priorityIndex = outputType.id.getOrElse(i + 1) } yield { } } { topicId.toXml() } { topicName.toXml() } } override def withProject(proj: String) = this.copy(projectId = proj) override def withAuthn(ai: AuthenticationInfo) = this.copy(authn = ai) def withQueryDefinition(qDef: QueryDefinition) = this.copy(queryDefinition = qDef) def mapQueryDefinition(f: QueryDefinition => QueryDefinition) = this.withQueryDefinition(f(queryDefinition)) def withNetworkQueryId(id: Long) = this.copy(networkQueryId = id) } object RunQueryRequest extends I2b2XmlUnmarshaller[RunQueryRequest] with ShrineXmlUnmarshaller[RunQueryRequest] with ShrineRequestUnmarshaller with I2b2UnmarshallingHelpers { def apply(projectId: String, waitTime: Duration, authn: AuthenticationInfo, topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names. topicName: Option[String], //data steward when required only outputTypes: Set[ResultOutputType], queryDefinition: QueryDefinition ):RunQueryRequest = RunQueryRequest( projectId, waitTime, authn, BroadcastMessage.Ids.next, topicId, topicName, outputTypes, queryDefinition ) val neededI2b2Namespace = "http://www.i2b2.org/xsd/cell/crc/psm/1.1/" override def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = { val queryDefNode = xml \ "message_body" \ "request" \ "query_definition" val queryDefXml = queryDefNode.head match { //NB: elem.scope.getPrefix(neededI2b2Namespace) will return null if elem isn't part of a larger XML chunk that has //the http://www.i2b2.org/xsd/cell/crc/psm/1.1/ declared case elem: Elem => elem.copy(elem.scope.getPrefix(neededI2b2Namespace)) case _ => throw new Exception("When unmarshalling a RunQueryRequest, encountered unexpected XML: '" + queryDefNode + "', might be missing.") } val attempt = for { projectId <- i2b2ProjectId(xml) waitTime <- i2b2WaitTime(xml) authn <- i2b2AuthenticationInfo(xml) topicId = (xml \ "message_body" \ "shrine" \ "queryTopicID").headOption.map(XmlUtil.trim) topicName = (xml \ "message_body" \ "shrine" \ "queryTopicName").headOption.map(XmlUtil.trim) outputTypes = determineI2b2OutputTypes(breakdownTypes)(xml \ "message_body" \ "request" \ "result_output_list") queryDef <- QueryDefinition.fromI2b2(queryDefXml) } yield { RunQueryRequest( projectId, waitTime, authn, topicId, topicName, outputTypes, - queryDef) + queryDef + //None //todo inject the QEP's nodeId here for SHRINE-2120 + ) } attempt.map(addPatientCountXmlIfNecessary) } private def determineI2b2OutputTypes(breakdownTypes: Set[ResultOutputType])(nodeSeq: NodeSeq): Set[ResultOutputType] = { val sequence = (nodeSeq \ "result_output").flatMap { breakdownXml => val breakdownName = XmlUtil.trim(breakdownXml \ "@name") ResultOutputType.valueOf(breakdownTypes)(breakdownName) } sequence.toSet } private def determineShrineOutputTypes(nodeSeq: NodeSeq): Set[ResultOutputType] = { val attempts = (nodeSeq \ "resultType").map(ResultOutputType.fromXml) Tries.sequence(attempts).map(_.toSet).get } private[protocol] def addPatientCountXmlIfNecessary(req: RunQueryRequest): RunQueryRequest = { import ResultOutputType.PATIENT_COUNT_XML if (req.outputTypes.contains(PATIENT_COUNT_XML)) { req } else { req.copy(outputTypes = req.outputTypes + PATIENT_COUNT_XML) } } override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = { import NodeSeqEnrichments.Strictness._ val attempt = for { projectId <- shrineProjectId(xml) waitTime <- shrineWaitTime(xml) authn <- shrineAuthenticationInfo(xml) queryId <- xml.withChild("queryId").map(XmlUtil.toLong) topicId = (xml \ "topicId").headOption.map(XmlUtil.trim) topicName = (xml \ "topicName").headOption.map(XmlUtil.trim) outputTypes <- xml.withChild("outputTypes").map(determineShrineOutputTypes) queryDef <- xml.withChild(QueryDefinition.rootTagName).flatMap(QueryDefinition.fromXml) +// nodeId <- NodeId.fromXml(xml \ "nodeId") } yield { - RunQueryRequest(projectId, waitTime, authn, queryId, topicId, topicName, outputTypes, queryDef) + RunQueryRequest(projectId, waitTime, authn, queryId, topicId, topicName, outputTypes, queryDef)//, Some(nodeId)) } attempt.map(addPatientCountXmlIfNecessary) } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala index 2a0e0999a..b4a84a870 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala @@ -1,80 +1,85 @@ package net.shrine.broadcaster import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient} import net.shrine.broadcaster.dao.HubDao import net.shrine.client.TimeoutException import net.shrine.log.Loggable -import net.shrine.protocol.{BroadcastMessage, FailureResult, FailureResult$, RunQueryRequest, SingleNodeResult, Timeout} +import net.shrine.protocol.{BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout} import scala.concurrent.Future import scala.util.control.NonFatal /** * @author clint * @since Nov 15, 2013 */ final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable { logStartup() import scala.concurrent.ExecutionContext.Implicits.global override def broadcast(message: BroadcastMessage): Multiplexer = { logOutboundIfNecessary(message) val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId)) for { nodeHandle <- destinations +//todo for SHRINE-2120 shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle) shrineResponse <- callAdapter(message, nodeHandle) } { - try { multiplexer.processResponse(shrineResponse) } + try { + //todo SHRINE-2120 send to the QEP queue here ... but how to tunnel in the additional piece ... where did the query come from ? + //message.request. + //todo send a shrineResponse.toXml . + multiplexer.processResponse(shrineResponse) } finally { logResultsIfNecessary(message, shrineResponse) } } multiplexer } private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = { val NodeHandle(nodeId, client) = nodeHandle client.query(message).recover { case e: TimeoutException => error(s"Broadcasting to $nodeId timed out") Timeout(nodeId) case NonFatal(e) => error(s"Broadcasting to $nodeId failed with ", e) FailureResult(nodeId, e) } } private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ => dao.logQueryResult(message.requestId, result) } private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq => dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition) } private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = { message.request match { case runQueryReq: RunQueryRequest => f(runQueryReq) case _ => () } } private def logStartup(): Unit = { def clientToString(client: AdapterClient): String = client match { case r: RemoteAdapterClient => r.poster.url.toString case _ => "" } info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:") destinations.toSeq.sortBy(_.nodeId.name).foreach { handle => info(s" ${handle.nodeId}: ${clientToString(handle.client)}") } } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala index c394ffadb..7a4726d8d 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BufferingMultiplexer.scala @@ -1,52 +1,56 @@ package net.shrine.broadcaster import java.util.concurrent.atomic.AtomicInteger import net.shrine.log.Loggable import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Buffer import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.Duration import scala.util.Try import net.shrine.protocol.NodeId import net.shrine.protocol.SingleNodeResult /** * @author clint * @date Nov 15, 2013 */ final class BufferingMultiplexer(broadcastTo: Set[NodeId]) extends Multiplexer with Loggable { - + + //todo rename for SHRINE-2120 + //todo if this code survives, change to a BlockingQueue and get rid of this lock private[this] val queue: Buffer[SingleNodeResult] = new ArrayBuffer private[this] val heardFrom = new AtomicInteger private[this] val promise = Promise[Iterable[SingleNodeResult]] private[this] val lock = new AnyRef private[this] def locked[T](f: => T): T = lock.synchronized { f } def resultsSoFar: Iterable[SingleNodeResult] = locked { queue.toIndexedSeq } def numHeardFrom: Int = heardFrom.get override def responses: Future[Iterable[SingleNodeResult]] = promise.future override def processResponse(response: SingleNodeResult): Unit = { - val latestCount = locked { + + val latestCount = locked { queue += response heardFrom.incrementAndGet } debug(s"Latest result count: $latestCount")//todo ; handled response" $response") if(latestCount == broadcastTo.size) { + //todo change to trySuccess or tryComplete promise.complete(Try(resultsSoFar)) } } } \ No newline at end of file diff --git a/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala b/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala index 4727822d1..c1df746d9 100644 --- a/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala +++ b/hub/broadcaster-service/src/main/scala/net/shrine/broadcaster/service/BroadcasterMultiplexerService.scala @@ -1,20 +1,21 @@ package net.shrine.broadcaster.service import net.shrine.protocol.BroadcastMessage import scala.concurrent.duration.Duration import net.shrine.broadcaster.{Broadcaster} import net.shrine.protocol.SingleNodeResult import scala.concurrent.Await /** * @author clint * @since Feb 28, 2014 */ //todo this class looks pretty thin and very suspicious final case class BroadcasterMultiplexerService(broadcaster: Broadcaster, maxQueryWaitTime: Duration) extends BroadcasterMultiplexerRequestHandler { override def broadcastAndMultiplex(message: BroadcastMessage): Iterable[SingleNodeResult] = { val multiplexer = broadcaster.broadcast(message) - + + //todo here's one end of a race condition waiting on a response from the Adapter. Await.result(multiplexer.responses, maxQueryWaitTime) } } \ No newline at end of file