diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala index 59a82a05a..3c8eb8dc5 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala @@ -1,289 +1,296 @@ package net.shrine.adapter import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import net.shrine.adapter.audit.AdapterAuditDb import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.Option.option2Iterable import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.Obfuscator.obfuscateResults import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.Breakdown import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, ErrorResponse, HasQueryResults, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineRequest} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.StackTrace import net.shrine.util.Tries.sequence import scala.concurrent.duration.Duration import net.shrine.client.Poster /** * @author clint * @since Nov 2, 2012 * */ object AbstractReadQueryResultAdapter { private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) } +//noinspection RedundantBlock abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults]( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, getQueryId: Req => Long, getProjectId: Req => String, toResponse: (Long, QueryResult) => Rsp, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean ) extends WithHiveCredentialsAdapter(hiveCredentials) { //TODO: Make this configurable private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors) //TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead? private lazy val executorService = Executors.newFixedThreadPool(numThreads) private lazy val executionContext = ExecutionContext.fromExecutorService(executorService) override def shutdown() { try { executorService.shutdown() executorService.awaitTermination(5, TimeUnit.SECONDS) } finally { executorService.shutdownNow() super.shutdown() } } import AbstractReadQueryResultAdapter._ override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { val req = message.request.asInstanceOf[Req] val queryId = getQueryId(req) def findShrineQueryRow = dao.findQueryByNetworkId(queryId) def findShrineQueryResults = dao.findResultsFor(queryId) findShrineQueryRow match { case None => { debug(s"Query $queryId not found in the Shrine DB") - - errorResponse(queryId) + + ErrorResponse(QueryNotFound(queryId)) } case Some(shrineQueryRow) => { findShrineQueryResults match { case None => { - debug(s"Query $queryId found but its results are not available yet") + debug(s"Query $queryId found but its results are not available") //TODO: When precisely can this happen? Should we go back to the CRC here? - errorResponse(queryId) + ErrorResponse(QueryResultNotAvailable(queryId)) } case Some(shrineQueryResult) => { if (shrineQueryResult.isDone) { debug(s"Query $queryId is done and already stored, returning stored results") makeResponseFrom(queryId, shrineQueryResult) } else { debug(s"Query $queryId is incomplete, asking CRC for results") val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result) result } } } } } } - private def errorResponse(queryId: Long) = ErrorResponse(QueryNotFound(queryId)) - private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = { - shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(errorResponse(queryId)) + shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(ErrorResponse(QueryNotFound(queryId))) } private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = { //NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to //retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in //parallel is quite possibly too clever, but may be faster than asking for them serially. //TODO: Review this. //Make requests for results in parallel val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult) //Gather all the results (block until they're all returned) val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime) countResponseAttempt match { //If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along //with any retrieved breakdowns before returning it. case Success(countResponse) => { //NB: Only store the result if needed, that is, if all results are done //TODO: REVIEW THIS storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts)) countResponse } - case Failure(e) => ErrorResponse(s"Couldn't retrieve query with id '$queryId' from the CRC: exception message follows: ${e.getMessage} stack trace: ${StackTrace.stackTraceAsString(e)}") + case Failure(e) => ErrorResponse(CouldNotRetrieveQueryFromCrc(queryId,e)) } } private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = { def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString) def process(localResultId: Long): ShrineResponse = { delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId)) } implicit val executionContext = this.executionContext import scala.concurrent.blocking def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f))) val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt { process(shrineQueryResult.count.localId) } val futureBreakdownAttempts = Future.sequence(for { Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns } yield futureBlockingAttempt { process(localResultId) }) //Log errors retrieving count futureCountAttempt.collect { case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'") case Failure(e) => error(s"Error requesting count result from the CRC: ", e) } //Log errors retrieving breakdown for { breakdownResponseAttempts <- futureBreakdownAttempts } { breakdownResponseAttempts.collect { case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'") case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e) } } //"Filter" for non-ErrorResponses val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect { case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'")) } //"Filter" for non-ErrorResponses val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for { breakdownResponseAttempts <- futureBreakdownAttempts } yield { breakdownResponseAttempts.collect { case Success(resp: ReadResultResponse) => Try(resp) } } for { countResponseAttempt <- futureNonErrorCountAttempt breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts } yield { RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) } } private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = { val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime) //Log any failures (countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _)) //NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned val responseAttempt = for { countResponse: ReadResultResponse <- countResponseAttempt countQueryResult = countResponse.metadata breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts) } yield { - val localCountResultId = countResponse.metadata.resultId - val breakdownsByType = (for { breakdownResponse <- breakdownResponses resultType <- breakdownResponse.metadata.resultType } yield resultType -> breakdownResponse.data).toMap val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType) val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns toResponse(queryId, queryResultToReturn) } SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts) } private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = { val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten breakdownTypes -- successfulBreakdownTypes } private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val responseIsDone = response.results.forall(_.statusType.isDone) if (responseIsDone) { storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes) } } private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val rawResults = response.results val obfuscatedResults = obfuscateResults(doObfuscation)(response.results) for { shrineQuery <- dao.findQueryByNetworkId(queryId) queryResult <- rawResults.headOption obfuscatedQueryResult <- obfuscatedResults.headOption } { val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr) dao.inTransaction { dao.deleteQuery(queryId) dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns) } } } private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R] private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) { def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns] override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call } private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _) } case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Query not found" override def description:String = s"No query with id $queryId found on ${stamp.host.getHostName}" } +case class QueryResultNotAvailable(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { + override def summary: String = s"Query $queryId found but its results are not available yet" + override def description:String = s"Query $queryId found but its results are not available yet on ${stamp.host.getHostName}" +} + +case class CouldNotRetrieveQueryFromCrc(queryId:Long,x: Throwable) extends AbstractProblem(ProblemSources.Adapter) { + override def summary: String = s"Could not retrieve query $queryId from the CRC" + override def description:String = s"Unhandled exception while retrieving query $queryId while retrieving it from the CRC on ${stamp.host.getHostName}" + override def throwable = Some(x) +} diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala index a2d58af4d..941a014ce 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala @@ -1,60 +1,60 @@ package net.shrine.adapter.components import net.shrine.protocol.RunHeldQueryRequest import net.shrine.protocol.ShrineResponse import net.shrine.protocol.BroadcastMessage import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.RunQueryRequest import net.shrine.protocol.ResultOutputType import net.shrine.protocol.ErrorResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.RunQueryResponse import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.RunQueryAdapter import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential /** * @author clint - * @date May 2, 2014 + * @since May 2, 2014 */ final case class HeldQueries(dao: AdapterDao, runQueryAdapter: RunQueryAdapter) { def run(req: RunHeldQueryRequest): ShrineResponse = { val queryId = req.networkQueryId //TODO: Revisit this, we might want to store/retrieve the output types used originally. val outputTypes: Set[ResultOutputType] = Set(ResultOutputType.PATIENT_COUNT_XML) dao.findQueryByNetworkId(queryId) match { case Some(savedQuery) => { //Re-un the query with the original credentials, not an admin's val savedAuthn = AuthenticationInfo(savedQuery.domain, savedQuery.username, Credential("", false)) val runQueryReq = RunQueryRequest( req.projectId, req.waitTime, savedAuthn, queryId, topicId = None, topicName = None, outputTypes, savedQuery.queryDefinition) val newBroadcastMessage = BroadcastMessage(savedAuthn, runQueryReq) dao.inTransaction { //Delete previous records for this query from the DB, so we don't have obsolete records with //SHRINE_QUERY.HAS_BEEN_RUN = false for queries like the current one that ended up getting run. //Invoking runQueryAdapter.processRequest() will add correct values to the DB for the //actually-got-run query. dao.deleteQueryResultsFor(queryId) dao.deleteQuery(queryId) runQueryAdapter.processRequest(newBroadcastMessage) } } - case None => ErrorResponse(s"Couldn't find query qith networkQueryId '${}'") + case None => ErrorResponse(s"Couldn't find query with networkQueryId '${}'") } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala index d784c48ea..e644441a4 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala @@ -1,375 +1,375 @@ package net.shrine.adapter import scala.xml.NodeSeq import net.shrine.util.ShouldMatchersForJUnit import ObfuscatorTest.within3 import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, CrcRequest, Credential, ErrorResponse, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineResponse, BaseShrineRequest, RunQueryRequest, RunQueryResponse, DefaultBreakdownResultOutputTypes} import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML import net.shrine.protocol.query.{QueryDefinition, Term} import net.shrine.util.XmlDateHelper import net.shrine.util.XmlDateHelper.now import net.shrine.util.XmlGcEnrichments import net.shrine.client.Poster import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import scala.util.Success /** * @author clint * @since Nov 8, 2012 */ //noinspection UnitMethodIsParameterless abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse]( makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter, makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest, extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false)) def doTestProcessRequestMissingQuery { val adapter = makeAdapter(dao, MockHttpClient) val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn))) response.isInstanceOf[ErrorResponse] should be(true) } def doTestProcessInvalidRequest { val adapter = makeAdapter(dao, MockHttpClient) intercept[ClassCastException] { //request must be a type of request we can handle adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest)) } } private val localMasterId = "alksjdkalsdjlasdjlkjsad" private val shrineNetworkQueryId = 123L private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn))) private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis private val instanceId = 999L private val setSize = 12345L private val obfSetSize = setSize + 1 private val queryExpr = Term("foo") private val topicId = "laskdjlkasd" private val fooQuery = QueryDefinition("some-query",queryExpr) def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables { val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val breakdownResultId = 98237943265436L val incompleteCountResult = QueryResult( resultId = countResultId, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Processing, statusMessage = None, breakdowns = breakdowns) val breakdownResult = breakdowns.head match { case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType) } val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil) final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient { override def post(input: String, url: String): HttpResponse = { def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished) def validateAuthnAndProjectId(req: ShrineRequest) { req.authn should equal(expectedHiveCredentials.toAuthenticationInfo) req.projectId should equal(expectedHiveCredentials.projectId) } val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match { case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => { validateAuthnAndProjectId(req) if (countQueryShouldWork) { ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize))) } else { ErrorResponse("Retrieving count result failed") } } case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => { validateAuthnAndProjectId(req) ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2) } case _ => fail(s"Unknown input: $input") } HttpResponse.ok(response.toI2b2String) } } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials)) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) //The query shouldn't be 'done', since its status is PROCESSING dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing) //Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC val result = getResults.asInstanceOf[R] //Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns) val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType) if (!countQueryShouldWork) { result.isInstanceOf[ErrorResponse] should be(true) } else { val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) import ObfuscatorTest.within3 actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) within3(setSize, actualQueryResult.setSize) should be(true) actualQueryResult.description should be(Some("results from node X")) actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name)) actualQueryResult.breakdowns.foreach { case (rt, I2b2ResultEnvelope(_, data)) => { data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) } } } for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables { import ResultOutputType._ import XmlDateHelper.now val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None) dao.inTransaction { val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult)) val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } val queryStartDate = now object MockHttpClient extends HttpClient { override def post(input: String, url: String): HttpResponse = ??? } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) //The query shouldn't be 'done', since its status is QUEUED dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) //Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued val result = getResults result.isInstanceOf[ErrorResponse] should be(true) dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) } def doTestProcessRequest = afterCreatingTables { val adapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults match { case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName) case x => fail(s"Got $x, not an ErrorResponse") } val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) getResults match { - case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName) + case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryResultNotAvailable].getName) case x => fail(s"Got $x, not an ErrorResponse") } import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map( PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)), PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResult = QueryResult( resultId = 456L, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Finished, statusMessage = None, breakdowns = breakdowns ) val breakdownResults = breakdowns.map { case (resultType, data) => countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType) }.toSeq val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) val result = getResults.asInstanceOf[R] val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) actualQueryResult.setSize should equal(obfSetSize) actualQueryResult.description should be(None) //TODO: This is probably wrong actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(None) actualQueryResult.breakdowns should equal(obfscBreakdowns) for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } object AbstractQueryRetrievalTestCase { val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project") val doObfuscation = true def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar")))) new RunQueryAdapter( poster, dao, AbstractQueryRetrievalTestCase.hiveCredentials, translator, 10000, doObfuscation, runQueriesImmediately = true, DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false ) } import scala.concurrent.duration._ final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) { override val requestType = null protected override def i2b2MessageBody: NodeSeq = override def toXml = } } \ No newline at end of file