diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index 25e9f017f..e3ab7c6eb 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,405 +1,407 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources}
import net.shrine.protocol.QueryResult.StatusType
import scala.xml.NodeSeq
import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil}
import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller}
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult (
resultId: Long,
instanceId: Long,
resultType: Option[ResultOutputType],
setSize: Long,
startDate: Option[XMLGregorianCalendar],
endDate: Option[XMLGregorianCalendar],
description: Option[String],
statusType: StatusType,
statusMessage: Option[String],
problemDigest: Option[ProblemDigest] = None,
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
) extends XmlMarshaller with I2b2Marshaller with Loggable {
//only used in tests
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
None, //description
statusType,
None) //statusMessage
}
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
description: String,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
Option(description),
statusType,
None) //statusMessage
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt =>
if(rt.isBreakdown) rt.toI2b2NameOnly()
else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error
else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error
else rt.toI2b2
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
{ problemDigest.map(_.toXml).getOrElse("") }
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
final case class StatusType(
name: String,
isDone: Boolean,
i2b2Id: Option[Int] = Some(-1),
private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2,
isCrcCallCompleted:Boolean = true
) extends StatusType.Value {
def isError = this == StatusType.Error
+ def crcPromisedToFinishAfterReply = isCrcCallCompleted && !isDone
+
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
}
object StatusType extends SEnum[StatusType] {
private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
}
{ i2b2Id }{ queryResult.statusType.name }
}
val noMessage:NodeSeq = null
val Error = StatusType("ERROR", isDone = true, None, { queryResult =>
(queryResult.statusMessage, queryResult.problemDigest) match {
case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml
case (Some(msg),None) => { msg }
case (None,Some(pd)) => { pd.summary } ++ pd.toXml
case (None, None) => noMessage
}
})
val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests
val Queued = StatusType("QUEUED", isDone = false, Some(2))
val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
val Held = StatusType("HELD", isDone = false)
val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
//SHRINE's internal states as reported by the hub
val HubWillSubmit = StatusType("HUB_WILL_SUBMIT",isDone = false,isCrcCallCompleted = false)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = {
val subXml = xml \ "problem"
if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml))
else None
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
//noinspection ScalaUnnecessaryParentheses
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
resultId = asLong("resultId"),
instanceId = asLong("instanceId"),
resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
setSize = asLong("setSize"),
startDate = extractDate("startDate"),
endDate = extractDate("endDate"),
description = extract("description"),
statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = extract("statusMessage"),
problemDigest = extractProblemDigest(xml),
breakdowns = extractBreakdowns("resultEnvelope")
)
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call
val statusMessage: Option[String] = asTextOption("query_status_type", "description")
val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type")
val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest
else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest)
else None
case class Filling(
resultType:Option[ResultOutputType],
setSize:Long,
startDate:Option[XMLGregorianCalendar],
endDate:Option[XMLGregorianCalendar]
)
val filling = if(!statusType.isError) {
val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2)
val setSize = asLong("set_size")
val startDate = asXmlGcOption("start_date")
val endDate = asXmlGcOption("end_date")
Filling(resultType,setSize,startDate,endDate)
}
else {
val resultType = None
val setSize = 0L
val startDate = None
val endDate = None
Filling(resultType,setSize,startDate,endDate)
}
QueryResult(
resultId = asLong("result_instance_id"),
instanceId = asLong("query_instance_id"),
resultType = filling.resultType,
setSize = filling.setSize,
startDate = filling.startDate,
endDate = filling.endDate,
description = asTextOption("description"),
statusType = statusType,
statusMessage = statusMessage,
problemDigest = problemDigest
)
}
def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = {
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = {
val problemDigest = problem.toDigest
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
/**
* For reconstituting errorResults from a database
*/
def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = {
// This would require parsing the stamp text to change, and without a standard locale that's nigh impossible.
// If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet.
val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0)
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
}
case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val summary: String = "The I2B2 CRC reported an internal error."
override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}"
override val detailsXml =
CRC's Response is {xmlResponseFromCrc}
}
diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
index d4a292767..e304ad3b3 100644
--- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
@@ -1,317 +1,324 @@
package net.shrine.qep
import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator}
import net.shrine.audit.NetworkQueryId
import net.shrine.authentication.AuthenticationResult.Authenticated
import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException}
import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized}
import net.shrine.authorization.QueryAuthorizationService
import net.shrine.broadcaster.BroadcastAndAggregationService
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.{AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, NodeId, QueryInstance, QueryResult, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.qep.audit.QepAuditDb
import net.shrine.qep.dao.AuditDao
import net.shrine.qep.querydb.QepQueryDb
import net.shrine.util.XmlDateHelper
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal
import scala.xml.NodeSeq
/**
* @author clint
* @since Feb 19, 2014
*/
trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable {
val commonName:String
val auditDao: AuditDao
val authenticator: Authenticator
val authorizationService: QueryAuthorizationService
val includeAggregateResult: Boolean
val broadcastAndAggregationService: BroadcastAndAggregationService
val queryTimeout: Duration
val breakdownTypes: Set[ResultOutputType]
val collectQepAudit:Boolean
val nodeId:NodeId
protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = {
info(s"doReadResultOutputTypes($request)")
authenticateAndThen(request) { authResult =>
val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes
//TODO: XXX: HACK: Would like to remove the cast
ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp]
}
}
protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
authenticateAndThen(request) { authResult =>
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
authenticateAndThen(request) { authResult =>
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor")
//store the query in the qep's database
doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast,authResult)
}
}
protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doReadQueryDefinition($request,$shouldBroadcast)")
doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast,authResult)
}
}
protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doReadInstanceResults($request,$shouldBroadcast)")
val networkId = request.shrineNetworkQueryId
//read from the QEP database code here. Only broadcast if some result is in some sketchy state
val resultsFromDb: Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId)
- //If any query result was pending
- val response = if (resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) {
+ debug(s"result states are ${resultsFromDb.map(_.statusType).mkString(" ")}")
+
+ def shouldAskAdapters:Boolean = {
+ if (resultsFromDb.isEmpty) false //Don't ask if no results exist.
+ else
+ resultsFromDb.forall(_.statusType.isCrcCallCompleted) && //Be sure every CRC has replied to its adapter - hack to deal with this request going to all adapters
+ resultsFromDb.exists(_.statusType.crcPromisedToFinishAfterReply)
+ }
+
+ val response = if (!shouldAskAdapters) {
debug(s"Using qep cached results for query $networkId")
AggregatedReadInstanceResultsResponse(networkId, resultsFromDb).asInstanceOf[BaseResp]
}
else {
- debug(s"Requesting results for $networkId from network")
+ info(s"Requesting results for $networkId from network")
val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast,authResult)
//put the new results in the database if we got what we wanted
response match {
case arirr: AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId, r))
- case _ => //do nothing
+ case _ => warn(s"Response was a ${response.getClass.getSimpleName}, not a ${classOf[AggregatedReadInstanceResultsResponse].getSimpleName}: $response")
}
-
response
}
response
}
}
protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doReadQueryInstances($request,$shouldBroadcast)")
val now = XmlDateHelper.now
val networkQueryId = request.networkQueryId
val username = request.authn.username
val groupId = request.projectId
//NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like
//to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine
//can understand, while meeting the conversational requirements of the legacy web client.
val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now)
//TODO: XXX: HACK: Would like to remove the cast
//NB: Munge in username from authentication result
ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp]
}
}
protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = {
authenticateAndThen(request){ authResult =>
info(s"doReadPreviousQueries($request,$shouldBroadcast)")
//todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine 1.24)
//pull queries from the local database.
QepQueryDb.db.selectPreviousQueries(request)
}
}
protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doRenameQuery($request,$shouldBroadcast)")
QepQueryDb.db.renamePreviousQuery(request)
doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doDeleteQuery($request,$shouldBroadcast)")
QepQueryDb.db.markDeleted(request)
doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ =>
info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)")
//TODO: XXX: HACK: Would like to remove the cast
authorizationService.readApprovedEntries(request) match {
case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp]
case Right(validResponse) => validResponse.asInstanceOf[BaseResp]
}
}
import broadcastAndAggregationService.sendAndAggregate
protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean, authResult:Authenticated): BaseResp = {
debug(s"doBroadcastQuery($request) authResult is $authResult")
//NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another
//NB: Only audit RunQueryRequests
//When making BroadcastMessages
val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false))
/** Sends the query to the hub and starts a Future to watch for the results. Returns (almost) immediately. */
def queryHub( authorizedRequest: RunQueryRequest): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
info(s"Sending RunQueryRequest ${authorizedRequest.networkQueryId} to the Hub")
//Future[Unit] that this code fires off to the hub, then handles the BaseShrineResponse
sendAndAggregate(networkAuthn,authorizedRequest,aggregator,shouldBroadcast).transform ( { hubResponse:BaseShrineResponse =>
debug(s"Received $hubResponse for ${authorizedRequest.networkQueryId}")
hubResponse match {
case aggregated: AggregatedRunQueryResponse =>
info(s"Received ${aggregated.statusTypeName} and ignored results for ${authorizedRequest.networkQueryId}")
blocking {
//now that queries arrive at the QEP via a queue, no need to put them into the database. They can just fall on the floor
//todo record the query's state in a way that will stop polling in 1.23. See SHRINE-2148
}
case _ => IncorrectResponseFromHub(hubResponse,authorizedRequest.networkQueryId)
}
}
, throwable => {
throwable match {
case NonFatal(t) => ExceptionWhileHubRanQuery(t, authorizedRequest.networkQueryId)
case _ => //Let the infrastructure handle fatal exceptions
}
throwable
}
)
}
request match {
case runQueryRequest: RunQueryRequest =>
// inject modified, authorized runQueryRequest
//although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code
//inject QEP NodeId
auditAuthorizeAndThen(runQueryRequest.copy(nodeId = Some(nodeId))) { authorizedRequest =>
debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest")
// tuck the ACT audit metrics data into a database here
if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName)
QepQueryDb.db.insertQepQuery(authorizedRequest)
queryHub(authorizedRequest)
val response = AggregatedRunQueryResponse(
queryId = authorizedRequest.networkQueryId,
createDate = XmlDateHelper.now,
userId = networkAuthn.username,
groupId = networkAuthn.domain,
requestXml = authorizedRequest.queryDefinition,
queryInstanceId = authorizedRequest.networkQueryId,
results = Seq.empty,
statusTypeName = "RECEIVED_BY_QEP" //todo figure out the right statuses for 1.23. See SHRINE-2148
)
response.asInstanceOf[BaseResp]
}
case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast)
}
}
private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = {
info(s"doSynchronousQuery($request) started")
val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp]
info(s"doSynchronousQuery($request) completed with response $response")
response
}
private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult)
protected def waitFor[R](futureResponse: Future[R]): R = {
XmlDateHelper.time("Waiting for aggregated results")(debug(_)) {
Await.result(futureResponse, queryTimeout)
}
}
private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = {
auditTransactionally(request) {
debug(s"auditAuthorizeAndThen($request) with $authorizationService")
val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match {
case na: NotAuthorized => throw na.toException
case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2))
}
body(authorizedRequest)
}
}
private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = {
try { body } finally {
auditDao.addAuditEntry(
request.projectId,
request.authn.domain,
request.authn.username,
request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still?
request.topicId)
}
}
//todo move auth code with SHRINE-1322
import AuthenticationResult._
private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = {
val authResult = authenticator.authenticate(request.authn)
authResult match {
case a: Authenticated => f(a)
case na:NotAuthenticated => throw NotAuthenticatedException(na)
}
}
}
case class ExceptionWhileHubRanQuery(t: Throwable,networkQueryId: NetworkQueryId) extends AbstractProblem(ProblemSources.Qep) {
override val throwable = Some(t)
override def summary: String = s"${t.getClass.getSimpleName} encountered in an http call to run $networkQueryId query at the hub."
override def description: String = "The QEP generated an exception while making an http call to run a query at the hub."
override def detailsXml: NodeSeq = NodeSeq.fromSeq(
networkQueryId is {networkQueryId}
{throwableDetail.getOrElse("")}
)
}
case class IncorrectResponseFromHub(hubResponse:BaseShrineResponse,networkQueryId: NetworkQueryId) extends AbstractProblem(ProblemSources.Qep) {
override def summary: String = s"The hub responded to query $networkQueryId with a ${hubResponse.getClass.getSimpleName}, not a ${classOf[AggregatedRunQueryResponse].getSimpleName}"
override def description: String = s"The hub responded with something other than a ${classOf[AggregatedRunQueryResponse].getSimpleName} to the QEP's run query request."
override def detailsXml: NodeSeq = NodeSeq.fromSeq(
networkQueryId is {networkQueryId}
hubResponse is {hubResponse}
)
}
\ No newline at end of file