package net.shrine.adapter import scala.xml.NodeSeq import net.shrine.util.ShouldMatchersForJUnit import ObfuscatorTest.within3 import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, CrcRequest, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineRequest, ShrineResponse} import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML import net.shrine.protocol.query.{QueryDefinition, Term} import net.shrine.util.XmlDateHelper import net.shrine.util.XmlDateHelper.now import net.shrine.util.XmlGcEnrichments import net.shrine.client.Poster import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import net.shrine.problem.TestProblem import scala.util.Success /** * @author clint * @since Nov 8, 2012 */ //noinspection UnitMethodIsParameterless abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse]( makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter, makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest, extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false)) def doTestProcessRequestMissingQuery { val adapter = makeAdapter(dao, MockHttpClient) val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn))) response.isInstanceOf[ErrorResponse] should be(true) } def doTestProcessInvalidRequest { val adapter = makeAdapter(dao, MockHttpClient) intercept[ClassCastException] { //request must be a type of request we can handle adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest)) } } private val localMasterId = "alksjdkalsdjlasdjlkjsad" private val shrineNetworkQueryId = 123L private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn))) private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis private val instanceId = 999L private val setSize = 12345L private val obfSetSize = setSize + 1 private val queryExpr = Term("foo") private val topicId = "laskdjlkasd" private val fooQuery = QueryDefinition("some-query",queryExpr) def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables { val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val breakdownResultId = 98237943265436L val incompleteCountResult = QueryResult( resultId = countResultId, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Processing, statusMessage = None, breakdowns = breakdowns) val breakdownResult = breakdowns.head match { case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType) } val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil) final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient { override def post(input: String, url: String): HttpResponse = { def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished) def validateAuthnAndProjectId(req: ShrineRequest) { req.authn should equal(expectedHiveCredentials.toAuthenticationInfo) req.projectId should equal(expectedHiveCredentials.projectId) } val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match { case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => { validateAuthnAndProjectId(req) if (countQueryShouldWork) { ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize))) } else { ErrorResponse(TestProblem(summary = "Retrieving count result failed")) } } case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => { validateAuthnAndProjectId(req) ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2) } case _ => fail(s"Unknown input: $input") } HttpResponse.ok(response.toI2b2String) } } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials)) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) //The query shouldn't be 'done', since its status is PROCESSING dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing) //Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC val result = getResults.asInstanceOf[R] //Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns) val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType) if (!countQueryShouldWork) { result.isInstanceOf[ErrorResponse] should be(true) } else { val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) import ObfuscatorTest.within3 actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) within3(setSize, actualQueryResult.setSize) should be(true) actualQueryResult.description should be(Some("results from node X")) actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name)) actualQueryResult.breakdowns.foreach { case (rt, I2b2ResultEnvelope(_, data)) => { data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) } } } for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables { import ResultOutputType._ import XmlDateHelper.now val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None) dao.inTransaction { val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult)) val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } val queryStartDate = now object MockHttpClient extends HttpClient { override def post(input: String, url: String): HttpResponse = ??? } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) //The query shouldn't be 'done', since its status is QUEUED dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) //Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued val result = getResults result.isInstanceOf[ErrorResponse] should be(true) dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) } def doTestProcessRequest = afterCreatingTables { val adapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults match { case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName) case x => fail(s"Got $x, not an ErrorResponse") } val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) getResults match { case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryResultNotAvailable].getName) case x => fail(s"Got $x, not an ErrorResponse") } import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map( PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)), PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResult = QueryResult( resultId = 456L, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Finished, statusMessage = None, breakdowns = breakdowns ) val breakdownResults = breakdowns.map { case (resultType, data) => countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType) }.toSeq val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) val result = getResults.asInstanceOf[R] val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) actualQueryResult.setSize should equal(obfSetSize) actualQueryResult.description should be(None) //TODO: This is probably wrong actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(None) actualQueryResult.breakdowns should equal(obfscBreakdowns) for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } object AbstractQueryRetrievalTestCase { val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project") val doObfuscation = true def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar")))) new RunQueryAdapter( poster = poster, dao = dao, hiveCredentials = AbstractQueryRetrievalTestCase.hiveCredentials, conceptTranslator = translator, adapterLockoutAttemptsThreshold = 10000, doObfuscation = doObfuscation, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, botCountTimeThresholds = Map.empty ) } import scala.concurrent.duration._ final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) { override val requestType = null protected override def i2b2MessageBody: NodeSeq = override def toXml = } }