diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala index 25e9f017f..e3ab7c6eb 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala @@ -1,405 +1,407 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources} import net.shrine.protocol.QueryResult.StatusType import scala.xml.NodeSeq import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil} import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller} import scala.util.Try /** * @author Bill Simons * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class QueryResult ( resultId: Long, instanceId: Long, resultType: Option[ResultOutputType], setSize: Long, startDate: Option[XMLGregorianCalendar], endDate: Option[XMLGregorianCalendar], description: Option[String], statusType: StatusType, statusMessage: Option[String], problemDigest: Option[ProblemDigest] = None, breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty ) extends XmlMarshaller with I2b2Marshaller with Loggable { //only used in tests def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, //description statusType, None) //statusMessage } def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None) //statusMessage } def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match { case Some(rt) => rt == testedResultType case _ => false } import QueryResult._ //NB: Fragile, non-type-safe == def isError = statusType == StatusType.Error def elapsed: Option[Long] = { def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- startDate end <- endDate } yield inMillis(end) - inMillis(start) } //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = { breakdowns.values.toSeq.sortBy(_.resultType.name) } override def toI2b2: NodeSeq = { import OptionEnrichments._ XmlUtil.stripWhitespace { { resultId } { instanceId } { description.toXml() } { resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt => if(rt.isBreakdown) rt.toI2b2NameOnly() else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error else rt.toI2b2 } } { setSize } { startDate.toXml() } { endDate.toXml() } { statusType } { statusType.toI2b2(this) } { //NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case, //so if we're going to do that, let's produce saner XML. sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data")) } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { resultId } { instanceId } { resultType.toXml(_.toXml) } { setSize } { startDate.toXml() } { endDate.toXml() } { description.toXml() } { statusType } { statusMessage.toXml() } { //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. sortedBreakdowns.map(_.toXml) } { problemDigest.map(_.toXml).getOrElse("") } } def withId(id: Long): QueryResult = copy(resultId = id) def withInstanceId(id: Long): QueryResult = copy(instanceId = id) def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize)) def withSetSize(size: Long): QueryResult = copy(setSize = size) def withDescription(desc: String): QueryResult = copy(description = Option(desc)) def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType)) def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData)) def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns) } object QueryResult { final case class StatusType( name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2, isCrcCallCompleted:Boolean = true ) extends StatusType.Value { def isError = this == StatusType.Error + def crcPromisedToFinishAfterReply = isCrcCallCompleted && !isDone + def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult) } object StatusType extends SEnum[StatusType] { private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{ throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id") } { i2b2Id }{ queryResult.statusType.name } } val noMessage:NodeSeq = null val Error = StatusType("ERROR", isDone = true, None, { queryResult => (queryResult.statusMessage, queryResult.problemDigest) match { case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml case (Some(msg),None) => { msg } case (None,Some(pd)) => { pd.summary } ++ pd.toXml case (None, None) => noMessage } }) val Finished = StatusType("FINISHED", isDone = true, Some(3)) //TODO: Can we use the same for Queued, Processing, and Incomplete? val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests val Queued = StatusType("QUEUED", isDone = false, Some(2)) val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2)) //TODO: What s should these have? Does anyone care? val Held = StatusType("HELD", isDone = false) val SmallQueue = StatusType("SMALL_QUEUE", isDone = false) val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false) val LargeQueue = StatusType("LARGE_QUEUE", isDone = false) val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false) //SHRINE's internal states as reported by the hub val HubWillSubmit = StatusType("HUB_WILL_SUBMIT",isDone = false,isCrcCallCompleted = false) } def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _) def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = { import ResultOutputType.valueOf val typeName = asText(elemNames: _*)(xml) valueOf(typeName) orElse valueOf(breakdownTypes)(typeName) } def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = { val attempt = parse(xml) attempt.toOption } def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = { val subXml = xml \ "problem" if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml)) else None } def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def extract(elemName: String): Option[String] = { Option((xml \ elemName).text.trim).filter(!_.isEmpty) } def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate) val asLong = extractLong(xml) _ import NodeSeqEnrichments.Strictness._ import Tries.sequence def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = { //noinspection ScalaUnnecessaryParentheses val mapAttempt = for { subXml <- xml.withChild(elemName) envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes))) mappings = envelopes.map(envelope => (envelope.resultType -> envelope)) } yield Map.empty ++ mappings mapAttempt.getOrElse(Map.empty) } QueryResult( resultId = asLong("resultId"), instanceId = asLong("instanceId"), resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml), setSize = asLong("setSize"), startDate = extractDate("startDate"), endDate = extractDate("endDate"), description = extract("description"), statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call statusMessage = extract("statusMessage"), problemDigest = extractProblemDigest(xml), breakdowns = extractBreakdowns("resultEnvelope") ) } def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def asLong = extractLong(xml) _ def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim) def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate) val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call val statusMessage: Option[String] = asTextOption("query_status_type", "description") val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type") val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest) else None case class Filling( resultType:Option[ResultOutputType], setSize:Long, startDate:Option[XMLGregorianCalendar], endDate:Option[XMLGregorianCalendar] ) val filling = if(!statusType.isError) { val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2) val setSize = asLong("set_size") val startDate = asXmlGcOption("start_date") val endDate = asXmlGcOption("end_date") Filling(resultType,setSize,startDate,endDate) } else { val resultType = None val setSize = 0L val startDate = None val endDate = None Filling(resultType,setSize,startDate,endDate) } QueryResult( resultId = asLong("result_instance_id"), instanceId = asLong("query_instance_id"), resultType = filling.resultType, setSize = filling.setSize, startDate = filling.startDate, endDate = filling.endDate, description = asTextOption("description"), statusType = statusType, statusMessage = statusMessage, problemDigest = problemDigest ) } def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = { QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = { val problemDigest = problem.toDigest QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } /** * For reconstituting errorResults from a database */ def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = { // This would require parsing the stamp text to change, and without a standard locale that's nigh impossible. // If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet. val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0) QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } } case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { override val summary: String = "The I2B2 CRC reported an internal error." override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}" override val detailsXml =

CRC's Response is {xmlResponseFromCrc}
} diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala index d4a292767..e304ad3b3 100644 --- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala +++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala @@ -1,317 +1,324 @@ package net.shrine.qep import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator} import net.shrine.audit.NetworkQueryId import net.shrine.authentication.AuthenticationResult.Authenticated import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.{AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, NodeId, QueryInstance, QueryResult, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.audit.QepAuditDb import net.shrine.qep.dao.AuditDao import net.shrine.qep.querydb.QepQueryDb import net.shrine.util.XmlDateHelper import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} import scala.util.control.NonFatal import scala.xml.NodeSeq /** * @author clint * @since Feb 19, 2014 */ trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean val nodeId:NodeId protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") authenticateAndThen(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { authenticateAndThen(request) { authResult => QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast,authResult) } } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { authenticateAndThen(request) { authResult => QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast,authResult) } } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") //store the query in the qep's database doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast,authResult) } } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doReadQueryDefinition($request,$shouldBroadcast)") doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast,authResult) } } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doReadInstanceResults($request,$shouldBroadcast)") val networkId = request.shrineNetworkQueryId //read from the QEP database code here. Only broadcast if some result is in some sketchy state val resultsFromDb: Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId) - //If any query result was pending - val response = if (resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) { + debug(s"result states are ${resultsFromDb.map(_.statusType).mkString(" ")}") + + def shouldAskAdapters:Boolean = { + if (resultsFromDb.isEmpty) false //Don't ask if no results exist. + else + resultsFromDb.forall(_.statusType.isCrcCallCompleted) && //Be sure every CRC has replied to its adapter - hack to deal with this request going to all adapters + resultsFromDb.exists(_.statusType.crcPromisedToFinishAfterReply) + } + + val response = if (!shouldAskAdapters) { debug(s"Using qep cached results for query $networkId") AggregatedReadInstanceResultsResponse(networkId, resultsFromDb).asInstanceOf[BaseResp] } else { - debug(s"Requesting results for $networkId from network") + info(s"Requesting results for $networkId from network") val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast,authResult) //put the new results in the database if we got what we wanted response match { case arirr: AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId, r)) - case _ => //do nothing + case _ => warn(s"Response was a ${response.getClass.getSimpleName}, not a ${classOf[AggregatedReadInstanceResultsResponse].getSimpleName}: $response") } - response } response } } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doReadQueryInstances($request,$shouldBroadcast)") val now = XmlDateHelper.now val networkQueryId = request.networkQueryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = { authenticateAndThen(request){ authResult => info(s"doReadPreviousQueries($request,$shouldBroadcast)") //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine 1.24) //pull queries from the local database. QepQueryDb.db.selectPreviousQueries(request) } } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doRenameQuery($request,$shouldBroadcast)") QepQueryDb.db.renamePreviousQuery(request) doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast,authResult) } } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doDeleteQuery($request,$shouldBroadcast)") QepQueryDb.db.markDeleted(request) doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast,authResult) } } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ => info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)") //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean, authResult:Authenticated): BaseResp = { debug(s"doBroadcastQuery($request) authResult is $authResult") //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another //NB: Only audit RunQueryRequests //When making BroadcastMessages val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) /** Sends the query to the hub and starts a Future to watch for the results. Returns (almost) immediately. */ def queryHub( authorizedRequest: RunQueryRequest): Unit = { import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking info(s"Sending RunQueryRequest ${authorizedRequest.networkQueryId} to the Hub") //Future[Unit] that this code fires off to the hub, then handles the BaseShrineResponse sendAndAggregate(networkAuthn,authorizedRequest,aggregator,shouldBroadcast).transform ( { hubResponse:BaseShrineResponse => debug(s"Received $hubResponse for ${authorizedRequest.networkQueryId}") hubResponse match { case aggregated: AggregatedRunQueryResponse => info(s"Received ${aggregated.statusTypeName} and ignored results for ${authorizedRequest.networkQueryId}") blocking { //now that queries arrive at the QEP via a queue, no need to put them into the database. They can just fall on the floor //todo record the query's state in a way that will stop polling in 1.23. See SHRINE-2148 } case _ => IncorrectResponseFromHub(hubResponse,authorizedRequest.networkQueryId) } } , throwable => { throwable match { case NonFatal(t) => ExceptionWhileHubRanQuery(t, authorizedRequest.networkQueryId) case _ => //Let the infrastructure handle fatal exceptions } throwable } ) } request match { case runQueryRequest: RunQueryRequest => // inject modified, authorized runQueryRequest //although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code //inject QEP NodeId auditAuthorizeAndThen(runQueryRequest.copy(nodeId = Some(nodeId))) { authorizedRequest => debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") // tuck the ACT audit metrics data into a database here if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) QepQueryDb.db.insertQepQuery(authorizedRequest) queryHub(authorizedRequest) val response = AggregatedRunQueryResponse( queryId = authorizedRequest.networkQueryId, createDate = XmlDateHelper.now, userId = networkAuthn.username, groupId = networkAuthn.domain, requestXml = authorizedRequest.queryDefinition, queryInstanceId = authorizedRequest.networkQueryId, results = Seq.empty, statusTypeName = "RECEIVED_BY_QEP" //todo figure out the right statuses for 1.23. See SHRINE-2148 ) response.asInstanceOf[BaseResp] } case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) } } private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = { info(s"doSynchronousQuery($request) started") val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] info(s"doSynchronousQuery($request) completed with response $response") response } private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = { auditTransactionally(request) { debug(s"auditAuthorizeAndThen($request) with $authorizationService") val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2)) } body(authorizedRequest) } } private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } //todo move auth code with SHRINE-1322 import AuthenticationResult._ private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val authResult = authenticator.authenticate(request.authn) authResult match { case a: Authenticated => f(a) case na:NotAuthenticated => throw NotAuthenticatedException(na) } } } case class ExceptionWhileHubRanQuery(t: Throwable,networkQueryId: NetworkQueryId) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(t) override def summary: String = s"${t.getClass.getSimpleName} encountered in an http call to run $networkQueryId query at the hub." override def description: String = "The QEP generated an exception while making an http call to run a query at the hub." override def detailsXml: NodeSeq = NodeSeq.fromSeq(
networkQueryId is {networkQueryId} {throwableDetail.getOrElse("")}
) } case class IncorrectResponseFromHub(hubResponse:BaseShrineResponse,networkQueryId: NetworkQueryId) extends AbstractProblem(ProblemSources.Qep) { override def summary: String = s"The hub responded to query $networkQueryId with a ${hubResponse.getClass.getSimpleName}, not a ${classOf[AggregatedRunQueryResponse].getSimpleName}" override def description: String = s"The hub responded with something other than a ${classOf[AggregatedRunQueryResponse].getSimpleName} to the QEP's run query request." override def detailsXml: NodeSeq = NodeSeq.fromSeq(
networkQueryId is {networkQueryId} hubResponse is {hubResponse}
) } \ No newline at end of file