diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
index 59a82a05a..3c8eb8dc5 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
@@ -1,289 +1,296 @@
package net.shrine.adapter
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import net.shrine.adapter.audit.AdapterAuditDb
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.Option.option2Iterable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
import net.shrine.adapter.Obfuscator.obfuscateResults
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.model.Breakdown
import net.shrine.adapter.dao.model.ShrineQueryResult
import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, ErrorResponse, HasQueryResults, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineRequest}
import net.shrine.protocol.query.QueryDefinition
import net.shrine.util.StackTrace
import net.shrine.util.Tries.sequence
import scala.concurrent.duration.Duration
import net.shrine.client.Poster
/**
* @author clint
* @since Nov 2, 2012
*
*/
object AbstractReadQueryResultAdapter {
private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
}
+//noinspection RedundantBlock
abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults](
poster: Poster,
override val hiveCredentials: HiveCredentials,
dao: AdapterDao,
doObfuscation: Boolean,
getQueryId: Req => Long,
getProjectId: Req => String,
toResponse: (Long, QueryResult) => Rsp,
breakdownTypes: Set[ResultOutputType],
collectAdapterAudit:Boolean
) extends WithHiveCredentialsAdapter(hiveCredentials) {
//TODO: Make this configurable
private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors)
//TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead?
private lazy val executorService = Executors.newFixedThreadPool(numThreads)
private lazy val executionContext = ExecutionContext.fromExecutorService(executorService)
override def shutdown() {
try {
executorService.shutdown()
executorService.awaitTermination(5, TimeUnit.SECONDS)
} finally {
executorService.shutdownNow()
super.shutdown()
}
}
import AbstractReadQueryResultAdapter._
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
val req = message.request.asInstanceOf[Req]
val queryId = getQueryId(req)
def findShrineQueryRow = dao.findQueryByNetworkId(queryId)
def findShrineQueryResults = dao.findResultsFor(queryId)
findShrineQueryRow match {
case None => {
debug(s"Query $queryId not found in the Shrine DB")
-
- errorResponse(queryId)
+
+ ErrorResponse(QueryNotFound(queryId))
}
case Some(shrineQueryRow) => {
findShrineQueryResults match {
case None => {
- debug(s"Query $queryId found but its results are not available yet")
+ debug(s"Query $queryId found but its results are not available")
//TODO: When precisely can this happen? Should we go back to the CRC here?
- errorResponse(queryId)
+ ErrorResponse(QueryResultNotAvailable(queryId))
}
case Some(shrineQueryResult) => {
if (shrineQueryResult.isDone) {
debug(s"Query $queryId is done and already stored, returning stored results")
makeResponseFrom(queryId, shrineQueryResult)
} else {
debug(s"Query $queryId is incomplete, asking CRC for results")
val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message)
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result)
result
}
}
}
}
}
}
- private def errorResponse(queryId: Long) = ErrorResponse(QueryNotFound(queryId))
-
private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = {
- shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(errorResponse(queryId))
+ shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(ErrorResponse(QueryNotFound(queryId)))
}
private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = {
//NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to
//retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in
//parallel is quite possibly too clever, but may be faster than asking for them serially.
//TODO: Review this.
//Make requests for results in parallel
val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult)
//Gather all the results (block until they're all returned)
val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime)
countResponseAttempt match {
//If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along
//with any retrieved breakdowns before returning it.
case Success(countResponse) => {
//NB: Only store the result if needed, that is, if all results are done
//TODO: REVIEW THIS
storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts))
countResponse
}
- case Failure(e) => ErrorResponse(s"Couldn't retrieve query with id '$queryId' from the CRC: exception message follows: ${e.getMessage} stack trace: ${StackTrace.stackTraceAsString(e)}")
+ case Failure(e) => ErrorResponse(CouldNotRetrieveQueryFromCrc(queryId,e))
}
}
private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = {
def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString)
def process(localResultId: Long): ShrineResponse = {
delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId))
}
implicit val executionContext = this.executionContext
import scala.concurrent.blocking
def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f)))
val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt {
process(shrineQueryResult.count.localId)
}
val futureBreakdownAttempts = Future.sequence(for {
Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns
} yield futureBlockingAttempt {
process(localResultId)
})
//Log errors retrieving count
futureCountAttempt.collect {
case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting count result from the CRC: ", e)
}
//Log errors retrieving breakdown
for {
breakdownResponseAttempts <- futureBreakdownAttempts
} {
breakdownResponseAttempts.collect {
case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e)
}
}
//"Filter" for non-ErrorResponses
val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect {
case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast
case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'"))
}
//"Filter" for non-ErrorResponses
val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for {
breakdownResponseAttempts <- futureBreakdownAttempts
} yield {
breakdownResponseAttempts.collect {
case Success(resp: ReadResultResponse) => Try(resp)
}
}
for {
countResponseAttempt <- futureNonErrorCountAttempt
breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts
} yield {
RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts)
}
}
private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = {
val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime)
//Log any failures
(countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _))
//NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned
val responseAttempt = for {
countResponse: ReadResultResponse <- countResponseAttempt
countQueryResult = countResponse.metadata
breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts)
} yield {
- val localCountResultId = countResponse.metadata.resultId
-
val breakdownsByType = (for {
breakdownResponse <- breakdownResponses
resultType <- breakdownResponse.metadata.resultType
} yield resultType -> breakdownResponse.data).toMap
val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType)
val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns
toResponse(queryId, queryResultToReturn)
}
SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts)
}
private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = {
val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten
breakdownTypes -- successfulBreakdownTypes
}
private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val responseIsDone = response.results.forall(_.statusType.isDone)
if (responseIsDone) {
storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes)
}
}
private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val rawResults = response.results
val obfuscatedResults = obfuscateResults(doObfuscation)(response.results)
for {
shrineQuery <- dao.findQueryByNetworkId(queryId)
queryResult <- rawResults.headOption
obfuscatedQueryResult <- obfuscatedResults.headOption
} {
val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr)
dao.inTransaction {
dao.deleteQuery(queryId)
dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns)
}
}
}
private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R]
private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) {
def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns]
override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call
}
private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _)
}
case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Query not found"
override def description:String = s"No query with id $queryId found on ${stamp.host.getHostName}"
}
+case class QueryResultNotAvailable(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
+ override def summary: String = s"Query $queryId found but its results are not available yet"
+ override def description:String = s"Query $queryId found but its results are not available yet on ${stamp.host.getHostName}"
+}
+
+case class CouldNotRetrieveQueryFromCrc(queryId:Long,x: Throwable) extends AbstractProblem(ProblemSources.Adapter) {
+ override def summary: String = s"Could not retrieve query $queryId from the CRC"
+ override def description:String = s"Unhandled exception while retrieving query $queryId while retrieving it from the CRC on ${stamp.host.getHostName}"
+ override def throwable = Some(x)
+}
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala
index a2d58af4d..941a014ce 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/HeldQueries.scala
@@ -1,60 +1,60 @@
package net.shrine.adapter.components
import net.shrine.protocol.RunHeldQueryRequest
import net.shrine.protocol.ShrineResponse
import net.shrine.protocol.BroadcastMessage
import net.shrine.adapter.dao.model.ShrineQueryResult
import net.shrine.protocol.RunQueryRequest
import net.shrine.protocol.ResultOutputType
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.RunQueryResponse
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.RunQueryAdapter
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.Credential
/**
* @author clint
- * @date May 2, 2014
+ * @since May 2, 2014
*/
final case class HeldQueries(dao: AdapterDao, runQueryAdapter: RunQueryAdapter) {
def run(req: RunHeldQueryRequest): ShrineResponse = {
val queryId = req.networkQueryId
//TODO: Revisit this, we might want to store/retrieve the output types used originally.
val outputTypes: Set[ResultOutputType] = Set(ResultOutputType.PATIENT_COUNT_XML)
dao.findQueryByNetworkId(queryId) match {
case Some(savedQuery) => {
//Re-un the query with the original credentials, not an admin's
val savedAuthn = AuthenticationInfo(savedQuery.domain, savedQuery.username, Credential("", false))
val runQueryReq = RunQueryRequest(
req.projectId,
req.waitTime,
savedAuthn,
queryId,
topicId = None,
topicName = None,
outputTypes,
savedQuery.queryDefinition)
val newBroadcastMessage = BroadcastMessage(savedAuthn, runQueryReq)
dao.inTransaction {
//Delete previous records for this query from the DB, so we don't have obsolete records with
//SHRINE_QUERY.HAS_BEEN_RUN = false for queries like the current one that ended up getting run.
//Invoking runQueryAdapter.processRequest() will add correct values to the DB for the
//actually-got-run query.
dao.deleteQueryResultsFor(queryId)
dao.deleteQuery(queryId)
runQueryAdapter.processRequest(newBroadcastMessage)
}
}
- case None => ErrorResponse(s"Couldn't find query qith networkQueryId '${}'")
+ case None => ErrorResponse(s"Couldn't find query with networkQueryId '${}'")
}
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
index d784c48ea..e644441a4 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
@@ -1,375 +1,375 @@
package net.shrine.adapter
import scala.xml.NodeSeq
import net.shrine.util.ShouldMatchersForJUnit
import ObfuscatorTest.within3
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, CrcRequest, Credential, ErrorResponse, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineResponse, BaseShrineRequest, RunQueryRequest, RunQueryResponse, DefaultBreakdownResultOutputTypes}
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML
import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML
import net.shrine.protocol.query.{QueryDefinition, Term}
import net.shrine.util.XmlDateHelper
import net.shrine.util.XmlDateHelper.now
import net.shrine.util.XmlGcEnrichments
import net.shrine.client.Poster
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.adapter.translators.ExpressionTranslator
import scala.util.Success
/**
* @author clint
* @since Nov 8, 2012
*/
//noinspection UnitMethodIsParameterless
abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse](
makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter,
makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest,
extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit {
private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false))
def doTestProcessRequestMissingQuery {
val adapter = makeAdapter(dao, MockHttpClient)
val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn)))
response.isInstanceOf[ErrorResponse] should be(true)
}
def doTestProcessInvalidRequest {
val adapter = makeAdapter(dao, MockHttpClient)
intercept[ClassCastException] {
//request must be a type of request we can handle
adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest))
}
}
private val localMasterId = "alksjdkalsdjlasdjlkjsad"
private val shrineNetworkQueryId = 123L
private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn)))
private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis
private val instanceId = 999L
private val setSize = 12345L
private val obfSetSize = setSize + 1
private val queryExpr = Term("foo")
private val topicId = "laskdjlkasd"
private val fooQuery = QueryDefinition("some-query",queryExpr)
def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables {
val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None)
import ResultOutputType._
import XmlDateHelper.now
val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)))
val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1))
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResultId = 456L
val breakdownResultId = 98237943265436L
val incompleteCountResult = QueryResult(
resultId = countResultId,
instanceId = instanceId,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = Option(startDate),
endDate = Option(endDate),
description = Some("results from node X"),
statusType = QueryResult.StatusType.Processing,
statusMessage = None,
breakdowns = breakdowns)
val breakdownResult = breakdowns.head match {
case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType)
}
val queryStartDate = now
val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil)
final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient {
override def post(input: String, url: String): HttpResponse = {
def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished)
def validateAuthnAndProjectId(req: ShrineRequest) {
req.authn should equal(expectedHiveCredentials.toAuthenticationInfo)
req.projectId should equal(expectedHiveCredentials.projectId)
}
val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match {
case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => {
validateAuthnAndProjectId(req)
if (countQueryShouldWork) {
ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize)))
} else {
ErrorResponse("Retrieving count result failed")
}
}
case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => {
validateAuthnAndProjectId(req)
ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2)
}
case _ => fail(s"Unknown input: $input")
}
HttpResponse.ok(response.toI2b2String)
}
}
val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials))
def getResults = doGetResults(adapter)
getResults.isInstanceOf[ErrorResponse] should be(true)
dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize)
dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns)
//The query shouldn't be 'done', since its status is PROCESSING
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing)
//Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC
val result = getResults.asInstanceOf[R]
//Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns)
val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType)
if (!countQueryShouldWork) {
result.isInstanceOf[ErrorResponse] should be(true)
} else {
val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result)
actualNetworkQueryId should equal(shrineNetworkQueryId)
import ObfuscatorTest.within3
actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML))
within3(setSize, actualQueryResult.setSize) should be(true)
actualQueryResult.description should be(Some("results from node X"))
actualQueryResult.statusType should equal(QueryResult.StatusType.Finished)
actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name))
actualQueryResult.breakdowns.foreach {
case (rt, I2b2ResultEnvelope(_, data)) => {
data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) }
}
}
for {
startDate <- actualQueryResult.startDate
endDate <- actualQueryResult.endDate
} {
val actualElapsed = toMillis(endDate) - toMillis(startDate)
actualElapsed should equal(elapsed)
}
}
}
def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables {
import ResultOutputType._
import XmlDateHelper.now
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResultId = 456L
val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None)
dao.inTransaction {
val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None)
//NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in
//AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete
//queries, will work.
val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult))
val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head
dao.insertCountResult(countQueryResultId, -1L, -1L)
}
val queryStartDate = now
object MockHttpClient extends HttpClient {
override def post(input: String, url: String): HttpResponse = ???
}
val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient)
def getResults = doGetResults(adapter)
getResults.isInstanceOf[ErrorResponse] should be(true)
//The query shouldn't be 'done', since its status is QUEUED
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued)
//Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued
val result = getResults
result.isInstanceOf[ErrorResponse] should be(true)
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued)
}
def doTestProcessRequest = afterCreatingTables {
val adapter = makeAdapter(dao, MockHttpClient)
def getResults = doGetResults(adapter)
getResults match {
case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName)
case x => fail(s"Got $x, not an ErrorResponse")
}
val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None)
getResults match {
- case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName)
+ case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryResultNotAvailable].getName)
case x => fail(s"Got $x, not an ErrorResponse")
}
import ResultOutputType._
import XmlDateHelper.now
val breakdowns = Map(
PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)),
PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L)))
val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1))
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResult = QueryResult(
resultId = 456L,
instanceId = instanceId,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = Option(startDate),
endDate = Option(endDate),
description = Some("results from node X"),
statusType = QueryResult.StatusType.Finished,
statusMessage = None,
breakdowns = breakdowns
)
val breakdownResults = breakdowns.map {
case (resultType, data) =>
countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType)
}.toSeq
val queryStartDate = now
val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults)
getResults.isInstanceOf[ErrorResponse] should be(true)
dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize)
dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns)
val result = getResults.asInstanceOf[R]
val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result)
actualNetworkQueryId should equal(shrineNetworkQueryId)
actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML))
actualQueryResult.setSize should equal(obfSetSize)
actualQueryResult.description should be(None) //TODO: This is probably wrong
actualQueryResult.statusType should equal(QueryResult.StatusType.Finished)
actualQueryResult.statusMessage should be(None)
actualQueryResult.breakdowns should equal(obfscBreakdowns)
for {
startDate <- actualQueryResult.startDate
endDate <- actualQueryResult.endDate
} {
val actualElapsed = toMillis(endDate) - toMillis(startDate)
actualElapsed should equal(elapsed)
}
}
}
object AbstractQueryRetrievalTestCase {
val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project")
val doObfuscation = true
def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = {
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar"))))
new RunQueryAdapter(
poster,
dao,
AbstractQueryRetrievalTestCase.hiveCredentials,
translator,
10000,
doObfuscation,
runQueriesImmediately = true,
DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false
)
}
import scala.concurrent.duration._
final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) {
override val requestType = null
protected override def i2b2MessageBody: NodeSeq =
override def toXml =
}
}
\ No newline at end of file