diff --git a/apps/vagrant_deploy.sh b/apps/vagrant_deploy.sh index ae01cd875..9642baa10 100755 --- a/apps/vagrant_deploy.sh +++ b/apps/vagrant_deploy.sh @@ -1,121 +1,122 @@ -#!/bin/bash -x +#!/bin/bash # Deploys SHRINE local builds into vagrant box after maven build # For subprojects that can produce a .war file, install the .war file in a SHRINE vagrant with # > mvn exec:exec #--------------MODIFIABLE------------------------ # Change this to be: a list of destination vagrant machine names declare -a MACHINES=( shrine-hub shrine-node1 ) # Change this to be: the .war file directory in vagrant box VAGRANT_WAR_CONTEXT=/opt/shrine/tomcat/webapps # Change this to be: local vagrant directory VAGRANT_CONTEXT=~/vagrant-shrine-network +#VAGRANT_CONTEXT=~/projects/vagrantShrine/vagrant-shrine-network #------------DO NOT MODIFY----------------------- BUFFER=/home/vagrant/m3shrine APP_DIR=$PWD # Local .war file directory LOCAL_WAR_CONTEXT=target # Functions validates_war_context() { local local_war_context=$1 if [[ ! -d $local_war_context ]]; then echo "ERROR: given $local_war_context does not exist!" exit 1 fi count=`ls -1 *.war 2>/dev/null | wc -l` if [[ $count == 0 ]]; then echo "ERROR: war file does not exist in dir $local_war_context!" echo "Run 'mvn clean install' to generate the war file" exit 1 fi if [[ $count > 1 ]]; then echo "ERROR: more than 1 war file exists in $local_war_context!" exit 1 fi } validates_vagrant_context() { local vagrant_context=$1 if [[ ! -e $vagrant_context ]]; then echo "ERROR: Vagrant context: $vagrant_context doesn't exist!" exit 1 fi if [[ ! -d $vagrant_context ]]; then echo "ERROR: Vagrant context: $vagrant_context exists,\ but it is not a directory!" exit 1 fi } generates_ssh_cfg() { local vagrant_context=$1 cd $vagrant_context vagrant ssh-config > ssh.cfg for machine in ${MACHINES[@]}; do if [[ -d $BUFFER ]]; then echo "Removing previous buffer $BUFFER" vagrant ssh $machine -c "rm -r $BUFFER" fi vagrant ssh $machine -c "mkdir $BUFFER" vagrant ssh $machine -c "chmod 0711 /home/vagrant" vagrant ssh $machine -c "chmod 1777 $BUFFER" done } cleanup() { local vagrant_context=$1 cd $vagrant_context for machine in ${MACHINES[@]}; do vagrant ssh $machine -c "rm -r $BUFFER" done rm ssh.cfg } scp_war() { local local_war_context=$1 local vagrant_war_context=$2 local vagrant_context=$3 local app_dir=$4 cd $app_dir validates_war_context $local_war_context local file_path=`find $local_war_context -type f -name "*.war"` local local_war_file=`basename $file_path` for machine in ${MACHINES[@]}; do # copy local .war file into vagrant box buffer directory echo "Deploying app .war file: $app_dir/$local_war_context/$local_war_file to machine: $machine" cd $vagrant_context scp -F ssh.cfg $app_dir/$local_war_context/$local_war_file $machine:$BUFFER vagrant ssh $machine -c "sudo -u shrine cp --no-preserve=mode $BUFFER/$local_war_file $vagrant_war_context" vagrant ssh $machine -c "sudo rm $BUFFER/$local_war_file" done } # Main echo "Starting deployment process to vagrant..." echo "Current app dir: $APP_DIR" echo "Local Vagrant dir: $VAGRANT_CONTEXT" echo "Deploying app to ${#MACHINES[@]} machines" #vagrant ssh shrine-hub -c validates_war_context $LOCAL_WAR_CONTEXT validates_vagrant_context $VAGRANT_CONTEXT generates_ssh_cfg $VAGRANT_CONTEXT scp_war $LOCAL_WAR_CONTEXT $VAGRANT_WAR_CONTEXT $VAGRANT_CONTEXT $APP_DIR cleanup $VAGRANT_CONTEXT echo "SUCCESS: Process completed! App has been deployed to vagrant!" \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala index fb44de49e..17dcc3743 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala @@ -1,194 +1,194 @@ package net.shrine.protocol import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, OptionEnrichments} import net.shrine.protocol.query.QueryDefinition import scala.xml.NodeSeq import scala.xml.Elem import scala.concurrent.duration.Duration import scala.util.Try import net.shrine.serialization.I2b2UnmarshallingHelpers /** * @author Bill Simons * @since 3/9/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class RunQueryRequest( override val projectId: String, override val waitTime: Duration, override val authn: AuthenticationInfo, networkQueryId: Long, topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names. topicName: Option[String], //data steward when required only outputTypes: Set[ResultOutputType], queryDefinition: QueryDefinition, nodeId: Option[NodeId] = None ) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[RunQueryRequest] with HandleableShrineRequest with HandleableI2b2Request { override val requestType = RequestType.QueryDefinitionRequest override def handle(handler: ShrineRequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast) override def handleI2b2(handler: I2b2RequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast) def elideAuthenticationInfo: RunQueryRequest = copy(authn = AuthenticationInfo.elided) //NB: Sort ResultOutputTypes, for deterministic testing private def sortedOutputTypes: Seq[ResultOutputType] = outputTypes.toSeq.sortBy(_.name) override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { headerFragment } { networkQueryId } { topicId.toXml() } { topicName.toXml() } { sortedOutputTypes.map(_.toXml) } { queryDefinition.toXml } { nodeId.fold(NodeSeq.Empty)(_.toXml) } - } //todo put { nodeId.map(_.toXml) } at the end for SHRINE-2174 + } protected override def i2b2MessageBody: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { i2b2PsmHeaderWithDomain } { queryDefinition.toI2b2 } { for { (outputType, i) <- sortedOutputTypes.zipWithIndex priorityIndex = outputType.id.getOrElse(i + 1) } yield { } } { topicId.toXml() } { topicName.toXml() } } override def withProject(proj: String) = this.copy(projectId = proj) override def withAuthn(ai: AuthenticationInfo) = this.copy(authn = ai) def withQueryDefinition(qDef: QueryDefinition) = this.copy(queryDefinition = qDef) def mapQueryDefinition(f: QueryDefinition => QueryDefinition) = this.withQueryDefinition(f(queryDefinition)) def withNetworkQueryId(id: Long) = this.copy(networkQueryId = id) } object RunQueryRequest extends I2b2XmlUnmarshaller[RunQueryRequest] with ShrineXmlUnmarshaller[RunQueryRequest] with ShrineRequestUnmarshaller with I2b2UnmarshallingHelpers { def apply(projectId: String, waitTime: Duration, authn: AuthenticationInfo, topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names. topicName: Option[String], //data steward when required only outputTypes: Set[ResultOutputType], queryDefinition: QueryDefinition ):RunQueryRequest = RunQueryRequest( projectId, waitTime, authn, BroadcastMessage.Ids.next, topicId, topicName, outputTypes, queryDefinition ) val neededI2b2Namespace = "http://www.i2b2.org/xsd/cell/crc/psm/1.1/" override def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = { val queryDefNode = xml \ "message_body" \ "request" \ "query_definition" val queryDefXml = queryDefNode.head match { //NB: elem.scope.getPrefix(neededI2b2Namespace) will return null if elem isn't part of a larger XML chunk that has //the http://www.i2b2.org/xsd/cell/crc/psm/1.1/ declared case elem: Elem => elem.copy(elem.scope.getPrefix(neededI2b2Namespace)) case _ => throw new Exception("When unmarshalling a RunQueryRequest, encountered unexpected XML: '" + queryDefNode + "', might be missing.") } val attempt = for { projectId <- i2b2ProjectId(xml) waitTime <- i2b2WaitTime(xml) authn <- i2b2AuthenticationInfo(xml) topicId = (xml \ "message_body" \ "shrine" \ "queryTopicID").headOption.map(XmlUtil.trim) topicName = (xml \ "message_body" \ "shrine" \ "queryTopicName").headOption.map(XmlUtil.trim) outputTypes = determineI2b2OutputTypes(breakdownTypes)(xml \ "message_body" \ "request" \ "result_output_list") queryDef <- QueryDefinition.fromI2b2(queryDefXml) } yield { RunQueryRequest( projectId, waitTime, authn, topicId, topicName, outputTypes, queryDef //None //todo inject the QEP's nodeId here for SHRINE-2120 ) } attempt.map(addPatientCountXmlIfNecessary) } private def determineI2b2OutputTypes(breakdownTypes: Set[ResultOutputType])(nodeSeq: NodeSeq): Set[ResultOutputType] = { val sequence = (nodeSeq \ "result_output").flatMap { breakdownXml => val breakdownName = XmlUtil.trim(breakdownXml \ "@name") ResultOutputType.valueOf(breakdownTypes)(breakdownName) } sequence.toSet } private def determineShrineOutputTypes(nodeSeq: NodeSeq): Set[ResultOutputType] = { val attempts = (nodeSeq \ "resultType").map(ResultOutputType.fromXml) Tries.sequence(attempts).map(_.toSet).get } private[protocol] def addPatientCountXmlIfNecessary(req: RunQueryRequest): RunQueryRequest = { import ResultOutputType.PATIENT_COUNT_XML if (req.outputTypes.contains(PATIENT_COUNT_XML)) { req } else { req.copy(outputTypes = req.outputTypes + PATIENT_COUNT_XML) } } override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = { import NodeSeqEnrichments.Strictness._ val attempt = for { projectId <- shrineProjectId(xml) waitTime <- shrineWaitTime(xml) authn <- shrineAuthenticationInfo(xml) queryId <- xml.withChild("queryId").map(XmlUtil.toLong) topicId = (xml \ "topicId").headOption.map(XmlUtil.trim) topicName = (xml \ "topicName").headOption.map(XmlUtil.trim) outputTypes <- xml.withChild("outputTypes").map(determineShrineOutputTypes) queryDef <- xml.withChild(QueryDefinition.rootTagName).flatMap(QueryDefinition.fromXml) nodeId <- NodeId.fromXmlOption(xml \ "nodeId") } yield { RunQueryRequest(projectId, waitTime, authn, queryId, topicId, topicName, outputTypes, queryDef, nodeId) } attempt.map(addPatientCountXmlIfNecessary) } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala index b4a84a870..fc7c105b1 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala @@ -1,85 +1,92 @@ package net.shrine.broadcaster import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient} import net.shrine.broadcaster.dao.HubDao import net.shrine.client.TimeoutException import net.shrine.log.Loggable import net.shrine.protocol.{BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout} import scala.concurrent.Future import scala.util.control.NonFatal /** * @author clint * @since Nov 15, 2013 */ final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable { logStartup() import scala.concurrent.ExecutionContext.Implicits.global override def broadcast(message: BroadcastMessage): Multiplexer = { logOutboundIfNecessary(message) val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId)) for { nodeHandle <- destinations //todo for SHRINE-2120 shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle) shrineResponse <- callAdapter(message, nodeHandle) } { try { - //todo SHRINE-2120 send to the QEP queue here ... but how to tunnel in the additional piece ... where did the query come from ? - //message.request. - //todo send a shrineResponse.toXml . + message.request match { + case rqr:RunQueryRequest => { + debug(s"RunQueryRequest's nodeId is ${rqr.nodeId}") + //todo SHRINE-2120 send to the QEP queue named nodeId + //todo send a shrineResponse.toXml . + //todo use the json envelope when you get to SHRINE-2177 + } + case _ => debug(s"Not a RunQueryRequest but a ${message.request.getClass.getSimpleName}.") + } + multiplexer.processResponse(shrineResponse) } finally { logResultsIfNecessary(message, shrineResponse) } } multiplexer } private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = { val NodeHandle(nodeId, client) = nodeHandle client.query(message).recover { case e: TimeoutException => error(s"Broadcasting to $nodeId timed out") Timeout(nodeId) case NonFatal(e) => error(s"Broadcasting to $nodeId failed with ", e) FailureResult(nodeId, e) } } private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ => dao.logQueryResult(message.requestId, result) } private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq => dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition) } private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = { message.request match { case runQueryReq: RunQueryRequest => f(runQueryReq) case _ => () } } private def logStartup(): Unit = { def clientToString(client: AdapterClient): String = client match { case r: RemoteAdapterClient => r.poster.url.toString case _ => "" } info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:") destinations.toSeq.sortBy(_.nodeId.name).foreach { handle => info(s" ${handle.nodeId}: ${clientToString(handle.client)}") } } } \ No newline at end of file