diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala index c13bf959b..7ce5ce381 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala @@ -1,291 +1,297 @@ package net.shrine.adapter import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import net.shrine.adapter.audit.AdapterAuditDb +import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.Option.option2Iterable import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.Obfuscator.obfuscateResults import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.Breakdown import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, ErrorResponse, HasQueryResults, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineRequest} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.StackTrace import net.shrine.util.Tries.sequence import scala.concurrent.duration.Duration import net.shrine.client.Poster /** * @author clint * @since Nov 2, 2012 * */ object AbstractReadQueryResultAdapter { private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) } abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults]( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, getQueryId: Req => Long, getProjectId: Req => String, toResponse: (Long, QueryResult) => Rsp, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean ) extends WithHiveCredentialsAdapter(hiveCredentials) { //TODO: Make this configurable private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors) //TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead? private lazy val executorService = Executors.newFixedThreadPool(numThreads) private lazy val executionContext = ExecutionContext.fromExecutorService(executorService) override def shutdown() { try { executorService.shutdown() executorService.awaitTermination(5, TimeUnit.SECONDS) } finally { executorService.shutdownNow() super.shutdown() } } import AbstractReadQueryResultAdapter._ override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { val req = message.request.asInstanceOf[Req] val queryId = getQueryId(req) def findShrineQueryRow = dao.findQueryByNetworkId(queryId) def findShrineQueryResults = dao.findResultsFor(queryId) findShrineQueryRow match { case None => { debug(s"Query $queryId not found in the Shrine DB") errorResponse(queryId) } case Some(shrineQueryRow) => { if (shrineQueryRow.hasNotBeenRun) { debug(s"Query $queryId found, but it wasn't run before") findShrineQueryResults.map(makeResponseFrom(queryId, _)).getOrElse { debug(s"Couldn't retrive all results for query $queryId; it's likely the query's status is incomplete (QUEUED, PROCESSING, etc)") errorResponse(queryId) } } else { findShrineQueryResults match { case None => { debug(s"Query $queryId found, and it has been run, but its results are not available yet") //TODO: When precisely can this happen? Should we go back to the CRC here? errorResponse(queryId) } case Some(shrineQueryResult) => { if (shrineQueryResult.isDone) { debug(s"Query $queryId is done and already stored, returning stored results") makeResponseFrom(queryId, shrineQueryResult) } else { debug(s"Query $queryId is incomplete, asking CRC for results") val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result) result } } } } } } } - private def errorResponse(queryId: Long) = ErrorResponse(s"Query with id '$queryId' not found") + private def errorResponse(queryId: Long) = ErrorResponse(QueryNotFound(queryId)) private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = { shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(errorResponse(queryId)) } private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = { //NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to //retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in //parallel is quite possibly too clever, but may be faster than asking for them serially. //TODO: Review this. //Make requests for results in parallel val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult) //Gather all the results (block until they're all returned) val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime) countResponseAttempt match { //If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along //with any retrieved breakdowns before returning it. case Success(countResponse) => { //NB: Only store the result if needed, that is, if all results are done //TODO: REVIEW THIS storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts)) countResponse } case Failure(e) => ErrorResponse(s"Couldn't retrieve query with id '$queryId' from the CRC: exception message follows: ${e.getMessage} stack trace: ${StackTrace.stackTraceAsString(e)}") } } private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = { def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString) def process(localResultId: Long): ShrineResponse = { delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId)) } implicit val executionContext = this.executionContext import scala.concurrent.blocking def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f))) val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt { process(shrineQueryResult.count.localId) } val futureBreakdownAttempts = Future.sequence(for { Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns } yield futureBlockingAttempt { process(localResultId) }) //Log errors retrieving count futureCountAttempt.collect { case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'") case Failure(e) => error(s"Error requesting count result from the CRC: ", e) } //Log errors retrieving breakdown for { breakdownResponseAttempts <- futureBreakdownAttempts } { breakdownResponseAttempts.collect { case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'") case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e) } } //"Filter" for non-ErrorResponses val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect { case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'")) } //"Filter" for non-ErrorResponses val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for { breakdownResponseAttempts <- futureBreakdownAttempts } yield { breakdownResponseAttempts.collect { case Success(resp: ReadResultResponse) => Try(resp) } } for { countResponseAttempt <- futureNonErrorCountAttempt breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts } yield { RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) } } private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = { val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime) //Log any failures (countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _)) //NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned val responseAttempt = for { countResponse: ReadResultResponse <- countResponseAttempt countQueryResult = countResponse.metadata breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts) } yield { val localCountResultId = countResponse.metadata.resultId val breakdownsByType = (for { breakdownResponse <- breakdownResponses resultType <- breakdownResponse.metadata.resultType } yield resultType -> breakdownResponse.data).toMap val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType) val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns toResponse(queryId, queryResultToReturn) } SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts) } private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = { val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten breakdownTypes -- successfulBreakdownTypes } private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val responseIsDone = response.results.forall(_.statusType.isDone) if (responseIsDone) { storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes) } } private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val rawResults = response.results val obfuscatedResults = obfuscateResults(doObfuscation)(response.results) for { shrineQuery <- dao.findQueryByNetworkId(queryId) queryResult <- rawResults.headOption obfuscatedQueryResult <- obfuscatedResults.headOption } { val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr) dao.inTransaction { dao.deleteQuery(queryId) dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns) } } } private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R] private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) { def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns] override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call } private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _) } + +case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { + override def summary: String = s"Query with id '$queryId' not found" +} + diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala index beeacf088..8430355fc 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala @@ -1,64 +1,74 @@ package net.shrine.adapter import net.shrine.log.Loggable -import net.shrine.protocol.BroadcastMessage -import net.shrine.protocol.ErrorResponse +import net.shrine.problem.{Problem, ProblemNotInCodec, LoggingProblemHandler, ProblemSources, AbstractProblem} +import net.shrine.protocol.{ShrineRequest, BroadcastMessage, ErrorResponse, ShrineResponse, BaseShrineResponse, AuthenticationInfo} import net.shrine.serialization.XmlMarshaller import net.shrine.util.StackTrace -import net.shrine.protocol.ShrineResponse -import net.shrine.protocol.BaseShrineResponse -import net.shrine.protocol.AuthenticationInfo /** * @author Bill Simons * @since 4/8/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class Adapter extends Loggable { final def perform(message: BroadcastMessage): BaseShrineResponse = { + def problemToErrorResponse(problem:Problem):ErrorResponse = { + LoggingProblemHandler.handleProblem(problem) + ErrorResponse(problem) + } + val shrineResponse = try { processRequest(message) } catch { case e: AdapterLockoutException => { - val AuthenticationInfo(domain, username, _) = message.request.authn - - warn(s"User '$domain:$username' is locked out") - - errorResponseFrom(e) + problemToErrorResponse(AdapterLockout(message.request.authn,e)) } case e @ CrcInvocationException(invokedCrcUrl, request, cause) => { - error(s"Error invoking the CRC at '$invokedCrcUrl' with request $request . Root cause: ", cause) - - errorResponseFrom(e) + problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e)) } case e: AdapterMappingException => { - warn(s"Error mapping query terms: ${e.getMessage}", e) - - errorResponseFrom(e) - } + problemToErrorResponse(AdapterMappingProblem(e)) + } case e: Exception => { - //for now we'll warn on all errors and work towards more specific logging later - def messageXml = Option(message).map(_.toXmlString).getOrElse("(Null message)") - - warn(s"Exception $e in Adapter with stack trace:\r\n${ StackTrace.stackTraceAsString(e) } caused on request\r\n $messageXml", e) - - errorResponseFrom(e) + + val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage" + else s"Unexpected exception in Adapter" + problemToErrorResponse(ProblemNotInCodec(summary,e)) } } shrineResponse } - private def errorResponseFrom(e: Throwable) = ErrorResponse(e.getMessage) - protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse //NOOP, may be overridden by subclasses def shutdown(): Unit = () -} \ No newline at end of file +} + +case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Hub) { + override def summary: String = s"User '${authn.domain}:${authn.username}' is locked out" + + override def throwable = Some(x) +} + +case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Hub) { + override def summary: String = s"Error invoking the CRC at '$crcUrl' with request $request ." + + override def throwable = Some(x) +} + +case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Hub) { + override def summary: String = s"Error mapping query terms: ${x.getMessage}" + + override def throwable = Some(x) +} + +case class ExceptionInAdapter() \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala index 9dfbc2285..80784f2c8 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala @@ -1,33 +1,38 @@ package net.shrine.adapter.components import net.shrine.adapter.dao.AdapterDao +import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.protocol.ShrineResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.protocol.ReadQueryDefinitionResponse import net.shrine.protocol.ErrorResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.AbstractReadQueryDefinitionRequest /** * @author clint * @since Apr 4, 2013 * * NB: Tested by ReadQueryDefinitionAdapterTest */ final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) { def get(request: Req): ShrineResponse = { val resultOption = for { shrineQuery <- dao.findQueryByNetworkId(request.queryId) } yield { ReadQueryDefinitionResponse( shrineQuery.networkId, shrineQuery.name, shrineQuery.username, shrineQuery.dateCreated, //TODO: I2b2 or Shrine format? shrineQuery.queryDefinition.toI2b2String) } - resultOption.getOrElse(ErrorResponse(s"Couldn't find query with network id: ${request.queryId}")) + resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request))) } +} + +case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) { + override def summary: String = s"Couldn't find query with network id: ${request.queryId}" } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala index ea595f7c6..d784c48ea 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala @@ -1,370 +1,375 @@ package net.shrine.adapter import scala.xml.NodeSeq import net.shrine.util.ShouldMatchersForJUnit import ObfuscatorTest.within3 import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, CrcRequest, Credential, ErrorResponse, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineResponse, BaseShrineRequest, RunQueryRequest, RunQueryResponse, DefaultBreakdownResultOutputTypes} import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML import net.shrine.protocol.query.{QueryDefinition, Term} import net.shrine.util.XmlDateHelper import net.shrine.util.XmlDateHelper.now import net.shrine.util.XmlGcEnrichments import net.shrine.client.Poster import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import scala.util.Success /** * @author clint * @since Nov 8, 2012 */ +//noinspection UnitMethodIsParameterless abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse]( makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter, makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest, extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false)) def doTestProcessRequestMissingQuery { val adapter = makeAdapter(dao, MockHttpClient) val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn))) response.isInstanceOf[ErrorResponse] should be(true) } def doTestProcessInvalidRequest { val adapter = makeAdapter(dao, MockHttpClient) intercept[ClassCastException] { //request must be a type of request we can handle adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest)) } } private val localMasterId = "alksjdkalsdjlasdjlkjsad" private val shrineNetworkQueryId = 123L - private val errorResponse = ErrorResponse(s"Query with id '$shrineNetworkQueryId' not found") - private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn))) private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis private val instanceId = 999L private val setSize = 12345L private val obfSetSize = setSize + 1 private val queryExpr = Term("foo") private val topicId = "laskdjlkasd" private val fooQuery = QueryDefinition("some-query",queryExpr) def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables { val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val breakdownResultId = 98237943265436L val incompleteCountResult = QueryResult( resultId = countResultId, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Processing, statusMessage = None, breakdowns = breakdowns) val breakdownResult = breakdowns.head match { case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType) } val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil) final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient { override def post(input: String, url: String): HttpResponse = { def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished) def validateAuthnAndProjectId(req: ShrineRequest) { req.authn should equal(expectedHiveCredentials.toAuthenticationInfo) req.projectId should equal(expectedHiveCredentials.projectId) } val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match { case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => { validateAuthnAndProjectId(req) if (countQueryShouldWork) { ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize))) } else { ErrorResponse("Retrieving count result failed") } } case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => { validateAuthnAndProjectId(req) ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2) } case _ => fail(s"Unknown input: $input") } HttpResponse.ok(response.toI2b2String) } } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials)) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) //The query shouldn't be 'done', since its status is PROCESSING dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing) //Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC val result = getResults.asInstanceOf[R] //Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns) val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType) if (!countQueryShouldWork) { result.isInstanceOf[ErrorResponse] should be(true) } else { val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) import ObfuscatorTest.within3 actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) within3(setSize, actualQueryResult.setSize) should be(true) actualQueryResult.description should be(Some("results from node X")) actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name)) actualQueryResult.breakdowns.foreach { case (rt, I2b2ResultEnvelope(_, data)) => { data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) } } } for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables { import ResultOutputType._ import XmlDateHelper.now val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None) dao.inTransaction { val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult)) val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } val queryStartDate = now object MockHttpClient extends HttpClient { override def post(input: String, url: String): HttpResponse = ??? } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) //The query shouldn't be 'done', since its status is QUEUED dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) //Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued val result = getResults result.isInstanceOf[ErrorResponse] should be(true) dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) } def doTestProcessRequest = afterCreatingTables { val adapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) - getResults should equal(errorResponse) + getResults match { + case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName) + case x => fail(s"Got $x, not an ErrorResponse") + } val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) - getResults should equal(errorResponse) + getResults match { + case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName) + case x => fail(s"Got $x, not an ErrorResponse") + } import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map( PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)), PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResult = QueryResult( resultId = 456L, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Finished, statusMessage = None, breakdowns = breakdowns ) val breakdownResults = breakdowns.map { case (resultType, data) => countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType) }.toSeq val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) val result = getResults.asInstanceOf[R] val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) actualQueryResult.setSize should equal(obfSetSize) actualQueryResult.description should be(None) //TODO: This is probably wrong actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(None) actualQueryResult.breakdowns should equal(obfscBreakdowns) for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } object AbstractQueryRetrievalTestCase { val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project") val doObfuscation = true def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar")))) new RunQueryAdapter( poster, dao, AbstractQueryRetrievalTestCase.hiveCredentials, translator, 10000, doObfuscation, runQueriesImmediately = true, DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false ) } import scala.concurrent.duration._ final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) { override val requestType = null protected override def i2b2MessageBody: NodeSeq = override def toXml = } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala index 9ba8869df..4759cdbdf 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala @@ -1,66 +1,77 @@ package net.shrine.adapter +import net.shrine.problem.ProblemNotInCodec import net.shrine.util.ShouldMatchersForJUnit import net.shrine.protocol.BaseShrineResponse import net.shrine.protocol.BroadcastMessage import org.junit.Test import net.shrine.protocol.DeleteQueryResponse import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.protocol.ErrorResponse import net.shrine.protocol.DeleteQueryRequest /** * @author clint - * @date Mar 31, 2014 + * @since Mar 31, 2014 */ +//noinspection UnitMethodIsParameterless final class AdapterTest extends ShouldMatchersForJUnit { private final class MockAdapter(toReturn: => BaseShrineResponse) extends Adapter { override protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse = toReturn } import scala.concurrent.duration._ - private val lockedOutAuthn = AuthenticationInfo("d", "u", Credential("p", false)) + private val lockedOutAuthn = AuthenticationInfo("d", "u", Credential("p", isToken = false)) - private val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", false)) + private val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", isToken = false)) private val req = DeleteQueryRequest("pid", 1.second, lockedOutAuthn, 12345L) private val resp = DeleteQueryResponse(12345) - + @Test def testHandlesNonFailureCase: Unit = { val adapter = new MockAdapter(resp) adapter.perform(null) should equal(resp) } - + @Test def testHandlesLockoutCase: Unit = { - doExceptionToErrorResponseTest(new AdapterLockoutException(lockedOutAuthn)) + doErrorResponseTest(new AdapterLockoutException(lockedOutAuthn),classOf[AdapterLockout]) } - + @Test def testHandlesCrcFailureCase: Unit = { val url = "http://example.com" - doExceptionToErrorResponseTest(CrcInvocationException(url, req, new Exception)) + doErrorResponseTest(CrcInvocationException(url, req, new Exception),classOf[CrcCouldNotBeInvoked]) } @Test def testHandlesMappingFailureCase: Unit = { - doExceptionToErrorResponseTest(new AdapterMappingException("blarg", new Exception)) + doErrorResponseTest(new AdapterMappingException("blarg", new Exception),classOf[AdapterMappingProblem]) } @Test def testHandlesGeneralFailureCase: Unit = { - doExceptionToErrorResponseTest(new Exception("blerg")) + doErrorResponseTest(new Exception("blerg"),classOf[ProblemNotInCodec]) } - - private def doExceptionToErrorResponseTest(exception: Throwable): Unit = { + + //noinspection ScalaUnreachableCode,RedundantBlock + private def doErrorResponseTest(exception: Throwable,problemClass:Class[_]) = { val adapter = new MockAdapter(throw exception) - - adapter.perform(BroadcastMessage(networkAuthn, req)) should equal(ErrorResponse(exception.getMessage)) - } + + val response = adapter.perform(BroadcastMessage(networkAuthn, req)) + + response match { + case errorResponse:ErrorResponse => { + val pd = errorResponse.problemDigest + pd.codec should be (problemClass.getName) + } + case x => fail(s"$x is not an ErrorResponse") + } + } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala index 8344de5e7..1d5a2864b 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala @@ -1,44 +1,44 @@ package net.shrine.adapter import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.client.Poster import net.shrine.protocol.ReadInstanceResultsRequest import net.shrine.protocol.ReadInstanceResultsResponse import net.shrine.adapter.dao.AdapterDao import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * @author clint - * @date Nov 7, 2012 + * @since Nov 7, 2012 */ final class ReadInstanceResultsAdapterTest extends AbstractQueryRetrievalTestCase( (dao, httpClient) => new ReadInstanceResultsAdapter( Poster("", httpClient), AbstractQueryRetrievalTestCase.hiveCredentials, dao, true, DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false ), (queryId, authn) => ReadInstanceResultsRequest("some-project-id", 10.seconds, authn, queryId), ReadInstanceResultsResponse.unapply) { @Test def testProcessInvalidRequest = doTestProcessInvalidRequest @Test def testProcessRequest = doTestProcessRequest - + @Test def testProcessRequestMissingQuery = doTestProcessRequestMissingQuery @Test def testProcessRequestIncompleteQuery = doTestProcessRequestIncompleteQuery(true) @Test def testProcessRequestIncompleteQueryCountResultRetrievalFails = doTestProcessRequestIncompleteQuery(false) @Test def testProcessRequestQueuedQuery = doTestProcessRequestQueuedQuery } diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala index b4fcd4f76..563315581 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala @@ -1,78 +1,83 @@ package net.shrine.adapter +import net.shrine.adapter.components.QueryNotInDatabase +import net.shrine.problem.ProblemNotInCodec import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.Credential import net.shrine.protocol.ErrorResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.protocol.ReadQueryDefinitionResponse import net.shrine.protocol.RenameQueryRequest import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term /** * @author clint - * @date Nov 28, 2012 + * @since Nov 28, 2012 */ final class ReadQueryDefinitionAdapterTest extends AbstractSquerylAdapterTest with AdapterTestHelpers with ShouldMatchersForJUnit { - private val networkAuthn = AuthenticationInfo("network-domain", "network-user", Credential("skajdhkasjdh", false)) + private val networkAuthn = AuthenticationInfo("network-domain", "network-user", Credential("skajdhkasjdh", isToken = false)) import scala.concurrent.duration._ @Test def testProcessRequest = afterCreatingTables { val name = "blarg" val expr = Term("foo") val fooQuery = QueryDefinition(name,expr) val adapter = new ReadQueryDefinitionAdapter(dao) //Should get error for non-existent query { - val ErrorResponse(msg) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, queryId))) + val ErrorResponse(msg,problemDigest) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, queryId))) msg should not be(null) + + problemDigest.codec should be (classOf[QueryNotInDatabase].getName) } //Add a query dao.insertQuery(localMasterId, queryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) //sanity check that it's there { val Some(query) = dao.findQueryByNetworkId(queryId) query.networkId should equal(queryId) } { //Should still get error for non-existent query - val ErrorResponse(msg) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, bogusQueryId))) + val ErrorResponse(msg, problemDigest) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, bogusQueryId))) msg should not be(null) + problemDigest.codec should be (classOf[QueryNotInDatabase].getName) } { //try to read a real query val ReadQueryDefinitionResponse(rQueryId, rName, userId, createDate, queryDefinition) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, queryId))) rQueryId should equal(queryId) rName should equal(name) userId should equal(authn.username) createDate should not be(null) // :( queryDefinition should equal(QueryDefinition(name, expr).toI2b2String) } } @Test def testProcessRequestBadRequest { val adapter = new ReadQueryDefinitionAdapter(dao) intercept[Exception] { adapter.processRequest(BroadcastMessage(123L, networkAuthn, RenameQueryRequest("proj", 1.second, authn, queryId, "foo"))) } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala index b6e2130dc..45636c915 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala @@ -1,119 +1,117 @@ package net.shrine.adapter.service import scala.xml.XML import org.junit.After import org.junit.Before import net.shrine.util.ShouldMatchersForJUnit import net.shrine.adapter.AdapterTestHelpers import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.client.JerseyHttpClient import net.shrine.protocol.ErrorResponse import net.shrine.protocol.QueryMaster import net.shrine.protocol.ReadI2b2AdminPreviousQueriesRequest import net.shrine.protocol.ReadPreviousQueriesResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.protocol.ReadQueryDefinitionResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.util.XmlUtil import net.shrine.crypto.TrustParam.AcceptAllCerts import net.shrine.protocol.I2b2AdminRequestHandler import net.shrine.protocol.I2b2AdminReadQueryDefinitionRequest import junit.framework.TestCase import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * @author clint * @date Apr 24, 2013 */ abstract class AbstractI2b2AdminResourceJaxrsTest extends TestCase with JerseyTestComponent[I2b2AdminRequestHandler] with AbstractSquerylAdapterTest with ShouldMatchersForJUnit with CanLoadTestData with AdapterTestHelpers { import scala.concurrent.duration._ protected def adminClient = I2b2AdminClient(resourceUrl, JerseyHttpClient(AcceptAllCerts, 5.minutes)) override def resourceClass(handler: I2b2AdminRequestHandler) = I2b2AdminResource(handler, DefaultBreakdownResultOutputTypes.toSet) override val basePath = "i2b2/admin/request" @Before override def setUp(): Unit = this.JerseyTest.setUp() @After override def tearDown(): Unit = this.JerseyTest.tearDown() protected object NeverAuthenticatesMockPmHttpClient extends HttpClient { override def post(input: String, url: String): HttpResponse = HttpResponse.ok(ErrorResponse("blarg").toI2b2String) } protected object AlwaysAuthenticatesMockPmHttpClient extends HttpClient { override def post(input: String, url: String): HttpResponse = { HttpResponse.ok(XmlUtil.stripWhitespace { Some user { userId } { domain } { password } MANAGER }.toString) } } protected def doTestReadQueryDefinition(networkQueryId: Long, expectedQueryNameAndQueryDef: Option[(String, QueryDefinition)]) { val request = I2b2AdminReadQueryDefinitionRequest(projectId, waitTime, authn, networkQueryId) val currentHandler = handler val resp = adminClient.readQueryDefinition(request) def stripNamespaces(s: String) = XmlUtil.stripNamespaces(XML.loadString(s)) expectedQueryNameAndQueryDef match { case Some((expectedQueryName, expectedQueryDef)) => { val response @ ReadQueryDefinitionResponse(masterId, name, userId, createDate, queryDefinition) = resp masterId should be(networkQueryId) name should be(expectedQueryName) userId should be(authn.username) createDate should not be (null) //NB: I'm not sure why whacky namespaces were coming back from the resource; //this checks that the gist of the queryDef XML makes it back. //TODO: revisit this stripNamespaces(queryDefinition) should equal(stripNamespaces(expectedQueryDef.toI2b2String)) } case None => resp.isInstanceOf[ErrorResponse] should be(true) } } protected def doTestReadI2b2AdminPreviousQueries(request: ReadI2b2AdminPreviousQueriesRequest, expectedQueryMasters: Seq[QueryMaster]) { val currentHandler = handler val ReadPreviousQueriesResponse(queryMasters) = adminClient.readI2b2AdminPreviousQueries(request) if(expectedQueryMasters.isEmpty) { queryMasters.isEmpty should be(true) } else { - println(queryMasters) - println(expectedQueryMasters) - + (queryMasters zip expectedQueryMasters).foreach { case (queryMaster, expected) => queryMaster.createDate should not be(null) queryMaster.name should equal(expected.name) queryMaster.queryMasterId should equal(expected.queryMasterId) queryMaster.userId should equal(expected.userId) queryMaster.groupId should equal(expected.groupId) queryMaster.flagged should equal(expected.flagged) queryMaster.held should equal(expected.held) } } } } \ No newline at end of file diff --git a/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala b/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala index 23982a854..7eabb348e 100644 --- a/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala +++ b/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala @@ -1,132 +1,130 @@ package net.shrine.wiring import com.typesafe.config.ConfigFactory import net.shrine.authentication.AuthenticationType import net.shrine.authorization.AuthorizationType import net.shrine.broadcaster.NodeListParserTest import net.shrine.client.EndpointConfigTest import net.shrine.crypto.{KeyStoreType, SigningCertStrategy} import net.shrine.protocol.TestResultOutputTypes import net.shrine.util.ShouldMatchersForJUnit import org.junit.Test /** * @author clint * @since Feb 6, 2013 */ final class ShrineConfigTest extends ShouldMatchersForJUnit { private def shrineConfig(baseFileName: String, loadBreakdownsFile: Boolean = true) = { val baseConfig = ConfigFactory.load(baseFileName) val breakdownConfig = ConfigFactory.load("breakdowns") val config = if(loadBreakdownsFile) baseConfig.withFallback(breakdownConfig) else baseConfig ShrineConfig(config) } import scala.concurrent.duration._ @Test def testApply() { import NodeListParserTest.node import EndpointConfigTest.endpoint val conf = shrineConfig("shrine") conf.pmEndpoint should equal(endpoint("http://services.i2b2.org/i2b2/rest/PMService/getServices")) conf.ontEndpoint should equal(endpoint("http://example.com:9090/i2b2/rest/OntologyService/")) conf.adapterConfig.get.crcEndpoint should equal(endpoint("http://services.i2b2.org/i2b2/rest/QueryToolService/")) conf.queryEntryPointConfig.get.authenticationType should be(AuthenticationType.Ecommons) conf.queryEntryPointConfig.get.authorizationType should be(AuthorizationType.HmsSteward) conf.queryEntryPointConfig.get.sheriffEndpoint.get should equal(endpoint("http://localhost:8080/shrine-hms-authorization/queryAuthorization")) conf.queryEntryPointConfig.get.sheriffCredentials.get.domain should be(None) conf.queryEntryPointConfig.get.sheriffCredentials.get.username should be("sheriffUsername") conf.queryEntryPointConfig.get.sheriffCredentials.get.password should be("sheriffPassword") conf.humanReadableNodeName should equal("SHRINE Cell") conf.crcHiveCredentials should equal(conf.pmHiveCredentials) conf.crcHiveCredentials.domain should equal("HarvardDemo") conf.crcHiveCredentials.username should equal("demo") conf.crcHiveCredentials.password should equal("demouser") conf.crcHiveCredentials.projectId should equal("Demo") conf.ontHiveCredentials.domain should equal("HarvardDemo") conf.ontHiveCredentials.username should equal("demo") conf.ontHiveCredentials.password should equal("demouser") conf.ontHiveCredentials.projectId should equal("SHRINE") conf.ontHiveCredentials.domain should equal("HarvardDemo") conf.ontHiveCredentials.username should equal("demo") conf.ontHiveCredentials.password should equal("demouser") conf.ontHiveCredentials.projectId should equal("SHRINE") conf.queryEntryPointConfig.get.broadcasterIsLocal should be(false) conf.queryEntryPointConfig.get.broadcasterServiceEndpoint.get should equal(endpoint("http://example.com/shrine/rest/broadcaster/broadcast")) conf.queryEntryPointConfig.get.maxQueryWaitTime should equal(5.minutes) conf.queryEntryPointConfig.get.signingCertStrategy should equal(SigningCertStrategy.Attach) conf.adapterConfig.get.setSizeObfuscation should equal(true) conf.queryEntryPointConfig.get.includeAggregateResults should equal(false) conf.adapterConfig.get.adapterLockoutAttemptsThreshold should equal(10) conf.hubConfig.get.maxQueryWaitTime should equal(4.5.minutes) conf.adapterConfig.get.maxSignatureAge should equal(5.minutes) conf.adapterStatusQuery should equal("""\\SHRINE\SHRINE\Diagnoses\Mental Illness\Disorders usually diagnosed in infancy, childhood, or adolescence\Pervasive developmental disorders\Infantile autism, current or active state\""") conf.adapterConfig.get.adapterMappingsFileName should equal("AdapterMappings.xml") conf.shrineDatabaseType should equal("mysql") conf.keystoreDescriptor.file should be("shrine.keystore") conf.keystoreDescriptor.password should be("chiptesting") conf.keystoreDescriptor.privateKeyAlias should be(Some("test-cert")) conf.keystoreDescriptor.keyStoreType should be(KeyStoreType.PKCS12) conf.keystoreDescriptor.caCertAliases should be(Seq("carra ca")) conf.hubConfig.get.downstreamNodes.toSet should equal { Set( node("some hospital", "http://example.com/foo"), node("CHB", "http://example.com/chb"), node("PHS", "http://example.com/phs")) } conf.adapterConfig.get.immediatelyRunIncomingQueries should be(false) conf.breakdownResultOutputTypes should equal(TestResultOutputTypes.values) } @Test def testApplyOptionalFields() { val conf = shrineConfig("shrine-no-optional-configs", loadBreakdownsFile = false) - println(conf) - conf.adapterConfig should be(None) conf.hubConfig should be(None) conf.queryEntryPointConfig should be(None) conf.breakdownResultOutputTypes should be(Set.empty) } @Test def testApplySomeOptionalFields() { val conf = shrineConfig("shrine-some-optional-props") conf.queryEntryPointConfig.get.authenticationType should be(AuthenticationType.Pm) conf.queryEntryPointConfig.get.authorizationType should be(AuthorizationType.NoAuthorization) conf.queryEntryPointConfig.get.signingCertStrategy should be(SigningCertStrategy.DontAttach) conf.breakdownResultOutputTypes should equal(TestResultOutputTypes.values) } } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala index e59885567..14b9a33c6 100644 --- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala +++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala @@ -1,76 +1,100 @@ package net.shrine.authorization import net.shrine.log.Loggable +import net.shrine.problem.{LoggingProblemHandler, Problem, ProblemSources, AbstractProblem, ProblemDigest} import scala.util.Try import net.shrine.client.HttpResponse import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest import net.shrine.i2b2.protocol.pm.User import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.ErrorResponse import scala.util.control.NonFatal /** * @author clint - * @date Apr 5, 2013 + * @since Apr 5, 2013 */ trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable => import PmAuthorizerComponent._ + //noinspection RedundantBlock object Pm { def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = { User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith { case NonFatal(e) => { debug(s"Couldn't extract a User from '$httpResponse'") Try(Left(ErrorResponse.fromI2b2(httpResponse.body))) } }.recover { case NonFatal(e) => { - warn(s"Couldn't understand response from PM: '$httpResponse'", e) - - Left(ErrorResponse(s"Error authorizing ${authn.domain}:${authn.username}: ${e.getMessage}")) + val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e) + LoggingProblemHandler.handleProblem(problem) + Left(ErrorResponse(problem.summary,Some(problem))) } } } def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = { val request = GetUserConfigurationRequest(authn) val responseAttempt = Try { debug(s"Authorizing with PM cell at ${pmPoster.url}") pmPoster.post(request.toI2b2String) } val authStatusAttempt = responseAttempt.flatMap(parsePmResult(authn)).map { case Right(user) => { val managerUserOption = for { roles <- user.rolesByProject.get(projectId) if neededRoles.forall(roles.contains) } yield user - managerUserOption.map(Authorized(_)).getOrElse { - NotAuthorized(s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'") + managerUserOption.map(Authorized).getOrElse { + NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn)) } } - case Left(ErrorResponse(message)) => NotAuthorized(message) + case Left(errorResponse) => { + //todo remove when ErrorResponse gets its message + info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object") + NotAuthorized(errorResponse.problemDigest) + } } authStatusAttempt.getOrElse { - NotAuthorized(s"Error authorizing ${authn.domain}:${authn.username} with PM at ${pmPoster.url}") + NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn)) } } } } object PmAuthorizerComponent { sealed trait AuthorizationStatus case class Authorized(user: User) extends AuthorizationStatus - case class NotAuthorized(reason: String) extends AuthorizationStatus { - def toErrorResponse = ErrorResponse(reason) + case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus { + def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest) + } + + object NotAuthorized { + def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest) } +} + +case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) { + override val summary: String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'" +} + +case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) { + override def summary: String = s"Could not reach PM cell at $pmUrl for ${authn.domain}:${authn.username}" +} + +case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { + override def summary: String = s"Exception while interpreting response from PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: $httpResponse" + + override def throwable = Some(x) } \ No newline at end of file diff --git a/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala b/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala index 1f0878448..e486c523e 100644 --- a/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala +++ b/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala @@ -1,197 +1,199 @@ package net.shrine.authorization import net.shrine.log.Loggable +import net.shrine.problem.ProblemNotInCodec import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.client.HttpClient import net.shrine.i2b2.protocol.pm.User import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.util.XmlUtil import net.shrine.client.Poster -import net.shrine.authorization.PmAuthorizerComponent.Authorized -import net.shrine.authorization.PmAuthorizerComponent.NotAuthorized /** * @author clint - * @date Apr 5, 2013 + * @since Apr 5, 2013 */ +//noinspection UnitMethodIsParameterless,UnitMethodIsParameterless,EmptyParenMethodAccessedAsParameterless,ScalaUnnecessaryParentheses final class PmAuthorizerComponentTest extends ShouldMatchersForJUnit { import PmAuthorizerComponentTest._ import PmAuthorizerComponent._ //Adds a bogus URL, for convenience private[this] def poster(httpClient: HttpClient) = Poster("", httpClient) @Test def testGoodResponseNotManager { val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(validUserResponseXml.toString))) - val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set(User.Roles.Manager), authn) + val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set(User.Roles.Manager), authn) - reason should not be(null) - reason.size should not be(0) + problemDigest should not be(null) + problemDigest.codec should be (classOf[MissingRequiredRoles].getName) } @Test def testGoodResponseIsManager { val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(validUserResponseXml.toString))) val Authorized(user) = component.Pm.authorize(projectId2, Set(User.Roles.Manager), authn) user should not be(null) user.fullName should be(fullName) user.username should be(username) user.domain should be(domain) user.credential should be(authn.credential) } @Test def testErrorResponse { val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(i2b2ErrorXml.toString))) - val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set.empty, authn) - - reason should be(errorMessage) + val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set.empty, authn) + + problemDigest should not be(null) + + problemDigest.codec should be (classOf[CouldNotInterpretResponseFromPmCell].getName) } @Test def testJunkResponse { val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient("jahfskajhkjashfdkjashkfd"))) - val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set.empty, authn) - - reason should not be(null) - reason.size should not be(0) - } + val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set.empty, authn) + + problemDigest should not be(null) + problemDigest.codec should be (classOf[CouldNotReachPmCell].getName) + } @Test - def testResponseThatBlowsUp { + def testResponseThatBlowsUp() { val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(throw new Exception with scala.util.control.NoStackTrace))) - val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set.empty, authn) - - reason should not be(null) - reason.size should not be(0) + val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set.empty, authn) + + problemDigest should not be(null) + problemDigest.codec should be (classOf[CouldNotReachPmCell].getName) } private val fullName = "Some Person" private val username = "some-user" private val domain = "some-place" private val password = "sal;dk;aslkd;" private val errorMessage = "Something blew up" - private lazy val authn = AuthenticationInfo(domain, username, Credential(password, true)) + private lazy val authn = AuthenticationInfo(domain, username, Credential(password, isToken = true)) private val projectId1 = "foo" private val projectId2 = "bar" private val params1 = Map("x" -> "1", "y" -> "2") private val params2 = Map("y" -> "2", "z" -> "3") private val roles1 = Set("a", "b", "c") private val roles2 = Set("MANAGER", "x", "y") private lazy val projects = Seq((projectId1, params1, roles1), (projectId2, params2, roles2)) private lazy val validUserResponseXml = XmlUtil.stripWhitespace { 1.1 2.4 SHRINE 1.3-compatible SHRINE 2011-04-08T16:21:12.251-04:00 DONE DEVELOPMENT http://www.i2b2.org {fullName} {username} {password} {domain} { projects.map { case (projectId, params, roles) => Demo Group { projectId } http://www.i2b2.org { params.map { case (name, value) => { value } } } { roles.map(r => { r }) } } } Data Repository http://localhost/crc REST Ontology Cell http://localhost/ont REST false 200 false } private val i2b2ErrorXml = XmlUtil.stripWhitespace { 1.1 2.4 SHRINE 1.3-compatible SHRINE 2011-04-08T16:21:12.251-04:00 { errorMessage } } } object PmAuthorizerComponentTest { final class TestPmAuthorizerComponent(override val pmPoster: Poster) extends PmAuthorizerComponent with PmHttpClientComponent with Loggable } \ No newline at end of file diff --git a/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala b/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala index fdf5e1c10..787d0f451 100644 --- a/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala +++ b/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala @@ -1,80 +1,81 @@ package net.shrine.ont.data import net.shrine.ont.messaging.Concept import scala.io.Source import scala.util.matching.Regex import scala.util.matching.Regex.Match import java.io.InputStream /** * @author Clint Gilbert * @date Feb 8, 2012 * */ final class ShrineSqlOntologyDao(val file: InputStream) extends OntologyDao { require(file != null) override def ontologyEntries: Iterable[Concept] = { //Matches VALUES (99, '', 'synonym', '' //where is_synonym is 'Y' or 'N' val pathAndSynonymRegex = """VALUES\s+\(\d+,\s+'(.+?)',\s+'(.+?)',\s+'(\w)',\s+'(.+?)',\s+(NULL|'.*?'),\s+(NULL|'(.*?)')""".r def mungeBaseCode(rawBaseCode: String): Option[String] = { val icd9BaseCodePrefix = """SHRINE|ICD9:""" def isNotNull(s: String) = s != "NULL" def isNotEmpty(s: String) = !s.isEmpty def isAcceptable(s: String): Boolean = !s.isEmpty && s != "NULL" && s.startsWith(icd9BaseCodePrefix) Option(rawBaseCode).filter(isAcceptable).map(_.drop(icd9BaseCodePrefix.size)) } def toConcept(termMatch: Match): Concept = { /*val synonym = termMatch.group(3) match { case "Y" => Option(termMatch.group(2)) case "N" => None }*/ //Ignore synonym_cd (y/n) field. //NB: Do this because medications are stored as coded names (..\CV000\CV350\203144\, etc) //but with synonyms containing human-readable names for the drugs ('pravastatin sodium', etc). //Crucially, synonym_cd is 'N' in this case (not sure why), and we don't want those synonyms to //be ignored. This has the effect of making medications' names available when searching an index //of terms, and also makes the relevance ranking "better", in that simple terms "male", "female" //are higher up the list. Some terms now have a redundant synonym now, but this is an ok tradeoff. val synonym = Option(termMatch.group(2)) val rawPath = termMatch.group(1) val rawBaseCode = termMatch.group(7) //Need to add '\\SHRINE' that's missing from the SQL file, but needed by i2b2 Concept("""\\SHRINE""" + rawPath, synonym, mungeBaseCode(rawBaseCode)) } parseWith(pathAndSynonymRegex, toConcept) } private def parseWith(regex: Regex, parser: Match => Concept): Iterable[Concept] = { def parseLine(line: String): Option[Concept] = { val result = regex.findFirstMatchIn(line).map(parser) if(result.isEmpty) { + //todo found this scary bit println("Failed to parse line: " + line) } result } def noEmptyLines(line: String) = line != null && line.trim != "" def mungeSingleQuotes(line: String) = line.replace("''", "'") val source = Source.fromInputStream(file) source.getLines.filter(noEmptyLines)/*.map(mungeSingleQuotes)*/.flatMap(parseLine).toIterable } } diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala index c79abebc0..0dbc8ffa2 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala @@ -1,100 +1,124 @@ package net.shrine.protocol -import xml.NodeSeq +import net.shrine.problem.{LoggingProblemHandler, Problem, ProblemNotInCodec, ProblemDigest} + +import scala.xml.{NodeBuffer, NodeSeq} import net.shrine.util.XmlUtil import net.shrine.serialization.XmlUnmarshaller import net.shrine.serialization.I2b2Unmarshaller import net.shrine.util.NodeSeqEnrichments import scala.util.Try import scala.util.control.NonFatal /** * @author Bill Simons * @since 4/25/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: Now a case class for structural equality */ -final case class ErrorResponse(errorMessage: String) extends ShrineResponse { - - //todo codec id, one-liner, medium message, detailed message +final case class ErrorResponse(errorMessage: String,problemDigest:ProblemDigest) extends ShrineResponse { - override protected def status = { errorMessage } + override protected def status: NodeSeq = { + val buffer = new NodeBuffer + buffer += { errorMessage } + buffer += problemDigest.toXml + } override protected def i2b2MessageBody = null import ErrorResponse.rootTagName - override def toXml = XmlUtil.stripWhitespace { - XmlUtil.renameRootTag(rootTagName) { - - //todo this is where the ProblemDigest goes. - - { errorMessage } - - + override def toXml = { XmlUtil.stripWhitespace { + val xml = XmlUtil.renameRootTag(rootTagName) { + + { errorMessage } + {problemDigest.toXml} + + } + xml } } } object ErrorResponse extends XmlUnmarshaller[ErrorResponse] with I2b2Unmarshaller[ErrorResponse] with HasRootTagName { val rootTagName = "errorResponse" + //todo deprecate this one + def apply(errorMessage:String,problem:Option[Problem] = None) = { + new ErrorResponse(errorMessage,problem.fold{ + val problem = ProblemNotInCodec(s"'$errorMessage'") + LoggingProblemHandler.handleProblem(problem) //todo someday hook up to the proper problem handler hierarchy. + problem.toDigest + }(p => p.toDigest)) + } + + def apply(problem:Problem) = { + new ErrorResponse(problem.summary,problem.toDigest) + } override def fromXml(xml: NodeSeq): ErrorResponse = { + val messageXml = xml \ "message" //NB: Fail fast require(messageXml.nonEmpty) - ErrorResponse(XmlUtil.trim(messageXml)) + val problemDigest = ProblemDigest.fromXml(xml) + + ErrorResponse(XmlUtil.trim(messageXml),problemDigest) } override def fromI2b2(xml: NodeSeq): ErrorResponse = { import NodeSeqEnrichments.Strictness._ + //todo what determines parseFormatA vs parseFormatB when written? It looks like our ErrorResponses use A. + def parseFormatA: Try[ErrorResponse] = { for { statusXml <- xml withChild "response_header" withChild "result_status" withChild "status" - typeText <- statusXml attribute "type" - if typeText == "ERROR" //NB: Fail fast - statusMessage = XmlUtil.trim(statusXml) + resultStatusXml <- xml withChild "response_header" withChild "result_status" + typeText <- statusXml attribute "type" if typeText == "ERROR" //NB: Fail fast{ + statusMessage = XmlUtil.trim(statusXml) + problemDigest = ProblemDigest.fromXml(resultStatusXml) } yield { - ErrorResponse(statusMessage) + ErrorResponse(statusMessage,problemDigest) } } def parseFormatB: Try[ErrorResponse] = { for { conditionXml <- xml withChild "message_body" withChild "response" withChild "status" withChild "condition" typeText <- conditionXml attribute "type" if typeText == "ERROR" statusMessage = XmlUtil.trim(conditionXml) } yield { ErrorResponse(statusMessage) } } - parseFormatA.recoverWith { case NonFatal(e) => parseFormatB }.get + parseFormatA.recoverWith { case NonFatal(e) => { + e.printStackTrace() //todo log instead + parseFormatB + } }.get } /** * * * * * * Query result instance id 3126 not found * * * * */ } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala index 04ce64adb..56c978628 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala @@ -1,351 +1,351 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar import net.shrine.problem.{Problem, ProblemNotInCodec, ProblemDigest} import net.shrine.protocol.QueryResult.StatusType import scala.xml.NodeSeq import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, SEnum, XmlDateHelper, OptionEnrichments} import net.shrine.serialization.{ I2b2Marshaller, XmlMarshaller } import scala.util.Try /** * @author Bill Simons * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class QueryResult( resultId: Long, instanceId: Long, resultType: Option[ResultOutputType], setSize: Long, startDate: Option[XMLGregorianCalendar], endDate: Option[XMLGregorianCalendar], description: Option[String], statusType: StatusType, statusMessage: Option[String], problemDigest: Option[ProblemDigest] = None, breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty ) extends XmlMarshaller with I2b2Marshaller { def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, //description statusType, None) //statusMessage } def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None) //statusMessage } def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match { case Some(rt) => rt == testedResultType case _ => false } import QueryResult._ //NB: Fragile, non-type-safe == def isError = statusType == StatusType.Error def elapsed: Option[Long] = { def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- startDate end <- endDate } yield inMillis(end) - inMillis(start) } //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = { breakdowns.values.toSeq.sortBy(_.resultType.name) } override def toI2b2: NodeSeq = { import OptionEnrichments._ XmlUtil.stripWhitespace { { resultId } { instanceId } { description.toXml() } { resultType match { case Some(rt) if !rt.isError => //noinspection RedundantBlock { if (rt.isBreakdown) { rt.toI2b2NameOnly() } else { rt.toI2b2 } } case _ => ResultOutputType.ERROR.toI2b2NameOnly("") } } { setSize } { startDate.toXml() } { endDate.toXml() } { statusType } { statusType.toI2b2(this) } { //NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case, //so if we're going to do that, let's produce saner XML. sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data")) } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { resultId } { instanceId } { resultType.toXml(_.toXml) } { setSize } { startDate.toXml() } { endDate.toXml() } { description.toXml() } { statusType } { statusMessage.toXml() } { //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. sortedBreakdowns.map(_.toXml) } { problemDigest.map(_.toXml).getOrElse("") } } def withId(id: Long): QueryResult = copy(resultId = id) def withInstanceId(id: Long): QueryResult = copy(instanceId = id) def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize)) def withSetSize(size: Long): QueryResult = copy(setSize = size) def withDescription(desc: String): QueryResult = copy(description = Option(desc)) def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType)) def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData)) def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns) } object QueryResult { final case class StatusType( name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value { def isError = this == StatusType.Error def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult) } object StatusType extends SEnum[StatusType] { private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{ throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id") } { i2b2Id }{ queryResult.statusType.name } } val noMessage:NodeSeq = null val Error = StatusType("ERROR", isDone = true, None, { queryResult => (queryResult.statusMessage, queryResult.problemDigest) match { case (Some(msg),Some(pd)) => { msg } ++ pd.toXml case (Some(msg),None) => { msg } case (None,Some(pd)) => pd.toXml case (None, None) => noMessage } }) /* msg => net.shrine.something.is.Broken

Something is borked { msg }
Herein is a stack trace, multiple lines
)) */ val Finished = StatusType("FINISHED", isDone = true, Some(3)) //TODO: Can we use the same for Queued, Processing, and Incomplete? val Processing = StatusType("PROCESSING", isDone = false, Some(2)) val Queued = StatusType("QUEUED", isDone = false, Some(2)) val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2)) //TODO: What s should these have? Does anyone care? val Held = StatusType("HELD", isDone = false) val SmallQueue = StatusType("SMALL_QUEUE", isDone = false) val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false) val LargeQueue = StatusType("LARGE_QUEUE", isDone = false) val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false) } def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _) def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = { import ResultOutputType.valueOf val typeName = asText(elemNames: _*)(xml) valueOf(typeName) orElse valueOf(breakdownTypes)(typeName) } def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = { val attempt = parse(xml) attempt.toOption } def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = { val subXml = xml \ "problem" - if(subXml.nonEmpty) Some(ProblemDigest.fromXml(subXml)) + if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml)) else None } def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def extract(elemName: String): Option[String] = { Option((xml \ elemName).text.trim).filter(!_.isEmpty) } def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate) val asLong = extractLong(xml) _ import NodeSeqEnrichments.Strictness._ import Tries.sequence def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = { //noinspection ScalaUnnecessaryParentheses val mapAttempt = for { subXml <- xml.withChild(elemName) envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes))) mappings = envelopes.map(envelope => (envelope.resultType -> envelope)) } yield Map.empty ++ mappings mapAttempt.getOrElse(Map.empty) } QueryResult( resultId = asLong("resultId"), instanceId = asLong("instanceId"), resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml), setSize = asLong("setSize"), startDate = extractDate("startDate"), endDate = extractDate("endDate"), description = extract("description"), statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call statusMessage = extract("statusMessage"), problemDigest = extractProblemDigest(xml), breakdowns = extractBreakdowns("resultEnvelope") ) } def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def asLong = extractLong(xml) _ def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim) def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate) QueryResult( resultId = asLong("result_instance_id"), instanceId = asLong("query_instance_id"), resultType = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2), setSize = asLong("set_size"), startDate = asXmlGcOption("start_date"), endDate = asXmlGcOption("end_date"), description = asTextOption("description"), statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call statusMessage = asTextOption("query_status_type", "description"), problemDigest = extractProblemDigest(xml \ "query_status_type")) } def errorResult(description: Option[String], statusMessage: String,problem:Option[Problem] = None):QueryResult = { val problemDigest = problem.getOrElse(ProblemNotInCodec(statusMessage)).toDigest QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } /** * For reconsitituting errorResults from a database */ def errorResult(description:Option[String], statusMessage:String, codec:String, summary:String, digestDescription:String,details:String): QueryResult = { val problemDigest = ProblemDigest(codec,summary,digestDescription,details) QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala index 88548c077..1af4b2de7 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala @@ -1,88 +1,89 @@ package net.shrine.protocol import net.shrine.serialization.XmlMarshaller import net.shrine.serialization.XmlUnmarshaller -import scala.xml.NodeSeq +import scala.xml.{Elem, NodeSeq} import net.shrine.serialization.I2b2Marshaller import net.shrine.util.XmlUtil import scala.util.Try /** * @author clint - * @date Nov 5, 2012 + * @since Nov 5, 2012 */ trait ShrineResponse extends BaseShrineResponse with I2b2Marshaller { protected def i2b2MessageBody: NodeSeq - protected def status = DONE + protected def status:NodeSeq = DONE //TODO better xmlns strategy override def toI2b2: NodeSeq = XmlUtil.stripWhitespace { 1.1 2.4 SHRINE 1.3-compatible SHRINE 2011-04-08T16:21:12.251-04:00 { status } { i2b2MessageBody } } + //todo start here Monday. figure out I2B2 xml for this } object ShrineResponse { private def lift(unmarshaller: XmlUnmarshaller[ShrineResponse]): Set[ResultOutputType] => NodeSeq => Try[ShrineResponse] = { _ => xml => Try(unmarshaller.fromXml(xml)) } private def lift(unmarshal: Set[ResultOutputType] => NodeSeq => ShrineResponse): Set[ResultOutputType] => NodeSeq => Try[ShrineResponse] = { knownTypes => xml => Try(unmarshal(knownTypes)(xml)) } private val unmarshallers = Map[String, Set[ResultOutputType] => NodeSeq => Try[ShrineResponse]]( "deleteQueryResponse" -> lift(DeleteQueryResponse), "readPreviousQueriesResponse" -> lift(ReadPreviousQueriesResponse), "readQueryDefinitionResponse" -> lift(ReadQueryDefinitionResponse), "readQueryInstancesResponse" -> lift(ReadQueryInstancesResponse), "renameQueryResponse" -> lift(RenameQueryResponse), "readInstanceResultsResponse" -> lift(ReadInstanceResultsResponse.fromXml _), "aggregatedReadInstanceResultsResponse" -> lift(AggregatedReadInstanceResultsResponse.fromXml _), "runQueryResponse" -> RunQueryResponse.fromXml _, "aggregatedRunQueryResponse" -> AggregatedRunQueryResponse.fromXml _, "readQueryResultResponse" -> lift(ReadQueryResultResponse.fromXml _), "aggregatedReadQueryResultResponse" -> lift(AggregatedReadQueryResultResponse.fromXml _), ErrorResponse.rootTagName -> lift(ErrorResponse), ReadApprovedQueryTopicsResponse.rootTagName -> lift(ReadApprovedQueryTopicsResponse), ReadPdoResponse.rootTagName -> lift(ReadPdoResponse), ReadResultResponse.rootTagName -> ReadResultResponse.fromXml _, FlagQueryResponse.rootTagName -> (_ => xml => FlagQueryResponse.fromXml(xml)), UnFlagQueryResponse.rootTagName -> (_ => xml => UnFlagQueryResponse.fromXml(xml))) def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[ShrineResponse] = { xml match { case null => scala.util.Failure(new IllegalArgumentException("null xml passed in")) case _ => for { rootTag <- Try(xml.head) rootTagName = rootTag.label if unmarshallers.contains(rootTagName) unmarshal = unmarshallers(rootTagName) unmarshalled <- unmarshal(breakdownTypes)(xml) } yield unmarshalled } } } \ No newline at end of file diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala index 2991bcdcd..6be128993 100644 --- a/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala +++ b/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala @@ -1,147 +1,151 @@ package net.shrine.protocol import junit.framework.TestCase +import net.shrine.problem.TestProblem import net.shrine.util.ShouldMatchersForJUnit import org.junit.Test import scala.xml.NodeSeq import net.shrine.util.XmlUtil /** * @author clint * @since Apr 5, 2013 */ final class ErrorResponseTest extends TestCase with ShouldMatchersForJUnit { val message = "foo" - val resp = ErrorResponse(message) + val resp = ErrorResponse(message,Some(TestProblem)) val expectedShrineXml = XmlUtil.stripWhitespace { { message } + {TestProblem.toDigest.toXml} } val expectedI2b2Xml = XmlUtil.stripWhitespace { 1.1 2.4 SHRINE 1.3-compatible SHRINE 2011-04-08T16:21:12.251-04:00 { message } + {TestProblem.toDigest.toXml} } @Test def testToXml() = doTestToXml(expectedShrineXml, _.toXml) @Test def testToI2b2() = doTestToXml(expectedI2b2Xml, _.toI2b2) @Test def testToXmlRoundTrip() = doTestRoundTrip(_.toXml, ErrorResponse.fromXml) @Test def testToI2b2RoundTrip() = doTestRoundTrip(_.toI2b2, ErrorResponse.fromI2b2) @Test def testFromXml() = doTestFromXml(expectedShrineXml, ErrorResponse.fromXml) @Test def testFromI2b2() = doTestFromXml(expectedI2b2Xml, ErrorResponse.fromI2b2) //NB: See https://open.med.harvard.edu/jira/browse/SHRINE-745 + @Test def testFromI2b2AlternateFormat() { val altI2b2Xml = XmlUtil.stripWhitespace { 1.1 2.4 edu.harvard.i2b2.crc 1.5 i2b2 Hive i2b2_QueryTool 0.2 i2b2 Hive 1 i2b2 Log information DONE Query result instance id 3126 not found } val resp = ErrorResponse.fromI2b2(altI2b2Xml) resp should not be null resp.errorMessage should equal("Query result instance id 3126 not found") } private def doTestFromXml(xml: NodeSeq, deserialize: NodeSeq => ErrorResponse) { intercept[Exception] { deserialize(null) } intercept[Exception] { deserialize() } intercept[Exception] { //Correct I2b2 XML structure, wrong status type deserialize({ message }) } deserialize(xml) should equal(resp) } private def doTestToXml(expected: NodeSeq, serialize: ErrorResponse => NodeSeq) { val xml = serialize(resp) -//todo turn this back on xml.toString should equal(expected.toString) + xml.toString should equal(expected.toString()) } private def doTestRoundTrip(serialize: ErrorResponse => NodeSeq, deserialize: NodeSeq => ErrorResponse) { val unmarshalled = deserialize(serialize(resp)) unmarshalled should equal(resp) } } \ No newline at end of file diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala index d376be18b..d676157a4 100644 --- a/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala +++ b/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala @@ -1,104 +1,106 @@ package net.shrine.protocol +import net.shrine.problem.TestProblem + import scala.xml.NodeSeq import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term import net.shrine.util.XmlDateHelper import net.shrine.util.XmlUtil import scala.util.Success /** * @author clint * @since Nov 5, 2012 */ //noinspection UnitMethodIsParameterless,NameBooleanParameters,ScalaUnnecessaryParentheses final class ShrineResponseTest extends ShouldMatchersForJUnit { @Test def testFromXml { //ShrineResponse.fromXml(null: String).isFailure should be(true) ShrineResponse.fromXml(DefaultBreakdownResultOutputTypes.toSet)(null: NodeSeq).isFailure should be(true) ShrineResponse.fromXml(DefaultBreakdownResultOutputTypes.toSet)(NodeSeq.Empty).isFailure should be(true) def roundTrip(response: ShrineResponse): Unit = { val unmarshalled = ShrineResponse.fromXml(DefaultBreakdownResultOutputTypes.toSet)(response.toXml) unmarshalled.get.getClass should equal(response.getClass) unmarshalled should not be (null) unmarshalled should equal(Success(response)) } val queryResult1 = QueryResult( resultId = 1L, instanceId = 2342L, resultType = Some(ResultOutputType.PATIENT_COUNT_XML), setSize = 123L, startDate = None, endDate = None, description = None, statusType = QueryResult.StatusType.Finished, statusMessage = None) roundTrip(ReadQueryResultResponse(123L, queryResult1)) roundTrip(AggregatedReadQueryResultResponse(123L, Seq(queryResult1))) roundTrip(DeleteQueryResponse(123L)) roundTrip(ReadInstanceResultsResponse(2342L, queryResult1)) roundTrip(AggregatedReadInstanceResultsResponse(2342L, Seq(queryResult1))) roundTrip(ReadPreviousQueriesResponse(Seq(QueryMaster("queryMasterId", 12345L, "name", "userId", "groupId", XmlDateHelper.now, Some(false))))) roundTrip(ReadQueryDefinitionResponse(8457L, "name", "userId", XmlDateHelper.now, "queryDefXml")) roundTrip(ReadQueryInstancesResponse(12345L, "userId", "groupId", Seq.empty)) roundTrip(RenameQueryResponse(12345L, "name")) roundTrip(RunQueryResponse(38957L, XmlDateHelper.now, "userId", "groupId", QueryDefinition("foo", Term("bar")), 2342L, queryResult1)) roundTrip(AggregatedRunQueryResponse(38957L, XmlDateHelper.now, "userId", "groupId", QueryDefinition("foo", Term("bar")), 2342L, Seq(queryResult1))) roundTrip(UnFlagQueryResponse) roundTrip(FlagQueryResponse) - roundTrip(ErrorResponse("errorMessage")) + roundTrip(ErrorResponse("errorMessage",Some(TestProblem))) } @Test def testToXml { val response = new FooResponse response.toXmlString should equal("") } @Test def testToI2b2 { val expected = XmlUtil.stripWhitespace( 1.1 2.4 SHRINE 1.3-compatible SHRINE 2011-04-08T16:21:12.251-04:00 DONE ) val response = new FooResponse response.toI2b2String should equal(expected.toString()) } private final class FooResponse extends ShrineResponse { protected override def i2b2MessageBody = override def toXml = i2b2MessageBody } } \ No newline at end of file diff --git a/commons/util/src/main/scala/net/shrine/problem/Problem.scala b/commons/util/src/main/scala/net/shrine/problem/Problem.scala index f38c4f9ea..244a62751 100644 --- a/commons/util/src/main/scala/net/shrine/problem/Problem.scala +++ b/commons/util/src/main/scala/net/shrine/problem/Problem.scala @@ -1,138 +1,150 @@ package net.shrine.problem import java.net.{InetAddress, ConnectException} import java.util.Date import net.shrine.log.Loggable import net.shrine.serialization.{XmlUnmarshaller, XmlMarshaller} -import scala.xml.NodeSeq +import scala.xml.{Node, NodeSeq} /** * Describes what information we have about a problem at the site in code where we discover it. * * @author david * @since 8/6/15 */ trait Problem { def summary:String def problemName = getClass.getName def throwable:Option[Throwable] = None def stamp:Stamp def description = s"$summary ${stamp.pretty}" def throwableDetail = throwable.map(x => x.getStackTrace.mkString(sys.props("line.separator"))) def details:String = s"$description ${throwableDetail.getOrElse("")}" def toDigest:ProblemDigest = ProblemDigest(problemName,summary,description,details) } case class ProblemDigest(codec:String,summary:String,description:String,details:String) extends XmlMarshaller { - override def toXml: NodeSeq = { + override def toXml: Node = { {codec} {summary} {description}
{details}
} } object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable { def apply(oldMessage:String):ProblemDigest = { val ex = new IllegalStateException(s"'$oldMessage' detected, not in codec. Please report this problem and stack trace to Shrine dev.") ex.fillInStackTrace() warn(ex) ProblemDigest("ProblemNotInCodec",oldMessage,"","") } override def fromXml(xml: NodeSeq): ProblemDigest = { - def extractText(tagName:String) = (xml \ tagName).text + val problemNode = xml \ "problem" + require(problemNode.nonEmpty,s"No problem tag in $xml") + + def extractText(tagName:String) = { + val t = (problemNode \ tagName).text + require(t.nonEmpty) + t + } val codec = extractText("codec") val summary = extractText("summary") val description = extractText("description") val details = extractText("details") ProblemDigest(codec,summary,description,details) } } case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) { def pretty = s"at ${new Date(time)} on $host ${source.pretty}" } object Stamp { def apply(source:ProblemSources.ProblemSource): Stamp = Stamp(InetAddress.getLocalHost,System.currentTimeMillis(),source) } abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem { val stamp = Stamp(source) } trait ProblemHandler { def handleProblem(problem:Problem) } /** * An example problem handler */ object LoggingProblemHandler extends ProblemHandler with Loggable { override def handleProblem(problem: Problem): Unit = { problem.throwable.fold(error(problem.toString))(throwable => error(problem.toString,throwable) ) } } object ProblemSources{ sealed trait ProblemSource { //todo name without $ def pretty = getClass.getSimpleName } case object Adapter extends ProblemSource case object Hub extends ProblemSource case object Qep extends ProblemSource case object Dsa extends ProblemSource case object Unknown extends ProblemSource def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown) } -case class ProblemNotInCodec(summary:String) extends AbstractProblem(ProblemSources.Unknown){ - override val throwable = { - val x = new IllegalStateException(s"$summary , is not yet in the codec.") - x.fillInStackTrace() - Option(x) - } +case class ProblemNotInCodec(summary:String,t:Throwable) extends AbstractProblem(ProblemSources.Unknown){ + override val throwable = Some(t) override val description = s"${super.description} . This error is not yet in the codec. Please report the stack trace to the Shrine development team at TODO" } +object ProblemNotInCodec { + + def apply(summary:String):ProblemNotInCodec = { + val x = new IllegalStateException(s"$summary , is not yet in the codec.") + x.fillInStackTrace() + new ProblemNotInCodec(summary,x) + } +} + /** * For "Failure querying node 'SITE NAME': java.net.ConnectException: Connection refused" * * This one is interesting because "Connection refused" is different from "Connection timed out" according to Keith's * notes, but the only way to pick that up is to pull the text out of that contained exception. However, all four options * are probably worth checking no matter what the exception's message. */ //todo NodeId is in protocol, which will be accessible from the hub code where this class should live //case class CouldNotConnectToQueryNode(nodeId:NodeId,connectExcepition:ConnectException) extends Problem { case class CouldNotConnectToNode(nodeName:String,connectException:ConnectException) extends AbstractProblem(ProblemSources.Hub) { val summary = s"Could not connect to node $nodeName" override def throwable = Some(connectException) } diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala index 82372b68f..02d60db5f 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala @@ -1,31 +1,41 @@ package net.shrine.aggregation -import net.shrine.protocol.ShrineResponse + import net.shrine.aggregation.BasicAggregator.{Invalid, Error, Valid} +import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.protocol.ErrorResponse import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert - * @date Sep 16, 2011 + * @since Sep 16, 2011 * - * @link http://cbmi.med.harvard.edu + * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL - * @link http://www.gnu.org/licenses/lgpl.html + * @see http://www.gnu.org/licenses/lgpl.html * * Extends BasicAggregator to ignore Errors and Invalid responses * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] { private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = { //Filter out errors and invalid responses makeResponseFrom(validResponses) } //Default implementation, just returns first valid response, or if there are none, an ErrorResponse private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = { - validResponses.map(_.response).toSet.headOption.getOrElse(new ErrorResponse("No valid responses to aggregate")) + + + validResponses.map(_.response).toSet.headOption.getOrElse{ + val problem = NoValidResponsesToAggregate() + ErrorResponse(problem.summary,Some(problem)) + } } +} + +case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) { + override def summary: String = "No valid responses to aggregate" } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala index 8af33b5d1..bd0f6d38c 100644 --- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala +++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala @@ -1,132 +1,132 @@ package net.shrine.aggregation import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.protocol.AggregatedReadQueryResultResponse import net.shrine.protocol.ErrorResponse import net.shrine.protocol.NodeId import net.shrine.protocol.QueryResult import net.shrine.protocol.ReadQueryResultResponse import net.shrine.protocol.Result import net.shrine.protocol.ResultOutputType import net.shrine.protocol.BaseShrineResponse /** * @author clint * @since Nov 7, 2012 */ final class ReadQueryResultAggregatorTest extends ShouldMatchersForJUnit { private val queryId = 12345L import ResultOutputType._ private def asAggregatedResponse(resp: BaseShrineResponse) = resp.asInstanceOf[AggregatedReadQueryResultResponse] private val setSize1 = 123L private val setSize2 = 456L private val totalSetSize = setSize1 + setSize2 private val queryResult1 = QueryResult(1L, 2L, Some(PATIENT_COUNT_XML), setSize1, None, None, None, QueryResult.StatusType.Finished, None) private val queryResult2 = QueryResult(1L, 2L, Some(PATIENT_COUNT_XML), setSize2, None, None, None, QueryResult.StatusType.Finished, None) private val response1 = ReadQueryResultResponse(queryId, queryResult1) private val response2 = ReadQueryResultResponse(queryId, queryResult2) import scala.concurrent.duration._ private val result1 = Result(NodeId("X"), 1.second, response1) private val result2 = Result(NodeId("Y"), 1.second, response2) private val errors = Seq(ErrorResponse("blarg"), ErrorResponse("glarg")) @Test def testAggregate { val aggregator = new ReadQueryResultAggregator(queryId, true) val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2), Nil)) val Seq(actualQueryResult1, actualQueryResult2, aggregatedQueryResult) = response.results actualQueryResult1 should equal(queryResult1) actualQueryResult2 should equal(queryResult2) val expectedAggregatedResult = queryResult1.withSetSize(totalSetSize).withInstanceId(queryId).withDescription("Aggregated Count") aggregatedQueryResult should equal(expectedAggregatedResult) } @Test def testAggregateNoAggregatedResult { val aggregator = new ReadQueryResultAggregator(queryId, false) val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2), Nil)) val Seq(actualQueryResult1, actualQueryResult2) = response.results actualQueryResult1 should equal(queryResult1) actualQueryResult2 should equal(queryResult2) } @Test def testAggregateNoResponses { for (doAggregation <- Seq(true, false)) { val aggregator = new ReadQueryResultAggregator(queryId, true) val response = asAggregatedResponse(aggregator.aggregate(Nil, Nil)) response.queryId should equal(queryId) response.results.isEmpty should be(true) } } @Test def testAggregateOnlyErrorResponses { val aggregator = new ReadQueryResultAggregator(queryId, true) val response = asAggregatedResponse(aggregator.aggregate(Nil, errors)) response.queryId should equal(queryId) response.results.exists(qr => qr.problemDigest.exists(pd => pd.codec == classOf[ErrorResultProblem].getName)) should be (true) } @Test def testAggregateSomeErrors { val aggregator = new ReadQueryResultAggregator(queryId, true) val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2), errors)) val Seq(actualQueryResult1, actualQueryResult2, aggregatedQueryResult, actualErrorQueryResults @ _*) = response.results actualQueryResult1 should equal(queryResult1) actualQueryResult2 should equal(queryResult2) val expectedAggregatedResult = queryResult1.withSetSize(totalSetSize).withInstanceId(queryId).withDescription("Aggregated Count") aggregatedQueryResult should equal(expectedAggregatedResult) actualErrorQueryResults.exists(qr => qr.problemDigest.exists(pd => pd.codec == classOf[ErrorResultProblem].getName)) should be (true) } @Test def testAggregateSomeDownstreamErrors { val aggregator = new ReadQueryResultAggregator(queryId, true) val result3 = Result(NodeId("A"), 1.second, errors.head) val result4 = Result(NodeId("A"), 1.second, errors.last) val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2, result3, result4), Nil)) val Seq(actualQueryResult1, actualQueryResult2, aggregatedQueryResult, actualErrorQueryResults @ _*) = response.results actualQueryResult1 should equal(queryResult1) actualQueryResult2 should equal(queryResult2) val expectedAggregatedResult = queryResult1.withSetSize(totalSetSize).withInstanceId(queryId).withDescription("Aggregated Count") aggregatedQueryResult should equal(expectedAggregatedResult) - actualErrorQueryResults.forall(qr => qr.description == Some("A")) + actualErrorQueryResults.forall(qr => qr.description.contains("A")) should be(true) } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala index 78ad65cc2..93f13f256 100644 --- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala +++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala @@ -1,154 +1,154 @@ package net.shrine.aggregation import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.protocol.AggregatedRunQueryResponse import net.shrine.protocol.ErrorResponse import net.shrine.protocol.I2b2ResultEnvelope import net.shrine.protocol.NodeId import net.shrine.protocol.QueryResult import net.shrine.protocol.Result import net.shrine.protocol.ResultOutputType import net.shrine.protocol.ResultOutputType.PATIENTSET import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML import net.shrine.protocol.RunQueryResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term import net.shrine.util.XmlDateHelper import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * * * @author Justin Quan - * @link http://chip.org + * @see http://chip.org * Date: 8/12/11 */ final class RunQueryAggregatorTest extends ShouldMatchersForJUnit { private val queryId = 1234L private val queryName = "someQueryName" private val now = XmlDateHelper.now private val userId = "user" private val groupId = "group" private val requestQueryDef = QueryDefinition(queryName, Term("""\\i2b2\i2b2\Demographics\Age\0-9 years old\""")) private val requestQueryDefString = requestQueryDef.toI2b2String private val queryInstanceId = 9999L import scala.concurrent.duration._ @Test def testAggregate { val qrCount = new QueryResult(1L, queryInstanceId, PATIENT_COUNT_XML, 10L, now, now, "Desc", QueryResult.StatusType.Finished) val qrSet = new QueryResult(2L, queryInstanceId, PATIENTSET, 10L, now, now, "Desc", QueryResult.StatusType.Finished) val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrCount) val rqr2 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrSet) val result2 = Result(NodeId("description1"), 1.second, rqr2) val result1 = Result(NodeId("description2"), 1.second, rqr1) val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true) val actual = aggregator.aggregate(Vector(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse] actual.queryId should equal(queryId) actual.queryInstanceId should equal(-1L) actual.results.size should equal(3) actual.results.filter(_.resultTypeIs(PATIENT_COUNT_XML)).size should equal(2) //1 for the actual count result, 1 for the aggregated total count actual.results.filter(_.resultTypeIs(PATIENTSET)).size should equal(1) actual.results.filter(hasTotalCount).size should equal(1) actual.results.filter(hasTotalCount).head.setSize should equal(20) actual.queryName should equal(queryName) } @Test def testAggCount { val qrSet = new QueryResult(2L, queryInstanceId, PATIENTSET, 10L, now, now, "Desc", QueryResult.StatusType.Finished) val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrSet) val rqr2 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrSet) val result1 = Result(NodeId("description1"), 1.second, rqr1) val result2 = Result(NodeId("description2"), 1.second, rqr2) val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true) //TODO: test handling error responses val actual = aggregator.aggregate(Vector(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse] actual.results.filter(_.description.getOrElse("").equalsIgnoreCase("TOTAL COUNT")).head.setSize should equal(20) } @Test def testHandleErrorResponse { val qrCount = new QueryResult(1L, queryInstanceId, PATIENT_COUNT_XML, 10L, now, now, "Desc", QueryResult.StatusType.Finished) val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrCount) val errorMessage = "error message" - val errorResponse = new ErrorResponse(errorMessage) + val errorResponse = ErrorResponse(errorMessage) val result1 = Result(NodeId("description1"), 1.second, rqr1) val result2 = Result(NodeId("description2"), 1.second, errorResponse) val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true) val actual = aggregator.aggregate(Vector(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse] actual.results.size should equal(3) actual.results.filter(_.resultTypeIs(PATIENT_COUNT_XML)).head.setSize should equal(10) actual.results.filter(_.statusType == QueryResult.StatusType.Error).head.statusMessage should equal(Some(errorMessage)) actual.results.filter(hasTotalCount).head.setSize should equal(10) } @Test def testAggregateResponsesWithBreakdowns { def toColumnTuple(i: Int) = ("x" + i, i.toLong) val breakdowns1 = Map.empty ++ DefaultBreakdownResultOutputTypes.values.map { resultType => resultType -> I2b2ResultEnvelope(resultType, (1 to 10).map(toColumnTuple).toMap) } val breakdowns2 = Map.empty ++ DefaultBreakdownResultOutputTypes.values.map { resultType => resultType -> I2b2ResultEnvelope(resultType, (11 to 20).map(toColumnTuple).toMap) } val qr1 = new QueryResult(1L, queryInstanceId, Some(PATIENT_COUNT_XML), 10L, Some(now), Some(now), Some("Desc"), QueryResult.StatusType.Finished, None, breakdowns = breakdowns1) val qr2 = new QueryResult(2L, queryInstanceId, Some(PATIENT_COUNT_XML), 20L, Some(now), Some(now), Some("Desc"), QueryResult.StatusType.Finished, None, breakdowns = breakdowns2) val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qr1) val rqr2 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qr2) val result1 = Result(NodeId("description2"), 1.second, rqr1) val result2 = Result(NodeId("description1"), 1.second, rqr2) val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true) val actual = aggregator.aggregate(Seq(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse] actual.results.size should equal(3) actual.results.filter(hasTotalCount).size should equal(1) val Seq(actualQr1, actualQr2, actualQr3) = actual.results.filter(_.resultTypeIs(PATIENT_COUNT_XML)) actualQr1.setSize should equal(10) actualQr2.setSize should equal(20) actualQr3.setSize should equal(30) actualQr1.breakdowns should equal(breakdowns1) actualQr2.breakdowns should equal(breakdowns2) actualQr3.breakdowns.isEmpty should be(true) } private def hasTotalCount(result: QueryResult) = result.description.getOrElse("").equalsIgnoreCase("TOTAL COUNT") private def toQueryResultMap(results: QueryResult*) = Map.empty ++ (for { result <- results resultType <- result.resultType } yield (resultType, result)) } diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala index 86bb821c2..700cc38e8 100644 --- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala +++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala @@ -1,98 +1,100 @@ package net.shrine.broadcaster +import net.shrine.problem.TestProblem + import scala.concurrent.Await import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.aggregation.Aggregator import net.shrine.crypto.DefaultSignerVerifier import net.shrine.crypto.TestKeystore import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.Credential import net.shrine.protocol.DeleteQueryRequest import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.ShrineResponse import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.broadcaster.dao.MockHubDao /** * @author clint - * @date Nov 19, 2013 + * @since Nov 19, 2013 */ final class HubBroadcastAndAggregationServiceTest extends AbstractSquerylHubDaoTest with ShouldMatchersForJUnit { import scala.concurrent.duration._ import MockBroadcasters._ - private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah")) + private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah",Some(TestProblem))) private val results = "abcde".map(result) private lazy val nullResultsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("X") -> null, NodeId("Y") -> null) private lazy val resultsWithNullsByOrigin: Map[NodeId, SingleNodeResult] = { results.collect { case r @ Result(origin, _, _) => origin -> r }.toMap ++ nullResultsByOrigin } private val broadcastMessage = { val authn = AuthenticationInfo("domain", "username", Credential("asdasd", false)) import scala.concurrent.duration._ BroadcastMessage(authn, DeleteQueryRequest("projectId", 12345.milliseconds, authn, 12345L)) } @Test def testAggregateHandlesNullResults { val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithNullsByOrigin) val broadcastService = new HubBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster)) val aggregator: Aggregator = new Aggregator { override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = { - ErrorResponse(results.size.toString) + ErrorResponse(results.size.toString,Some(TestProblem)) } } val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes) mockBroadcaster.messageParam.signature.isDefined should be(false) - aggregatedResult should equal(ErrorResponse(s"${results.size}")) + aggregatedResult should equal(ErrorResponse(s"${results.size}",Some(TestProblem))) } @Test def testAggregateHandlesFailures { def toResult(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah")) def toFailure(description: Char) = Failure(NodeId(description.toString), new Exception with scala.util.control.NoStackTrace) val failuresByOrigin: Map[NodeId, SingleNodeResult] = { "UV".map(toFailure).map { case f @ Failure(origin, _) => origin -> f }.toMap } val timeoutsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("Z") -> Timeout(NodeId("Z"))) val resultsWithFailuresByOrigin: Map[NodeId, SingleNodeResult] = resultsWithNullsByOrigin ++ failuresByOrigin ++ timeoutsByOrigin val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithFailuresByOrigin) val broadcastService = new HubBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster)) val aggregator: Aggregator = new Aggregator { override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = { - ErrorResponse(s"${results.size},${errors.size}") + ErrorResponse(s"${results.size},${errors.size}",Some(TestProblem)) } } val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes) mockBroadcaster.messageParam.signature.isDefined should be(false) - aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0")) + aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0",Some(TestProblem))) } } diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala index 1a02d0445..28f37e2e2 100644 --- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala +++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala @@ -1,100 +1,102 @@ package net.shrine.broadcaster +import net.shrine.problem.TestProblem + import scala.concurrent.Await import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.aggregation.Aggregator import net.shrine.crypto.DefaultSignerVerifier import net.shrine.crypto.TestKeystore import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.Credential import net.shrine.protocol.DeleteQueryRequest import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.ShrineResponse import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.crypto.SigningCertStrategy import net.shrine.broadcaster.dao.MockHubDao /** * @author clint - * @date Nov 19, 2013 + * @since Nov 19, 2013 */ final class SigningBroadcastAndAggregationServiceTest extends ShouldMatchersForJUnit { import scala.concurrent.duration._ import MockBroadcasters._ - private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah")) + private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah",Some(TestProblem))) private val results = "abcde".map(result) private lazy val nullResultsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("X") -> null, NodeId("Y") -> null) private lazy val resultsWithNullsByOrigin: Map[NodeId, SingleNodeResult] = { results.collect { case r @ Result(origin, _, _) => origin -> r }.toMap ++ nullResultsByOrigin } private lazy val signer = new DefaultSignerVerifier(TestKeystore.certCollection) private val broadcastMessage = { val authn = AuthenticationInfo("domain", "username", Credential("asdasd", false)) import scala.concurrent.duration._ BroadcastMessage(authn, DeleteQueryRequest("projectId", 12345.milliseconds, authn, 12345L)) } @Test def testAggregateHandlesNullResults { val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithNullsByOrigin) val broadcastService = SigningBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster), signer, SigningCertStrategy.Attach) val aggregator: Aggregator = new Aggregator { override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = { - ErrorResponse(results.size.toString) + ErrorResponse(results.size.toString,Some(TestProblem)) } } val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes) mockBroadcaster.messageParam.signature.isDefined should be(true) - aggregatedResult should equal(ErrorResponse(s"${results.size}")) + aggregatedResult should equal(ErrorResponse(s"${results.size}",Some(TestProblem))) } @Test def testAggregateHandlesFailures { - def toResult(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah")) + def toResult(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah",Some(TestProblem))) def toFailure(description: Char) = Failure(NodeId(description.toString), new Exception with scala.util.control.NoStackTrace) val failuresByOrigin: Map[NodeId, SingleNodeResult] = { "UV".map(toFailure).map { case f @ Failure(origin, _) => origin -> f }.toMap } val timeoutsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("Z") -> Timeout(NodeId("Z"))) val resultsWithFailuresByOrigin: Map[NodeId, SingleNodeResult] = resultsWithNullsByOrigin ++ failuresByOrigin ++ timeoutsByOrigin val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithFailuresByOrigin) val broadcastService = SigningBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster), signer, SigningCertStrategy.DontAttach) val aggregator: Aggregator = new Aggregator { override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = { - ErrorResponse(s"${results.size},${errors.size}") + ErrorResponse(s"${results.size},${errors.size}",Some(TestProblem)) } } val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes) mockBroadcaster.messageParam.signature.isDefined should be(true) - aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0")) + aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0",Some(TestProblem))) } }