diff --git a/apps/vagrant_deploy.sh b/apps/vagrant_deploy.sh
index ae01cd875..9642baa10 100755
--- a/apps/vagrant_deploy.sh
+++ b/apps/vagrant_deploy.sh
@@ -1,121 +1,122 @@
-#!/bin/bash -x
+#!/bin/bash
# Deploys SHRINE local builds into vagrant box after maven build
# For subprojects that can produce a .war file, install the .war file in a SHRINE vagrant with
# > mvn exec:exec
#--------------MODIFIABLE------------------------
# Change this to be: a list of destination vagrant machine names
declare -a MACHINES=( shrine-hub shrine-node1 )
# Change this to be: the .war file directory in vagrant box
VAGRANT_WAR_CONTEXT=/opt/shrine/tomcat/webapps
# Change this to be: local vagrant directory
VAGRANT_CONTEXT=~/vagrant-shrine-network
+#VAGRANT_CONTEXT=~/projects/vagrantShrine/vagrant-shrine-network
#------------DO NOT MODIFY-----------------------
BUFFER=/home/vagrant/m3shrine
APP_DIR=$PWD
# Local .war file directory
LOCAL_WAR_CONTEXT=target
# Functions
validates_war_context()
{
local local_war_context=$1
if [[ ! -d $local_war_context ]]; then
echo "ERROR: given $local_war_context does not exist!"
exit 1
fi
count=`ls -1 *.war 2>/dev/null | wc -l`
if [[ $count == 0 ]]; then
echo "ERROR: war file does not exist in dir $local_war_context!"
echo "Run 'mvn clean install' to generate the war file"
exit 1
fi
if [[ $count > 1 ]]; then
echo "ERROR: more than 1 war file exists in $local_war_context!"
exit 1
fi
}
validates_vagrant_context()
{
local vagrant_context=$1
if [[ ! -e $vagrant_context ]]; then
echo "ERROR: Vagrant context: $vagrant_context doesn't exist!"
exit 1
fi
if [[ ! -d $vagrant_context ]]; then
echo "ERROR: Vagrant context: $vagrant_context exists,\
but it is not a directory!"
exit 1
fi
}
generates_ssh_cfg()
{
local vagrant_context=$1
cd $vagrant_context
vagrant ssh-config > ssh.cfg
for machine in ${MACHINES[@]}; do
if [[ -d $BUFFER ]]; then
echo "Removing previous buffer $BUFFER"
vagrant ssh $machine -c "rm -r $BUFFER"
fi
vagrant ssh $machine -c "mkdir $BUFFER"
vagrant ssh $machine -c "chmod 0711 /home/vagrant"
vagrant ssh $machine -c "chmod 1777 $BUFFER"
done
}
cleanup()
{
local vagrant_context=$1
cd $vagrant_context
for machine in ${MACHINES[@]}; do
vagrant ssh $machine -c "rm -r $BUFFER"
done
rm ssh.cfg
}
scp_war()
{
local local_war_context=$1
local vagrant_war_context=$2
local vagrant_context=$3
local app_dir=$4
cd $app_dir
validates_war_context $local_war_context
local file_path=`find $local_war_context -type f -name "*.war"`
local local_war_file=`basename $file_path`
for machine in ${MACHINES[@]}; do
# copy local .war file into vagrant box buffer directory
echo "Deploying app .war file: $app_dir/$local_war_context/$local_war_file to machine: $machine"
cd $vagrant_context
scp -F ssh.cfg $app_dir/$local_war_context/$local_war_file $machine:$BUFFER
vagrant ssh $machine -c "sudo -u shrine cp --no-preserve=mode $BUFFER/$local_war_file $vagrant_war_context"
vagrant ssh $machine -c "sudo rm $BUFFER/$local_war_file"
done
}
# Main
echo "Starting deployment process to vagrant..."
echo "Current app dir: $APP_DIR"
echo "Local Vagrant dir: $VAGRANT_CONTEXT"
echo "Deploying app to ${#MACHINES[@]} machines"
#vagrant ssh shrine-hub -c
validates_war_context $LOCAL_WAR_CONTEXT
validates_vagrant_context $VAGRANT_CONTEXT
generates_ssh_cfg $VAGRANT_CONTEXT
scp_war $LOCAL_WAR_CONTEXT $VAGRANT_WAR_CONTEXT $VAGRANT_CONTEXT $APP_DIR
cleanup $VAGRANT_CONTEXT
echo "SUCCESS: Process completed! App has been deployed to vagrant!"
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala
index fb44de49e..17dcc3743 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/RunQueryRequest.scala
@@ -1,194 +1,194 @@
package net.shrine.protocol
import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, OptionEnrichments}
import net.shrine.protocol.query.QueryDefinition
import scala.xml.NodeSeq
import scala.xml.Elem
import scala.concurrent.duration.Duration
import scala.util.Try
import net.shrine.serialization.I2b2UnmarshallingHelpers
/**
* @author Bill Simons
* @since 3/9/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class RunQueryRequest(
override val projectId: String,
override val waitTime: Duration,
override val authn: AuthenticationInfo,
networkQueryId: Long,
topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names.
topicName: Option[String], //data steward when required only
outputTypes: Set[ResultOutputType],
queryDefinition: QueryDefinition,
nodeId: Option[NodeId] = None
) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[RunQueryRequest] with HandleableShrineRequest with HandleableI2b2Request {
override val requestType = RequestType.QueryDefinitionRequest
override def handle(handler: ShrineRequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast)
override def handleI2b2(handler: I2b2RequestHandler, shouldBroadcast: Boolean) = handler.runQuery(this, shouldBroadcast)
def elideAuthenticationInfo: RunQueryRequest = copy(authn = AuthenticationInfo.elided)
//NB: Sort ResultOutputTypes, for deterministic testing
private def sortedOutputTypes: Seq[ResultOutputType] = outputTypes.toSeq.sortBy(_.name)
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ headerFragment }
{ networkQueryId }
{ topicId.toXml() }
{ topicName.toXml() }
{ sortedOutputTypes.map(_.toXml) }
{ queryDefinition.toXml }
{ nodeId.fold(NodeSeq.Empty)(_.toXml) }
- } //todo put { nodeId.map(_.toXml) } at the end for SHRINE-2174
+ }
protected override def i2b2MessageBody: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ i2b2PsmHeaderWithDomain }
{ queryDefinition.toI2b2 }
{
for {
(outputType, i) <- sortedOutputTypes.zipWithIndex
priorityIndex = outputType.id.getOrElse(i + 1)
} yield {
}
}
{ topicId.toXml() }
{ topicName.toXml() }
}
override def withProject(proj: String) = this.copy(projectId = proj)
override def withAuthn(ai: AuthenticationInfo) = this.copy(authn = ai)
def withQueryDefinition(qDef: QueryDefinition) = this.copy(queryDefinition = qDef)
def mapQueryDefinition(f: QueryDefinition => QueryDefinition) = this.withQueryDefinition(f(queryDefinition))
def withNetworkQueryId(id: Long) = this.copy(networkQueryId = id)
}
object RunQueryRequest extends I2b2XmlUnmarshaller[RunQueryRequest] with ShrineXmlUnmarshaller[RunQueryRequest] with ShrineRequestUnmarshaller with I2b2UnmarshallingHelpers {
def apply(projectId: String,
waitTime: Duration,
authn: AuthenticationInfo,
topicId: Option[String], //data steward when required only, must be separate from topicName because HMS DSA does not supply topic names.
topicName: Option[String], //data steward when required only
outputTypes: Set[ResultOutputType],
queryDefinition: QueryDefinition
):RunQueryRequest = RunQueryRequest(
projectId,
waitTime,
authn,
BroadcastMessage.Ids.next,
topicId,
topicName,
outputTypes,
queryDefinition
)
val neededI2b2Namespace = "http://www.i2b2.org/xsd/cell/crc/psm/1.1/"
override def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = {
val queryDefNode = xml \ "message_body" \ "request" \ "query_definition"
val queryDefXml = queryDefNode.head match {
//NB: elem.scope.getPrefix(neededI2b2Namespace) will return null if elem isn't part of a larger XML chunk that has
//the http://www.i2b2.org/xsd/cell/crc/psm/1.1/ declared
case elem: Elem => elem.copy(elem.scope.getPrefix(neededI2b2Namespace))
case _ => throw new Exception("When unmarshalling a RunQueryRequest, encountered unexpected XML: '" + queryDefNode + "', might be missing.")
}
val attempt = for {
projectId <- i2b2ProjectId(xml)
waitTime <- i2b2WaitTime(xml)
authn <- i2b2AuthenticationInfo(xml)
topicId = (xml \ "message_body" \ "shrine" \ "queryTopicID").headOption.map(XmlUtil.trim)
topicName = (xml \ "message_body" \ "shrine" \ "queryTopicName").headOption.map(XmlUtil.trim)
outputTypes = determineI2b2OutputTypes(breakdownTypes)(xml \ "message_body" \ "request" \ "result_output_list")
queryDef <- QueryDefinition.fromI2b2(queryDefXml)
} yield {
RunQueryRequest(
projectId,
waitTime,
authn,
topicId,
topicName,
outputTypes,
queryDef
//None //todo inject the QEP's nodeId here for SHRINE-2120
)
}
attempt.map(addPatientCountXmlIfNecessary)
}
private def determineI2b2OutputTypes(breakdownTypes: Set[ResultOutputType])(nodeSeq: NodeSeq): Set[ResultOutputType] = {
val sequence = (nodeSeq \ "result_output").flatMap { breakdownXml =>
val breakdownName = XmlUtil.trim(breakdownXml \ "@name")
ResultOutputType.valueOf(breakdownTypes)(breakdownName)
}
sequence.toSet
}
private def determineShrineOutputTypes(nodeSeq: NodeSeq): Set[ResultOutputType] = {
val attempts = (nodeSeq \ "resultType").map(ResultOutputType.fromXml)
Tries.sequence(attempts).map(_.toSet).get
}
private[protocol] def addPatientCountXmlIfNecessary(req: RunQueryRequest): RunQueryRequest = {
import ResultOutputType.PATIENT_COUNT_XML
if (req.outputTypes.contains(PATIENT_COUNT_XML)) { req }
else { req.copy(outputTypes = req.outputTypes + PATIENT_COUNT_XML) }
}
override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[RunQueryRequest] = {
import NodeSeqEnrichments.Strictness._
val attempt = for {
projectId <- shrineProjectId(xml)
waitTime <- shrineWaitTime(xml)
authn <- shrineAuthenticationInfo(xml)
queryId <- xml.withChild("queryId").map(XmlUtil.toLong)
topicId = (xml \ "topicId").headOption.map(XmlUtil.trim)
topicName = (xml \ "topicName").headOption.map(XmlUtil.trim)
outputTypes <- xml.withChild("outputTypes").map(determineShrineOutputTypes)
queryDef <- xml.withChild(QueryDefinition.rootTagName).flatMap(QueryDefinition.fromXml)
nodeId <- NodeId.fromXmlOption(xml \ "nodeId")
} yield {
RunQueryRequest(projectId, waitTime, authn, queryId, topicId, topicName, outputTypes, queryDef, nodeId)
}
attempt.map(addPatientCountXmlIfNecessary)
}
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
index b4a84a870..fc7c105b1 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
@@ -1,85 +1,92 @@
package net.shrine.broadcaster
import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient}
import net.shrine.broadcaster.dao.HubDao
import net.shrine.client.TimeoutException
import net.shrine.log.Loggable
import net.shrine.protocol.{BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout}
import scala.concurrent.Future
import scala.util.control.NonFatal
/**
* @author clint
* @since Nov 15, 2013
*/
final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable {
logStartup()
import scala.concurrent.ExecutionContext.Implicits.global
override def broadcast(message: BroadcastMessage): Multiplexer = {
logOutboundIfNecessary(message)
val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId))
for {
nodeHandle <- destinations
//todo for SHRINE-2120 shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle)
shrineResponse <- callAdapter(message, nodeHandle)
} {
try {
- //todo SHRINE-2120 send to the QEP queue here ... but how to tunnel in the additional piece ... where did the query come from ?
- //message.request.
- //todo send a shrineResponse.toXml .
+ message.request match {
+ case rqr:RunQueryRequest => {
+ debug(s"RunQueryRequest's nodeId is ${rqr.nodeId}")
+ //todo SHRINE-2120 send to the QEP queue named nodeId
+ //todo send a shrineResponse.toXml .
+ //todo use the json envelope when you get to SHRINE-2177
+ }
+ case _ => debug(s"Not a RunQueryRequest but a ${message.request.getClass.getSimpleName}.")
+ }
+
multiplexer.processResponse(shrineResponse) }
finally { logResultsIfNecessary(message, shrineResponse) }
}
multiplexer
}
private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = {
val NodeHandle(nodeId, client) = nodeHandle
client.query(message).recover {
case e: TimeoutException =>
error(s"Broadcasting to $nodeId timed out")
Timeout(nodeId)
case NonFatal(e) =>
error(s"Broadcasting to $nodeId failed with ", e)
FailureResult(nodeId, e)
}
}
private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ =>
dao.logQueryResult(message.requestId, result)
}
private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq =>
dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition)
}
private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = {
message.request match {
case runQueryReq: RunQueryRequest => f(runQueryReq)
case _ => ()
}
}
private def logStartup(): Unit = {
def clientToString(client: AdapterClient): String = client match {
case r: RemoteAdapterClient => r.poster.url.toString
case _ => ""
}
info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:")
destinations.toSeq.sortBy(_.nodeId.name).foreach { handle =>
info(s" ${handle.nodeId}: ${clientToString(handle.client)}")
}
}
}
\ No newline at end of file