diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
index c13bf959b..7ce5ce381 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
@@ -1,291 +1,297 @@
package net.shrine.adapter
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import net.shrine.adapter.audit.AdapterAuditDb
+import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.Option.option2Iterable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
import net.shrine.adapter.Obfuscator.obfuscateResults
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.model.Breakdown
import net.shrine.adapter.dao.model.ShrineQueryResult
import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, ErrorResponse, HasQueryResults, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineRequest}
import net.shrine.protocol.query.QueryDefinition
import net.shrine.util.StackTrace
import net.shrine.util.Tries.sequence
import scala.concurrent.duration.Duration
import net.shrine.client.Poster
/**
* @author clint
* @since Nov 2, 2012
*
*/
object AbstractReadQueryResultAdapter {
private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
}
abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults](
poster: Poster,
override val hiveCredentials: HiveCredentials,
dao: AdapterDao,
doObfuscation: Boolean,
getQueryId: Req => Long,
getProjectId: Req => String,
toResponse: (Long, QueryResult) => Rsp,
breakdownTypes: Set[ResultOutputType],
collectAdapterAudit:Boolean
) extends WithHiveCredentialsAdapter(hiveCredentials) {
//TODO: Make this configurable
private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors)
//TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead?
private lazy val executorService = Executors.newFixedThreadPool(numThreads)
private lazy val executionContext = ExecutionContext.fromExecutorService(executorService)
override def shutdown() {
try {
executorService.shutdown()
executorService.awaitTermination(5, TimeUnit.SECONDS)
} finally {
executorService.shutdownNow()
super.shutdown()
}
}
import AbstractReadQueryResultAdapter._
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
val req = message.request.asInstanceOf[Req]
val queryId = getQueryId(req)
def findShrineQueryRow = dao.findQueryByNetworkId(queryId)
def findShrineQueryResults = dao.findResultsFor(queryId)
findShrineQueryRow match {
case None => {
debug(s"Query $queryId not found in the Shrine DB")
errorResponse(queryId)
}
case Some(shrineQueryRow) => {
if (shrineQueryRow.hasNotBeenRun) {
debug(s"Query $queryId found, but it wasn't run before")
findShrineQueryResults.map(makeResponseFrom(queryId, _)).getOrElse {
debug(s"Couldn't retrive all results for query $queryId; it's likely the query's status is incomplete (QUEUED, PROCESSING, etc)")
errorResponse(queryId)
}
} else {
findShrineQueryResults match {
case None => {
debug(s"Query $queryId found, and it has been run, but its results are not available yet")
//TODO: When precisely can this happen? Should we go back to the CRC here?
errorResponse(queryId)
}
case Some(shrineQueryResult) => {
if (shrineQueryResult.isDone) {
debug(s"Query $queryId is done and already stored, returning stored results")
makeResponseFrom(queryId, shrineQueryResult)
} else {
debug(s"Query $queryId is incomplete, asking CRC for results")
val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message)
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result)
result
}
}
}
}
}
}
}
- private def errorResponse(queryId: Long) = ErrorResponse(s"Query with id '$queryId' not found")
+ private def errorResponse(queryId: Long) = ErrorResponse(QueryNotFound(queryId))
private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = {
shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(errorResponse(queryId))
}
private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = {
//NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to
//retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in
//parallel is quite possibly too clever, but may be faster than asking for them serially.
//TODO: Review this.
//Make requests for results in parallel
val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult)
//Gather all the results (block until they're all returned)
val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime)
countResponseAttempt match {
//If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along
//with any retrieved breakdowns before returning it.
case Success(countResponse) => {
//NB: Only store the result if needed, that is, if all results are done
//TODO: REVIEW THIS
storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts))
countResponse
}
case Failure(e) => ErrorResponse(s"Couldn't retrieve query with id '$queryId' from the CRC: exception message follows: ${e.getMessage} stack trace: ${StackTrace.stackTraceAsString(e)}")
}
}
private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = {
def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString)
def process(localResultId: Long): ShrineResponse = {
delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId))
}
implicit val executionContext = this.executionContext
import scala.concurrent.blocking
def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f)))
val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt {
process(shrineQueryResult.count.localId)
}
val futureBreakdownAttempts = Future.sequence(for {
Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns
} yield futureBlockingAttempt {
process(localResultId)
})
//Log errors retrieving count
futureCountAttempt.collect {
case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting count result from the CRC: ", e)
}
//Log errors retrieving breakdown
for {
breakdownResponseAttempts <- futureBreakdownAttempts
} {
breakdownResponseAttempts.collect {
case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e)
}
}
//"Filter" for non-ErrorResponses
val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect {
case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast
case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'"))
}
//"Filter" for non-ErrorResponses
val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for {
breakdownResponseAttempts <- futureBreakdownAttempts
} yield {
breakdownResponseAttempts.collect {
case Success(resp: ReadResultResponse) => Try(resp)
}
}
for {
countResponseAttempt <- futureNonErrorCountAttempt
breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts
} yield {
RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts)
}
}
private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = {
val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime)
//Log any failures
(countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _))
//NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned
val responseAttempt = for {
countResponse: ReadResultResponse <- countResponseAttempt
countQueryResult = countResponse.metadata
breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts)
} yield {
val localCountResultId = countResponse.metadata.resultId
val breakdownsByType = (for {
breakdownResponse <- breakdownResponses
resultType <- breakdownResponse.metadata.resultType
} yield resultType -> breakdownResponse.data).toMap
val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType)
val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns
toResponse(queryId, queryResultToReturn)
}
SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts)
}
private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = {
val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten
breakdownTypes -- successfulBreakdownTypes
}
private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val responseIsDone = response.results.forall(_.statusType.isDone)
if (responseIsDone) {
storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes)
}
}
private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val rawResults = response.results
val obfuscatedResults = obfuscateResults(doObfuscation)(response.results)
for {
shrineQuery <- dao.findQueryByNetworkId(queryId)
queryResult <- rawResults.headOption
obfuscatedQueryResult <- obfuscatedResults.headOption
} {
val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr)
dao.inTransaction {
dao.deleteQuery(queryId)
dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns)
}
}
}
private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R]
private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) {
def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns]
override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call
}
private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _)
}
+
+case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
+ override def summary: String = s"Query with id '$queryId' not found"
+}
+
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
index beeacf088..8430355fc 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
@@ -1,64 +1,74 @@
package net.shrine.adapter
import net.shrine.log.Loggable
-import net.shrine.protocol.BroadcastMessage
-import net.shrine.protocol.ErrorResponse
+import net.shrine.problem.{Problem, ProblemNotInCodec, LoggingProblemHandler, ProblemSources, AbstractProblem}
+import net.shrine.protocol.{ShrineRequest, BroadcastMessage, ErrorResponse, ShrineResponse, BaseShrineResponse, AuthenticationInfo}
import net.shrine.serialization.XmlMarshaller
import net.shrine.util.StackTrace
-import net.shrine.protocol.ShrineResponse
-import net.shrine.protocol.BaseShrineResponse
-import net.shrine.protocol.AuthenticationInfo
/**
* @author Bill Simons
* @since 4/8/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class Adapter extends Loggable {
final def perform(message: BroadcastMessage): BaseShrineResponse = {
+ def problemToErrorResponse(problem:Problem):ErrorResponse = {
+ LoggingProblemHandler.handleProblem(problem)
+ ErrorResponse(problem)
+ }
+
val shrineResponse = try {
processRequest(message)
} catch {
case e: AdapterLockoutException => {
- val AuthenticationInfo(domain, username, _) = message.request.authn
-
- warn(s"User '$domain:$username' is locked out")
-
- errorResponseFrom(e)
+ problemToErrorResponse(AdapterLockout(message.request.authn,e))
}
case e @ CrcInvocationException(invokedCrcUrl, request, cause) => {
- error(s"Error invoking the CRC at '$invokedCrcUrl' with request $request . Root cause: ", cause)
-
- errorResponseFrom(e)
+ problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e))
}
case e: AdapterMappingException => {
- warn(s"Error mapping query terms: ${e.getMessage}", e)
-
- errorResponseFrom(e)
- }
+ problemToErrorResponse(AdapterMappingProblem(e))
+ }
case e: Exception => {
- //for now we'll warn on all errors and work towards more specific logging later
- def messageXml = Option(message).map(_.toXmlString).getOrElse("(Null message)")
-
- warn(s"Exception $e in Adapter with stack trace:\r\n${ StackTrace.stackTraceAsString(e) } caused on request\r\n $messageXml", e)
-
- errorResponseFrom(e)
+
+ val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage"
+ else s"Unexpected exception in Adapter"
+ problemToErrorResponse(ProblemNotInCodec(summary,e))
}
}
shrineResponse
}
- private def errorResponseFrom(e: Throwable) = ErrorResponse(e.getMessage)
-
protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse
//NOOP, may be overridden by subclasses
def shutdown(): Unit = ()
-}
\ No newline at end of file
+}
+
+case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Hub) {
+ override def summary: String = s"User '${authn.domain}:${authn.username}' is locked out"
+
+ override def throwable = Some(x)
+}
+
+case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Hub) {
+ override def summary: String = s"Error invoking the CRC at '$crcUrl' with request $request ."
+
+ override def throwable = Some(x)
+}
+
+case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Hub) {
+ override def summary: String = s"Error mapping query terms: ${x.getMessage}"
+
+ override def throwable = Some(x)
+}
+
+case class ExceptionInAdapter()
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
index 9dfbc2285..80784f2c8 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
@@ -1,33 +1,38 @@
package net.shrine.adapter.components
import net.shrine.adapter.dao.AdapterDao
+import net.shrine.problem.{ProblemSources, AbstractProblem}
import net.shrine.protocol.ShrineResponse
import net.shrine.protocol.ReadQueryDefinitionRequest
import net.shrine.protocol.ReadQueryDefinitionResponse
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.AbstractReadQueryDefinitionRequest
/**
* @author clint
* @since Apr 4, 2013
*
* NB: Tested by ReadQueryDefinitionAdapterTest
*/
final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) {
def get(request: Req): ShrineResponse = {
val resultOption = for {
shrineQuery <- dao.findQueryByNetworkId(request.queryId)
} yield {
ReadQueryDefinitionResponse(
shrineQuery.networkId,
shrineQuery.name,
shrineQuery.username,
shrineQuery.dateCreated,
//TODO: I2b2 or Shrine format?
shrineQuery.queryDefinition.toI2b2String)
}
- resultOption.getOrElse(ErrorResponse(s"Couldn't find query with network id: ${request.queryId}"))
+ resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request)))
}
+}
+
+case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) {
+ override def summary: String = s"Couldn't find query with network id: ${request.queryId}"
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
index ea595f7c6..d784c48ea 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
@@ -1,370 +1,375 @@
package net.shrine.adapter
import scala.xml.NodeSeq
import net.shrine.util.ShouldMatchersForJUnit
import ObfuscatorTest.within3
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, CrcRequest, Credential, ErrorResponse, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineResponse, BaseShrineRequest, RunQueryRequest, RunQueryResponse, DefaultBreakdownResultOutputTypes}
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML
import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML
import net.shrine.protocol.query.{QueryDefinition, Term}
import net.shrine.util.XmlDateHelper
import net.shrine.util.XmlDateHelper.now
import net.shrine.util.XmlGcEnrichments
import net.shrine.client.Poster
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.adapter.translators.ExpressionTranslator
import scala.util.Success
/**
* @author clint
* @since Nov 8, 2012
*/
+//noinspection UnitMethodIsParameterless
abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse](
makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter,
makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest,
extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit {
private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false))
def doTestProcessRequestMissingQuery {
val adapter = makeAdapter(dao, MockHttpClient)
val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn)))
response.isInstanceOf[ErrorResponse] should be(true)
}
def doTestProcessInvalidRequest {
val adapter = makeAdapter(dao, MockHttpClient)
intercept[ClassCastException] {
//request must be a type of request we can handle
adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest))
}
}
private val localMasterId = "alksjdkalsdjlasdjlkjsad"
private val shrineNetworkQueryId = 123L
- private val errorResponse = ErrorResponse(s"Query with id '$shrineNetworkQueryId' not found")
-
private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn)))
private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis
private val instanceId = 999L
private val setSize = 12345L
private val obfSetSize = setSize + 1
private val queryExpr = Term("foo")
private val topicId = "laskdjlkasd"
private val fooQuery = QueryDefinition("some-query",queryExpr)
def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables {
val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None)
import ResultOutputType._
import XmlDateHelper.now
val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)))
val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1))
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResultId = 456L
val breakdownResultId = 98237943265436L
val incompleteCountResult = QueryResult(
resultId = countResultId,
instanceId = instanceId,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = Option(startDate),
endDate = Option(endDate),
description = Some("results from node X"),
statusType = QueryResult.StatusType.Processing,
statusMessage = None,
breakdowns = breakdowns)
val breakdownResult = breakdowns.head match {
case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType)
}
val queryStartDate = now
val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil)
final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient {
override def post(input: String, url: String): HttpResponse = {
def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished)
def validateAuthnAndProjectId(req: ShrineRequest) {
req.authn should equal(expectedHiveCredentials.toAuthenticationInfo)
req.projectId should equal(expectedHiveCredentials.projectId)
}
val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match {
case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => {
validateAuthnAndProjectId(req)
if (countQueryShouldWork) {
ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize)))
} else {
ErrorResponse("Retrieving count result failed")
}
}
case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => {
validateAuthnAndProjectId(req)
ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2)
}
case _ => fail(s"Unknown input: $input")
}
HttpResponse.ok(response.toI2b2String)
}
}
val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials))
def getResults = doGetResults(adapter)
getResults.isInstanceOf[ErrorResponse] should be(true)
dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize)
dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns)
//The query shouldn't be 'done', since its status is PROCESSING
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing)
//Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC
val result = getResults.asInstanceOf[R]
//Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns)
val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType)
if (!countQueryShouldWork) {
result.isInstanceOf[ErrorResponse] should be(true)
} else {
val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result)
actualNetworkQueryId should equal(shrineNetworkQueryId)
import ObfuscatorTest.within3
actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML))
within3(setSize, actualQueryResult.setSize) should be(true)
actualQueryResult.description should be(Some("results from node X"))
actualQueryResult.statusType should equal(QueryResult.StatusType.Finished)
actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name))
actualQueryResult.breakdowns.foreach {
case (rt, I2b2ResultEnvelope(_, data)) => {
data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) }
}
}
for {
startDate <- actualQueryResult.startDate
endDate <- actualQueryResult.endDate
} {
val actualElapsed = toMillis(endDate) - toMillis(startDate)
actualElapsed should equal(elapsed)
}
}
}
def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables {
import ResultOutputType._
import XmlDateHelper.now
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResultId = 456L
val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None)
dao.inTransaction {
val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None)
//NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in
//AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete
//queries, will work.
val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult))
val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head
dao.insertCountResult(countQueryResultId, -1L, -1L)
}
val queryStartDate = now
object MockHttpClient extends HttpClient {
override def post(input: String, url: String): HttpResponse = ???
}
val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient)
def getResults = doGetResults(adapter)
getResults.isInstanceOf[ErrorResponse] should be(true)
//The query shouldn't be 'done', since its status is QUEUED
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued)
//Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued
val result = getResults
result.isInstanceOf[ErrorResponse] should be(true)
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued)
}
def doTestProcessRequest = afterCreatingTables {
val adapter = makeAdapter(dao, MockHttpClient)
def getResults = doGetResults(adapter)
- getResults should equal(errorResponse)
+ getResults match {
+ case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName)
+ case x => fail(s"Got $x, not an ErrorResponse")
+ }
val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None)
- getResults should equal(errorResponse)
+ getResults match {
+ case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName)
+ case x => fail(s"Got $x, not an ErrorResponse")
+ }
import ResultOutputType._
import XmlDateHelper.now
val breakdowns = Map(
PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)),
PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L)))
val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1))
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResult = QueryResult(
resultId = 456L,
instanceId = instanceId,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = Option(startDate),
endDate = Option(endDate),
description = Some("results from node X"),
statusType = QueryResult.StatusType.Finished,
statusMessage = None,
breakdowns = breakdowns
)
val breakdownResults = breakdowns.map {
case (resultType, data) =>
countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType)
}.toSeq
val queryStartDate = now
val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults)
getResults.isInstanceOf[ErrorResponse] should be(true)
dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize)
dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns)
val result = getResults.asInstanceOf[R]
val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result)
actualNetworkQueryId should equal(shrineNetworkQueryId)
actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML))
actualQueryResult.setSize should equal(obfSetSize)
actualQueryResult.description should be(None) //TODO: This is probably wrong
actualQueryResult.statusType should equal(QueryResult.StatusType.Finished)
actualQueryResult.statusMessage should be(None)
actualQueryResult.breakdowns should equal(obfscBreakdowns)
for {
startDate <- actualQueryResult.startDate
endDate <- actualQueryResult.endDate
} {
val actualElapsed = toMillis(endDate) - toMillis(startDate)
actualElapsed should equal(elapsed)
}
}
}
object AbstractQueryRetrievalTestCase {
val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project")
val doObfuscation = true
def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = {
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar"))))
new RunQueryAdapter(
poster,
dao,
AbstractQueryRetrievalTestCase.hiveCredentials,
translator,
10000,
doObfuscation,
runQueriesImmediately = true,
DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false
)
}
import scala.concurrent.duration._
final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) {
override val requestType = null
protected override def i2b2MessageBody: NodeSeq =
override def toXml =
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala
index 9ba8869df..4759cdbdf 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala
@@ -1,66 +1,77 @@
package net.shrine.adapter
+import net.shrine.problem.ProblemNotInCodec
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.protocol.BaseShrineResponse
import net.shrine.protocol.BroadcastMessage
import org.junit.Test
import net.shrine.protocol.DeleteQueryResponse
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.Credential
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.DeleteQueryRequest
/**
* @author clint
- * @date Mar 31, 2014
+ * @since Mar 31, 2014
*/
+//noinspection UnitMethodIsParameterless
final class AdapterTest extends ShouldMatchersForJUnit {
private final class MockAdapter(toReturn: => BaseShrineResponse) extends Adapter {
override protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse = toReturn
}
import scala.concurrent.duration._
- private val lockedOutAuthn = AuthenticationInfo("d", "u", Credential("p", false))
+ private val lockedOutAuthn = AuthenticationInfo("d", "u", Credential("p", isToken = false))
- private val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", false))
+ private val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", isToken = false))
private val req = DeleteQueryRequest("pid", 1.second, lockedOutAuthn, 12345L)
private val resp = DeleteQueryResponse(12345)
-
+
@Test
def testHandlesNonFailureCase: Unit = {
val adapter = new MockAdapter(resp)
adapter.perform(null) should equal(resp)
}
-
+
@Test
def testHandlesLockoutCase: Unit = {
- doExceptionToErrorResponseTest(new AdapterLockoutException(lockedOutAuthn))
+ doErrorResponseTest(new AdapterLockoutException(lockedOutAuthn),classOf[AdapterLockout])
}
-
+
@Test
def testHandlesCrcFailureCase: Unit = {
val url = "http://example.com"
- doExceptionToErrorResponseTest(CrcInvocationException(url, req, new Exception))
+ doErrorResponseTest(CrcInvocationException(url, req, new Exception),classOf[CrcCouldNotBeInvoked])
}
@Test
def testHandlesMappingFailureCase: Unit = {
- doExceptionToErrorResponseTest(new AdapterMappingException("blarg", new Exception))
+ doErrorResponseTest(new AdapterMappingException("blarg", new Exception),classOf[AdapterMappingProblem])
}
@Test
def testHandlesGeneralFailureCase: Unit = {
- doExceptionToErrorResponseTest(new Exception("blerg"))
+ doErrorResponseTest(new Exception("blerg"),classOf[ProblemNotInCodec])
}
-
- private def doExceptionToErrorResponseTest(exception: Throwable): Unit = {
+
+ //noinspection ScalaUnreachableCode,RedundantBlock
+ private def doErrorResponseTest(exception: Throwable,problemClass:Class[_]) = {
val adapter = new MockAdapter(throw exception)
-
- adapter.perform(BroadcastMessage(networkAuthn, req)) should equal(ErrorResponse(exception.getMessage))
- }
+
+ val response = adapter.perform(BroadcastMessage(networkAuthn, req))
+
+ response match {
+ case errorResponse:ErrorResponse => {
+ val pd = errorResponse.problemDigest
+ pd.codec should be (problemClass.getName)
+ }
+ case x => fail(s"$x is not an ErrorResponse")
+ }
+ }
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala
index 8344de5e7..1d5a2864b 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala
@@ -1,44 +1,44 @@
package net.shrine.adapter
import scala.concurrent.duration.DurationInt
import org.junit.Test
import net.shrine.client.Poster
import net.shrine.protocol.ReadInstanceResultsRequest
import net.shrine.protocol.ReadInstanceResultsResponse
import net.shrine.adapter.dao.AdapterDao
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
/**
* @author clint
- * @date Nov 7, 2012
+ * @since Nov 7, 2012
*/
final class ReadInstanceResultsAdapterTest extends
AbstractQueryRetrievalTestCase(
(dao, httpClient) => new ReadInstanceResultsAdapter(
Poster("", httpClient),
AbstractQueryRetrievalTestCase.hiveCredentials,
dao,
true,
DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false
),
(queryId, authn) => ReadInstanceResultsRequest("some-project-id", 10.seconds, authn, queryId),
ReadInstanceResultsResponse.unapply) {
@Test
def testProcessInvalidRequest = doTestProcessInvalidRequest
@Test
def testProcessRequest = doTestProcessRequest
-
+
@Test
def testProcessRequestMissingQuery = doTestProcessRequestMissingQuery
@Test
def testProcessRequestIncompleteQuery = doTestProcessRequestIncompleteQuery(true)
@Test
def testProcessRequestIncompleteQueryCountResultRetrievalFails = doTestProcessRequestIncompleteQuery(false)
@Test
def testProcessRequestQueuedQuery = doTestProcessRequestQueuedQuery
}
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala
index b4fcd4f76..563315581 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryDefinitionAdapterTest.scala
@@ -1,78 +1,83 @@
package net.shrine.adapter
+import net.shrine.adapter.components.QueryNotInDatabase
+import net.shrine.problem.ProblemNotInCodec
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.Credential
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.ReadQueryDefinitionRequest
import net.shrine.protocol.ReadQueryDefinitionResponse
import net.shrine.protocol.RenameQueryRequest
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.query.Term
/**
* @author clint
- * @date Nov 28, 2012
+ * @since Nov 28, 2012
*/
final class ReadQueryDefinitionAdapterTest extends AbstractSquerylAdapterTest with AdapterTestHelpers with ShouldMatchersForJUnit {
- private val networkAuthn = AuthenticationInfo("network-domain", "network-user", Credential("skajdhkasjdh", false))
+ private val networkAuthn = AuthenticationInfo("network-domain", "network-user", Credential("skajdhkasjdh", isToken = false))
import scala.concurrent.duration._
@Test
def testProcessRequest = afterCreatingTables {
val name = "blarg"
val expr = Term("foo")
val fooQuery = QueryDefinition(name,expr)
val adapter = new ReadQueryDefinitionAdapter(dao)
//Should get error for non-existent query
{
- val ErrorResponse(msg) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, queryId)))
+ val ErrorResponse(msg,problemDigest) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, queryId)))
msg should not be(null)
+
+ problemDigest.codec should be (classOf[QueryNotInDatabase].getName)
}
//Add a query
dao.insertQuery(localMasterId, queryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None)
//sanity check that it's there
{
val Some(query) = dao.findQueryByNetworkId(queryId)
query.networkId should equal(queryId)
}
{
//Should still get error for non-existent query
- val ErrorResponse(msg) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, bogusQueryId)))
+ val ErrorResponse(msg, problemDigest) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, bogusQueryId)))
msg should not be(null)
+ problemDigest.codec should be (classOf[QueryNotInDatabase].getName)
}
{
//try to read a real query
val ReadQueryDefinitionResponse(rQueryId, rName, userId, createDate, queryDefinition) = adapter.processRequest(BroadcastMessage(123L, networkAuthn, ReadQueryDefinitionRequest("proj", 1.second, authn, queryId)))
rQueryId should equal(queryId)
rName should equal(name)
userId should equal(authn.username)
createDate should not be(null) // :(
queryDefinition should equal(QueryDefinition(name, expr).toI2b2String)
}
}
@Test
def testProcessRequestBadRequest {
val adapter = new ReadQueryDefinitionAdapter(dao)
intercept[Exception] {
adapter.processRequest(BroadcastMessage(123L, networkAuthn, RenameQueryRequest("proj", 1.second, authn, queryId, "foo")))
}
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala
index b6e2130dc..45636c915 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/AbstractI2b2AdminResourceJaxrsTest.scala
@@ -1,119 +1,117 @@
package net.shrine.adapter.service
import scala.xml.XML
import org.junit.After
import org.junit.Before
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.adapter.AdapterTestHelpers
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.client.JerseyHttpClient
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.QueryMaster
import net.shrine.protocol.ReadI2b2AdminPreviousQueriesRequest
import net.shrine.protocol.ReadPreviousQueriesResponse
import net.shrine.protocol.ReadQueryDefinitionRequest
import net.shrine.protocol.ReadQueryDefinitionResponse
import net.shrine.protocol.query.QueryDefinition
import net.shrine.util.XmlUtil
import net.shrine.crypto.TrustParam.AcceptAllCerts
import net.shrine.protocol.I2b2AdminRequestHandler
import net.shrine.protocol.I2b2AdminReadQueryDefinitionRequest
import junit.framework.TestCase
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
/**
* @author clint
* @date Apr 24, 2013
*/
abstract class AbstractI2b2AdminResourceJaxrsTest extends TestCase with JerseyTestComponent[I2b2AdminRequestHandler] with AbstractSquerylAdapterTest with ShouldMatchersForJUnit with CanLoadTestData with AdapterTestHelpers {
import scala.concurrent.duration._
protected def adminClient = I2b2AdminClient(resourceUrl, JerseyHttpClient(AcceptAllCerts, 5.minutes))
override def resourceClass(handler: I2b2AdminRequestHandler) = I2b2AdminResource(handler, DefaultBreakdownResultOutputTypes.toSet)
override val basePath = "i2b2/admin/request"
@Before
override def setUp(): Unit = this.JerseyTest.setUp()
@After
override def tearDown(): Unit = this.JerseyTest.tearDown()
protected object NeverAuthenticatesMockPmHttpClient extends HttpClient {
override def post(input: String, url: String): HttpResponse = HttpResponse.ok(ErrorResponse("blarg").toI2b2String)
}
protected object AlwaysAuthenticatesMockPmHttpClient extends HttpClient {
override def post(input: String, url: String): HttpResponse = {
HttpResponse.ok(XmlUtil.stripWhitespace {
Some user
{ userId }
{ domain }
{ password }
MANAGER
}.toString)
}
}
protected def doTestReadQueryDefinition(networkQueryId: Long, expectedQueryNameAndQueryDef: Option[(String, QueryDefinition)]) {
val request = I2b2AdminReadQueryDefinitionRequest(projectId, waitTime, authn, networkQueryId)
val currentHandler = handler
val resp = adminClient.readQueryDefinition(request)
def stripNamespaces(s: String) = XmlUtil.stripNamespaces(XML.loadString(s))
expectedQueryNameAndQueryDef match {
case Some((expectedQueryName, expectedQueryDef)) => {
val response @ ReadQueryDefinitionResponse(masterId, name, userId, createDate, queryDefinition) = resp
masterId should be(networkQueryId)
name should be(expectedQueryName)
userId should be(authn.username)
createDate should not be (null)
//NB: I'm not sure why whacky namespaces were coming back from the resource;
//this checks that the gist of the queryDef XML makes it back.
//TODO: revisit this
stripNamespaces(queryDefinition) should equal(stripNamespaces(expectedQueryDef.toI2b2String))
}
case None => resp.isInstanceOf[ErrorResponse] should be(true)
}
}
protected def doTestReadI2b2AdminPreviousQueries(request: ReadI2b2AdminPreviousQueriesRequest, expectedQueryMasters: Seq[QueryMaster]) {
val currentHandler = handler
val ReadPreviousQueriesResponse(queryMasters) = adminClient.readI2b2AdminPreviousQueries(request)
if(expectedQueryMasters.isEmpty) { queryMasters.isEmpty should be(true) }
else {
- println(queryMasters)
- println(expectedQueryMasters)
-
+
(queryMasters zip expectedQueryMasters).foreach { case (queryMaster, expected) =>
queryMaster.createDate should not be(null)
queryMaster.name should equal(expected.name)
queryMaster.queryMasterId should equal(expected.queryMasterId)
queryMaster.userId should equal(expected.userId)
queryMaster.groupId should equal(expected.groupId)
queryMaster.flagged should equal(expected.flagged)
queryMaster.held should equal(expected.held)
}
}
}
}
\ No newline at end of file
diff --git a/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala b/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala
index 23982a854..7eabb348e 100644
--- a/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala
+++ b/apps/shrine-app/src/test/scala/net/shrine/wiring/ShrineConfigTest.scala
@@ -1,132 +1,130 @@
package net.shrine.wiring
import com.typesafe.config.ConfigFactory
import net.shrine.authentication.AuthenticationType
import net.shrine.authorization.AuthorizationType
import net.shrine.broadcaster.NodeListParserTest
import net.shrine.client.EndpointConfigTest
import net.shrine.crypto.{KeyStoreType, SigningCertStrategy}
import net.shrine.protocol.TestResultOutputTypes
import net.shrine.util.ShouldMatchersForJUnit
import org.junit.Test
/**
* @author clint
* @since Feb 6, 2013
*/
final class ShrineConfigTest extends ShouldMatchersForJUnit {
private def shrineConfig(baseFileName: String, loadBreakdownsFile: Boolean = true) = {
val baseConfig = ConfigFactory.load(baseFileName)
val breakdownConfig = ConfigFactory.load("breakdowns")
val config = if(loadBreakdownsFile) baseConfig.withFallback(breakdownConfig) else baseConfig
ShrineConfig(config)
}
import scala.concurrent.duration._
@Test
def testApply() {
import NodeListParserTest.node
import EndpointConfigTest.endpoint
val conf = shrineConfig("shrine")
conf.pmEndpoint should equal(endpoint("http://services.i2b2.org/i2b2/rest/PMService/getServices"))
conf.ontEndpoint should equal(endpoint("http://example.com:9090/i2b2/rest/OntologyService/"))
conf.adapterConfig.get.crcEndpoint should equal(endpoint("http://services.i2b2.org/i2b2/rest/QueryToolService/"))
conf.queryEntryPointConfig.get.authenticationType should be(AuthenticationType.Ecommons)
conf.queryEntryPointConfig.get.authorizationType should be(AuthorizationType.HmsSteward)
conf.queryEntryPointConfig.get.sheriffEndpoint.get should equal(endpoint("http://localhost:8080/shrine-hms-authorization/queryAuthorization"))
conf.queryEntryPointConfig.get.sheriffCredentials.get.domain should be(None)
conf.queryEntryPointConfig.get.sheriffCredentials.get.username should be("sheriffUsername")
conf.queryEntryPointConfig.get.sheriffCredentials.get.password should be("sheriffPassword")
conf.humanReadableNodeName should equal("SHRINE Cell")
conf.crcHiveCredentials should equal(conf.pmHiveCredentials)
conf.crcHiveCredentials.domain should equal("HarvardDemo")
conf.crcHiveCredentials.username should equal("demo")
conf.crcHiveCredentials.password should equal("demouser")
conf.crcHiveCredentials.projectId should equal("Demo")
conf.ontHiveCredentials.domain should equal("HarvardDemo")
conf.ontHiveCredentials.username should equal("demo")
conf.ontHiveCredentials.password should equal("demouser")
conf.ontHiveCredentials.projectId should equal("SHRINE")
conf.ontHiveCredentials.domain should equal("HarvardDemo")
conf.ontHiveCredentials.username should equal("demo")
conf.ontHiveCredentials.password should equal("demouser")
conf.ontHiveCredentials.projectId should equal("SHRINE")
conf.queryEntryPointConfig.get.broadcasterIsLocal should be(false)
conf.queryEntryPointConfig.get.broadcasterServiceEndpoint.get should equal(endpoint("http://example.com/shrine/rest/broadcaster/broadcast"))
conf.queryEntryPointConfig.get.maxQueryWaitTime should equal(5.minutes)
conf.queryEntryPointConfig.get.signingCertStrategy should equal(SigningCertStrategy.Attach)
conf.adapterConfig.get.setSizeObfuscation should equal(true)
conf.queryEntryPointConfig.get.includeAggregateResults should equal(false)
conf.adapterConfig.get.adapterLockoutAttemptsThreshold should equal(10)
conf.hubConfig.get.maxQueryWaitTime should equal(4.5.minutes)
conf.adapterConfig.get.maxSignatureAge should equal(5.minutes)
conf.adapterStatusQuery should equal("""\\SHRINE\SHRINE\Diagnoses\Mental Illness\Disorders usually diagnosed in infancy, childhood, or adolescence\Pervasive developmental disorders\Infantile autism, current or active state\""")
conf.adapterConfig.get.adapterMappingsFileName should equal("AdapterMappings.xml")
conf.shrineDatabaseType should equal("mysql")
conf.keystoreDescriptor.file should be("shrine.keystore")
conf.keystoreDescriptor.password should be("chiptesting")
conf.keystoreDescriptor.privateKeyAlias should be(Some("test-cert"))
conf.keystoreDescriptor.keyStoreType should be(KeyStoreType.PKCS12)
conf.keystoreDescriptor.caCertAliases should be(Seq("carra ca"))
conf.hubConfig.get.downstreamNodes.toSet should equal {
Set(
node("some hospital", "http://example.com/foo"),
node("CHB", "http://example.com/chb"),
node("PHS", "http://example.com/phs"))
}
conf.adapterConfig.get.immediatelyRunIncomingQueries should be(false)
conf.breakdownResultOutputTypes should equal(TestResultOutputTypes.values)
}
@Test
def testApplyOptionalFields() {
val conf = shrineConfig("shrine-no-optional-configs", loadBreakdownsFile = false)
- println(conf)
-
conf.adapterConfig should be(None)
conf.hubConfig should be(None)
conf.queryEntryPointConfig should be(None)
conf.breakdownResultOutputTypes should be(Set.empty)
}
@Test
def testApplySomeOptionalFields() {
val conf = shrineConfig("shrine-some-optional-props")
conf.queryEntryPointConfig.get.authenticationType should be(AuthenticationType.Pm)
conf.queryEntryPointConfig.get.authorizationType should be(AuthorizationType.NoAuthorization)
conf.queryEntryPointConfig.get.signingCertStrategy should be(SigningCertStrategy.DontAttach)
conf.breakdownResultOutputTypes should equal(TestResultOutputTypes.values)
}
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
index e59885567..14b9a33c6 100644
--- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
+++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
@@ -1,76 +1,100 @@
package net.shrine.authorization
import net.shrine.log.Loggable
+import net.shrine.problem.{LoggingProblemHandler, Problem, ProblemSources, AbstractProblem, ProblemDigest}
import scala.util.Try
import net.shrine.client.HttpResponse
import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest
import net.shrine.i2b2.protocol.pm.User
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.ErrorResponse
import scala.util.control.NonFatal
/**
* @author clint
- * @date Apr 5, 2013
+ * @since Apr 5, 2013
*/
trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable =>
import PmAuthorizerComponent._
+ //noinspection RedundantBlock
object Pm {
def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = {
User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith {
case NonFatal(e) => {
debug(s"Couldn't extract a User from '$httpResponse'")
Try(Left(ErrorResponse.fromI2b2(httpResponse.body)))
}
}.recover {
case NonFatal(e) => {
- warn(s"Couldn't understand response from PM: '$httpResponse'", e)
-
- Left(ErrorResponse(s"Error authorizing ${authn.domain}:${authn.username}: ${e.getMessage}"))
+ val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e)
+ LoggingProblemHandler.handleProblem(problem)
+ Left(ErrorResponse(problem.summary,Some(problem)))
}
}
}
def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = {
val request = GetUserConfigurationRequest(authn)
val responseAttempt = Try {
debug(s"Authorizing with PM cell at ${pmPoster.url}")
pmPoster.post(request.toI2b2String)
}
val authStatusAttempt = responseAttempt.flatMap(parsePmResult(authn)).map {
case Right(user) => {
val managerUserOption = for {
roles <- user.rolesByProject.get(projectId)
if neededRoles.forall(roles.contains)
} yield user
- managerUserOption.map(Authorized(_)).getOrElse {
- NotAuthorized(s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'")
+ managerUserOption.map(Authorized).getOrElse {
+ NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn))
}
}
- case Left(ErrorResponse(message)) => NotAuthorized(message)
+ case Left(errorResponse) => {
+ //todo remove when ErrorResponse gets its message
+ info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object")
+ NotAuthorized(errorResponse.problemDigest)
+ }
}
authStatusAttempt.getOrElse {
- NotAuthorized(s"Error authorizing ${authn.domain}:${authn.username} with PM at ${pmPoster.url}")
+ NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn))
}
}
}
}
object PmAuthorizerComponent {
sealed trait AuthorizationStatus
case class Authorized(user: User) extends AuthorizationStatus
- case class NotAuthorized(reason: String) extends AuthorizationStatus {
- def toErrorResponse = ErrorResponse(reason)
+ case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus {
+ def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest)
+ }
+
+ object NotAuthorized {
+ def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest)
}
+}
+
+case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) {
+ override val summary: String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'"
+}
+
+case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) {
+ override def summary: String = s"Could not reach PM cell at $pmUrl for ${authn.domain}:${authn.username}"
+}
+
+case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
+ override def summary: String = s"Exception while interpreting response from PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: $httpResponse"
+
+ override def throwable = Some(x)
}
\ No newline at end of file
diff --git a/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala b/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala
index 1f0878448..e486c523e 100644
--- a/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala
+++ b/commons/auth/src/test/scala/net/shrine/authorization/PmAuthorizerComponentTest.scala
@@ -1,197 +1,199 @@
package net.shrine.authorization
import net.shrine.log.Loggable
+import net.shrine.problem.ProblemNotInCodec
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.client.HttpClient
import net.shrine.i2b2.protocol.pm.User
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.Credential
import net.shrine.util.XmlUtil
import net.shrine.client.Poster
-import net.shrine.authorization.PmAuthorizerComponent.Authorized
-import net.shrine.authorization.PmAuthorizerComponent.NotAuthorized
/**
* @author clint
- * @date Apr 5, 2013
+ * @since Apr 5, 2013
*/
+//noinspection UnitMethodIsParameterless,UnitMethodIsParameterless,EmptyParenMethodAccessedAsParameterless,ScalaUnnecessaryParentheses
final class PmAuthorizerComponentTest extends ShouldMatchersForJUnit {
import PmAuthorizerComponentTest._
import PmAuthorizerComponent._
//Adds a bogus URL, for convenience
private[this] def poster(httpClient: HttpClient) = Poster("", httpClient)
@Test
def testGoodResponseNotManager {
val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(validUserResponseXml.toString)))
- val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set(User.Roles.Manager), authn)
+ val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set(User.Roles.Manager), authn)
- reason should not be(null)
- reason.size should not be(0)
+ problemDigest should not be(null)
+ problemDigest.codec should be (classOf[MissingRequiredRoles].getName)
}
@Test
def testGoodResponseIsManager {
val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(validUserResponseXml.toString)))
val Authorized(user) = component.Pm.authorize(projectId2, Set(User.Roles.Manager), authn)
user should not be(null)
user.fullName should be(fullName)
user.username should be(username)
user.domain should be(domain)
user.credential should be(authn.credential)
}
@Test
def testErrorResponse {
val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(i2b2ErrorXml.toString)))
- val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set.empty, authn)
-
- reason should be(errorMessage)
+ val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set.empty, authn)
+
+ problemDigest should not be(null)
+
+ problemDigest.codec should be (classOf[CouldNotInterpretResponseFromPmCell].getName)
}
@Test
def testJunkResponse {
val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient("jahfskajhkjashfdkjashkfd")))
- val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set.empty, authn)
-
- reason should not be(null)
- reason.size should not be(0)
- }
+ val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set.empty, authn)
+
+ problemDigest should not be(null)
+ problemDigest.codec should be (classOf[CouldNotReachPmCell].getName)
+ }
@Test
- def testResponseThatBlowsUp {
+ def testResponseThatBlowsUp() {
val component = new TestPmAuthorizerComponent(poster(new LazyMockHttpClient(throw new Exception with scala.util.control.NoStackTrace)))
- val NotAuthorized(reason) = component.Pm.authorize(projectId1, Set.empty, authn)
-
- reason should not be(null)
- reason.size should not be(0)
+ val NotAuthorized(problemDigest) = component.Pm.authorize(projectId1, Set.empty, authn)
+
+ problemDigest should not be(null)
+ problemDigest.codec should be (classOf[CouldNotReachPmCell].getName)
}
private val fullName = "Some Person"
private val username = "some-user"
private val domain = "some-place"
private val password = "sal;dk;aslkd;"
private val errorMessage = "Something blew up"
- private lazy val authn = AuthenticationInfo(domain, username, Credential(password, true))
+ private lazy val authn = AuthenticationInfo(domain, username, Credential(password, isToken = true))
private val projectId1 = "foo"
private val projectId2 = "bar"
private val params1 = Map("x" -> "1", "y" -> "2")
private val params2 = Map("y" -> "2", "z" -> "3")
private val roles1 = Set("a", "b", "c")
private val roles2 = Set("MANAGER", "x", "y")
private lazy val projects = Seq((projectId1, params1, roles1), (projectId2, params2, roles2))
private lazy val validUserResponseXml = XmlUtil.stripWhitespace {
1.1
2.4
SHRINE
1.3-compatible
SHRINE
2011-04-08T16:21:12.251-04:00
DONE
DEVELOPMENT
http://www.i2b2.org
{fullName}
{username}
{password}
{domain}
{
projects.map { case (projectId, params, roles) =>
Demo Group { projectId }
http://www.i2b2.org
{
params.map { case (name, value) =>
{ value }
}
}
{
roles.map(r => { r })
}
}
}
Data Repository
http://localhost/crc
REST
Ontology Cell
http://localhost/ont
REST
false
200
false
}
private val i2b2ErrorXml = XmlUtil.stripWhitespace {
1.1
2.4
SHRINE
1.3-compatible
SHRINE
2011-04-08T16:21:12.251-04:00
{ errorMessage }
}
}
object PmAuthorizerComponentTest {
final class TestPmAuthorizerComponent(override val pmPoster: Poster) extends PmAuthorizerComponent with PmHttpClientComponent with Loggable
}
\ No newline at end of file
diff --git a/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala b/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala
index fdf5e1c10..787d0f451 100644
--- a/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala
+++ b/commons/ont-support/src/main/scala/net/shrine/ont/data/ShrineSqlOntologyDao.scala
@@ -1,80 +1,81 @@
package net.shrine.ont.data
import net.shrine.ont.messaging.Concept
import scala.io.Source
import scala.util.matching.Regex
import scala.util.matching.Regex.Match
import java.io.InputStream
/**
* @author Clint Gilbert
* @date Feb 8, 2012
*
*/
final class ShrineSqlOntologyDao(val file: InputStream) extends OntologyDao {
require(file != null)
override def ontologyEntries: Iterable[Concept] = {
//Matches VALUES (99, '', 'synonym', ''
//where is_synonym is 'Y' or 'N'
val pathAndSynonymRegex = """VALUES\s+\(\d+,\s+'(.+?)',\s+'(.+?)',\s+'(\w)',\s+'(.+?)',\s+(NULL|'.*?'),\s+(NULL|'(.*?)')""".r
def mungeBaseCode(rawBaseCode: String): Option[String] = {
val icd9BaseCodePrefix = """SHRINE|ICD9:"""
def isNotNull(s: String) = s != "NULL"
def isNotEmpty(s: String) = !s.isEmpty
def isAcceptable(s: String): Boolean = !s.isEmpty && s != "NULL" && s.startsWith(icd9BaseCodePrefix)
Option(rawBaseCode).filter(isAcceptable).map(_.drop(icd9BaseCodePrefix.size))
}
def toConcept(termMatch: Match): Concept = {
/*val synonym = termMatch.group(3) match {
case "Y" => Option(termMatch.group(2))
case "N" => None
}*/
//Ignore synonym_cd (y/n) field.
//NB: Do this because medications are stored as coded names (..\CV000\CV350\203144\, etc)
//but with synonyms containing human-readable names for the drugs ('pravastatin sodium', etc).
//Crucially, synonym_cd is 'N' in this case (not sure why), and we don't want those synonyms to
//be ignored. This has the effect of making medications' names available when searching an index
//of terms, and also makes the relevance ranking "better", in that simple terms "male", "female"
//are higher up the list. Some terms now have a redundant synonym now, but this is an ok tradeoff.
val synonym = Option(termMatch.group(2))
val rawPath = termMatch.group(1)
val rawBaseCode = termMatch.group(7)
//Need to add '\\SHRINE' that's missing from the SQL file, but needed by i2b2
Concept("""\\SHRINE""" + rawPath, synonym, mungeBaseCode(rawBaseCode))
}
parseWith(pathAndSynonymRegex, toConcept)
}
private def parseWith(regex: Regex, parser: Match => Concept): Iterable[Concept] = {
def parseLine(line: String): Option[Concept] = {
val result = regex.findFirstMatchIn(line).map(parser)
if(result.isEmpty) {
+ //todo found this scary bit
println("Failed to parse line: " + line)
}
result
}
def noEmptyLines(line: String) = line != null && line.trim != ""
def mungeSingleQuotes(line: String) = line.replace("''", "'")
val source = Source.fromInputStream(file)
source.getLines.filter(noEmptyLines)/*.map(mungeSingleQuotes)*/.flatMap(parseLine).toIterable
}
}
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala
index c79abebc0..0dbc8ffa2 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala
@@ -1,100 +1,124 @@
package net.shrine.protocol
-import xml.NodeSeq
+import net.shrine.problem.{LoggingProblemHandler, Problem, ProblemNotInCodec, ProblemDigest}
+
+import scala.xml.{NodeBuffer, NodeSeq}
import net.shrine.util.XmlUtil
import net.shrine.serialization.XmlUnmarshaller
import net.shrine.serialization.I2b2Unmarshaller
import net.shrine.util.NodeSeqEnrichments
import scala.util.Try
import scala.util.control.NonFatal
/**
* @author Bill Simons
* @since 4/25/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: Now a case class for structural equality
*/
-final case class ErrorResponse(errorMessage: String) extends ShrineResponse {
-
- //todo codec id, one-liner, medium message, detailed message
+final case class ErrorResponse(errorMessage: String,problemDigest:ProblemDigest) extends ShrineResponse {
- override protected def status = { errorMessage }
+ override protected def status: NodeSeq = {
+ val buffer = new NodeBuffer
+ buffer += { errorMessage }
+ buffer += problemDigest.toXml
+ }
override protected def i2b2MessageBody = null
import ErrorResponse.rootTagName
- override def toXml = XmlUtil.stripWhitespace {
- XmlUtil.renameRootTag(rootTagName) {
-
- //todo this is where the ProblemDigest goes.
-
- { errorMessage }
-
-
+ override def toXml = { XmlUtil.stripWhitespace {
+ val xml = XmlUtil.renameRootTag(rootTagName) {
+
+ { errorMessage }
+ {problemDigest.toXml}
+
+ }
+ xml
}
}
}
object ErrorResponse extends XmlUnmarshaller[ErrorResponse] with I2b2Unmarshaller[ErrorResponse] with HasRootTagName {
val rootTagName = "errorResponse"
+ //todo deprecate this one
+ def apply(errorMessage:String,problem:Option[Problem] = None) = {
+ new ErrorResponse(errorMessage,problem.fold{
+ val problem = ProblemNotInCodec(s"'$errorMessage'")
+ LoggingProblemHandler.handleProblem(problem) //todo someday hook up to the proper problem handler hierarchy.
+ problem.toDigest
+ }(p => p.toDigest))
+ }
+
+ def apply(problem:Problem) = {
+ new ErrorResponse(problem.summary,problem.toDigest)
+ }
override def fromXml(xml: NodeSeq): ErrorResponse = {
+
val messageXml = xml \ "message"
//NB: Fail fast
require(messageXml.nonEmpty)
- ErrorResponse(XmlUtil.trim(messageXml))
+ val problemDigest = ProblemDigest.fromXml(xml)
+
+ ErrorResponse(XmlUtil.trim(messageXml),problemDigest)
}
override def fromI2b2(xml: NodeSeq): ErrorResponse = {
import NodeSeqEnrichments.Strictness._
+ //todo what determines parseFormatA vs parseFormatB when written? It looks like our ErrorResponses use A.
+
def parseFormatA: Try[ErrorResponse] = {
for {
statusXml <- xml withChild "response_header" withChild "result_status" withChild "status"
- typeText <- statusXml attribute "type"
- if typeText == "ERROR" //NB: Fail fast
- statusMessage = XmlUtil.trim(statusXml)
+ resultStatusXml <- xml withChild "response_header" withChild "result_status"
+ typeText <- statusXml attribute "type" if typeText == "ERROR" //NB: Fail fast{
+ statusMessage = XmlUtil.trim(statusXml)
+ problemDigest = ProblemDigest.fromXml(resultStatusXml)
} yield {
- ErrorResponse(statusMessage)
+ ErrorResponse(statusMessage,problemDigest)
}
}
def parseFormatB: Try[ErrorResponse] = {
for {
conditionXml <- xml withChild "message_body" withChild "response" withChild "status" withChild "condition"
typeText <- conditionXml attribute "type"
if typeText == "ERROR"
statusMessage = XmlUtil.trim(conditionXml)
} yield {
ErrorResponse(statusMessage)
}
}
- parseFormatA.recoverWith { case NonFatal(e) => parseFormatB }.get
+ parseFormatA.recoverWith { case NonFatal(e) => {
+ e.printStackTrace() //todo log instead
+ parseFormatB
+ } }.get
}
/**
*
*
*
*
*
* Query result instance id 3126 not found
*
*
*
*
*/
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index 04ce64adb..56c978628 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,351 +1,351 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.problem.{Problem, ProblemNotInCodec, ProblemDigest}
import net.shrine.protocol.QueryResult.StatusType
import scala.xml.NodeSeq
import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, SEnum, XmlDateHelper, OptionEnrichments}
import net.shrine.serialization.{ I2b2Marshaller, XmlMarshaller }
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult(
resultId: Long,
instanceId: Long,
resultType: Option[ResultOutputType],
setSize: Long,
startDate: Option[XMLGregorianCalendar],
endDate: Option[XMLGregorianCalendar],
description: Option[String],
statusType: StatusType,
statusMessage: Option[String],
problemDigest: Option[ProblemDigest] = None,
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
) extends XmlMarshaller with I2b2Marshaller {
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
None, //description
statusType,
None) //statusMessage
}
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
description: String,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
Option(description),
statusType,
None) //statusMessage
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType match {
case Some(rt) if !rt.isError => //noinspection RedundantBlock
{
if (rt.isBreakdown) { rt.toI2b2NameOnly() }
else { rt.toI2b2 }
}
case _ => ResultOutputType.ERROR.toI2b2NameOnly("")
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
{ problemDigest.map(_.toXml).getOrElse("") }
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
final case class StatusType(
name: String,
isDone: Boolean,
i2b2Id: Option[Int] = Some(-1),
private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value {
def isError = this == StatusType.Error
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
}
object StatusType extends SEnum[StatusType] {
private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
}
{ i2b2Id }{ queryResult.statusType.name }
}
val noMessage:NodeSeq = null
val Error = StatusType("ERROR", isDone = true, None, { queryResult =>
(queryResult.statusMessage, queryResult.problemDigest) match {
case (Some(msg),Some(pd)) => { msg } ++ pd.toXml
case (Some(msg),None) => { msg }
case (None,Some(pd)) => pd.toXml
case (None, None) => noMessage
}
})
/*
msg =>
net.shrine.something.is.Broken
Something is borked
{ msg }
Herein is a stack trace, multiple lines
))
*/
val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
val Processing = StatusType("PROCESSING", isDone = false, Some(2))
val Queued = StatusType("QUEUED", isDone = false, Some(2))
val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
val Held = StatusType("HELD", isDone = false)
val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = {
val subXml = xml \ "problem"
- if(subXml.nonEmpty) Some(ProblemDigest.fromXml(subXml))
+ if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml))
else None
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
//noinspection ScalaUnnecessaryParentheses
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
resultId = asLong("resultId"),
instanceId = asLong("instanceId"),
resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
setSize = asLong("setSize"),
startDate = extractDate("startDate"),
endDate = extractDate("endDate"),
description = extract("description"),
statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = extract("statusMessage"),
problemDigest = extractProblemDigest(xml),
breakdowns = extractBreakdowns("resultEnvelope")
)
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
QueryResult(
resultId = asLong("result_instance_id"),
instanceId = asLong("query_instance_id"),
resultType = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2),
setSize = asLong("set_size"),
startDate = asXmlGcOption("start_date"),
endDate = asXmlGcOption("end_date"),
description = asTextOption("description"),
statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = asTextOption("query_status_type", "description"),
problemDigest = extractProblemDigest(xml \ "query_status_type"))
}
def errorResult(description: Option[String], statusMessage: String,problem:Option[Problem] = None):QueryResult = {
val problemDigest = problem.getOrElse(ProblemNotInCodec(statusMessage)).toDigest
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
/**
* For reconsitituting errorResults from a database
*/
def errorResult(description:Option[String], statusMessage:String, codec:String, summary:String, digestDescription:String,details:String): QueryResult = {
val problemDigest = ProblemDigest(codec,summary,digestDescription,details)
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala
index 88548c077..1af4b2de7 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/ShrineResponse.scala
@@ -1,88 +1,89 @@
package net.shrine.protocol
import net.shrine.serialization.XmlMarshaller
import net.shrine.serialization.XmlUnmarshaller
-import scala.xml.NodeSeq
+import scala.xml.{Elem, NodeSeq}
import net.shrine.serialization.I2b2Marshaller
import net.shrine.util.XmlUtil
import scala.util.Try
/**
* @author clint
- * @date Nov 5, 2012
+ * @since Nov 5, 2012
*/
trait ShrineResponse extends BaseShrineResponse with I2b2Marshaller {
protected def i2b2MessageBody: NodeSeq
- protected def status = DONE
+ protected def status:NodeSeq = DONE
//TODO better xmlns strategy
override def toI2b2: NodeSeq = XmlUtil.stripWhitespace {
1.1
2.4
SHRINE
1.3-compatible
SHRINE
2011-04-08T16:21:12.251-04:00
{ status }
{ i2b2MessageBody }
}
+ //todo start here Monday. figure out I2B2 xml for this
}
object ShrineResponse {
private def lift(unmarshaller: XmlUnmarshaller[ShrineResponse]): Set[ResultOutputType] => NodeSeq => Try[ShrineResponse] = {
_ => xml => Try(unmarshaller.fromXml(xml))
}
private def lift(unmarshal: Set[ResultOutputType] => NodeSeq => ShrineResponse): Set[ResultOutputType] => NodeSeq => Try[ShrineResponse] = {
knownTypes => xml => Try(unmarshal(knownTypes)(xml))
}
private val unmarshallers = Map[String, Set[ResultOutputType] => NodeSeq => Try[ShrineResponse]](
"deleteQueryResponse" -> lift(DeleteQueryResponse),
"readPreviousQueriesResponse" -> lift(ReadPreviousQueriesResponse),
"readQueryDefinitionResponse" -> lift(ReadQueryDefinitionResponse),
"readQueryInstancesResponse" -> lift(ReadQueryInstancesResponse),
"renameQueryResponse" -> lift(RenameQueryResponse),
"readInstanceResultsResponse" -> lift(ReadInstanceResultsResponse.fromXml _),
"aggregatedReadInstanceResultsResponse" -> lift(AggregatedReadInstanceResultsResponse.fromXml _),
"runQueryResponse" -> RunQueryResponse.fromXml _,
"aggregatedRunQueryResponse" -> AggregatedRunQueryResponse.fromXml _,
"readQueryResultResponse" -> lift(ReadQueryResultResponse.fromXml _),
"aggregatedReadQueryResultResponse" -> lift(AggregatedReadQueryResultResponse.fromXml _),
ErrorResponse.rootTagName -> lift(ErrorResponse),
ReadApprovedQueryTopicsResponse.rootTagName -> lift(ReadApprovedQueryTopicsResponse),
ReadPdoResponse.rootTagName -> lift(ReadPdoResponse),
ReadResultResponse.rootTagName -> ReadResultResponse.fromXml _,
FlagQueryResponse.rootTagName -> (_ => xml => FlagQueryResponse.fromXml(xml)),
UnFlagQueryResponse.rootTagName -> (_ => xml => UnFlagQueryResponse.fromXml(xml)))
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[ShrineResponse] = {
xml match {
case null => scala.util.Failure(new IllegalArgumentException("null xml passed in"))
case _ => for {
rootTag <- Try(xml.head)
rootTagName = rootTag.label
if unmarshallers.contains(rootTagName)
unmarshal = unmarshallers(rootTagName)
unmarshalled <- unmarshal(breakdownTypes)(xml)
} yield unmarshalled
}
}
}
\ No newline at end of file
diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala
index 2991bcdcd..6be128993 100644
--- a/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala
+++ b/commons/protocol/src/test/scala/net/shrine/protocol/ErrorResponseTest.scala
@@ -1,147 +1,151 @@
package net.shrine.protocol
import junit.framework.TestCase
+import net.shrine.problem.TestProblem
import net.shrine.util.ShouldMatchersForJUnit
import org.junit.Test
import scala.xml.NodeSeq
import net.shrine.util.XmlUtil
/**
* @author clint
* @since Apr 5, 2013
*/
final class ErrorResponseTest extends TestCase with ShouldMatchersForJUnit {
val message = "foo"
- val resp = ErrorResponse(message)
+ val resp = ErrorResponse(message,Some(TestProblem))
val expectedShrineXml = XmlUtil.stripWhitespace {
{ message }
+ {TestProblem.toDigest.toXml}
}
val expectedI2b2Xml = XmlUtil.stripWhitespace {
1.1
2.4
SHRINE
1.3-compatible
SHRINE
2011-04-08T16:21:12.251-04:00
{ message }
+ {TestProblem.toDigest.toXml}
}
@Test
def testToXml() = doTestToXml(expectedShrineXml, _.toXml)
@Test
def testToI2b2() = doTestToXml(expectedI2b2Xml, _.toI2b2)
@Test
def testToXmlRoundTrip() = doTestRoundTrip(_.toXml, ErrorResponse.fromXml)
@Test
def testToI2b2RoundTrip() = doTestRoundTrip(_.toI2b2, ErrorResponse.fromI2b2)
@Test
def testFromXml() = doTestFromXml(expectedShrineXml, ErrorResponse.fromXml)
@Test
def testFromI2b2() = doTestFromXml(expectedI2b2Xml, ErrorResponse.fromI2b2)
//NB: See https://open.med.harvard.edu/jira/browse/SHRINE-745
+
@Test
def testFromI2b2AlternateFormat() {
val altI2b2Xml = XmlUtil.stripWhitespace {
1.1
2.4
edu.harvard.i2b2.crc
1.5
i2b2 Hive
i2b2_QueryTool
0.2
i2b2 Hive
1
i2b2
Log information
DONE
Query result instance id 3126 not found
}
val resp = ErrorResponse.fromI2b2(altI2b2Xml)
resp should not be null
resp.errorMessage should equal("Query result instance id 3126 not found")
}
private def doTestFromXml(xml: NodeSeq, deserialize: NodeSeq => ErrorResponse) {
intercept[Exception] {
deserialize(null)
}
intercept[Exception] {
deserialize()
}
intercept[Exception] {
//Correct I2b2 XML structure, wrong status type
deserialize({ message })
}
deserialize(xml) should equal(resp)
}
private def doTestToXml(expected: NodeSeq, serialize: ErrorResponse => NodeSeq) {
val xml = serialize(resp)
-//todo turn this back on xml.toString should equal(expected.toString)
+ xml.toString should equal(expected.toString())
}
private def doTestRoundTrip(serialize: ErrorResponse => NodeSeq, deserialize: NodeSeq => ErrorResponse) {
val unmarshalled = deserialize(serialize(resp))
unmarshalled should equal(resp)
}
}
\ No newline at end of file
diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala
index d376be18b..d676157a4 100644
--- a/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala
+++ b/commons/protocol/src/test/scala/net/shrine/protocol/ShrineResponseTest.scala
@@ -1,104 +1,106 @@
package net.shrine.protocol
+import net.shrine.problem.TestProblem
+
import scala.xml.NodeSeq
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.query.Term
import net.shrine.util.XmlDateHelper
import net.shrine.util.XmlUtil
import scala.util.Success
/**
* @author clint
* @since Nov 5, 2012
*/
//noinspection UnitMethodIsParameterless,NameBooleanParameters,ScalaUnnecessaryParentheses
final class ShrineResponseTest extends ShouldMatchersForJUnit {
@Test
def testFromXml {
//ShrineResponse.fromXml(null: String).isFailure should be(true)
ShrineResponse.fromXml(DefaultBreakdownResultOutputTypes.toSet)(null: NodeSeq).isFailure should be(true)
ShrineResponse.fromXml(DefaultBreakdownResultOutputTypes.toSet)(NodeSeq.Empty).isFailure should be(true)
def roundTrip(response: ShrineResponse): Unit = {
val unmarshalled = ShrineResponse.fromXml(DefaultBreakdownResultOutputTypes.toSet)(response.toXml)
unmarshalled.get.getClass should equal(response.getClass)
unmarshalled should not be (null)
unmarshalled should equal(Success(response))
}
val queryResult1 = QueryResult(
resultId = 1L,
instanceId = 2342L,
resultType = Some(ResultOutputType.PATIENT_COUNT_XML),
setSize = 123L,
startDate = None,
endDate = None,
description = None,
statusType = QueryResult.StatusType.Finished,
statusMessage = None)
roundTrip(ReadQueryResultResponse(123L, queryResult1))
roundTrip(AggregatedReadQueryResultResponse(123L, Seq(queryResult1)))
roundTrip(DeleteQueryResponse(123L))
roundTrip(ReadInstanceResultsResponse(2342L, queryResult1))
roundTrip(AggregatedReadInstanceResultsResponse(2342L, Seq(queryResult1)))
roundTrip(ReadPreviousQueriesResponse(Seq(QueryMaster("queryMasterId", 12345L, "name", "userId", "groupId", XmlDateHelper.now, Some(false)))))
roundTrip(ReadQueryDefinitionResponse(8457L, "name", "userId", XmlDateHelper.now, "queryDefXml"))
roundTrip(ReadQueryInstancesResponse(12345L, "userId", "groupId", Seq.empty))
roundTrip(RenameQueryResponse(12345L, "name"))
roundTrip(RunQueryResponse(38957L, XmlDateHelper.now, "userId", "groupId", QueryDefinition("foo", Term("bar")), 2342L, queryResult1))
roundTrip(AggregatedRunQueryResponse(38957L, XmlDateHelper.now, "userId", "groupId", QueryDefinition("foo", Term("bar")), 2342L, Seq(queryResult1)))
roundTrip(UnFlagQueryResponse)
roundTrip(FlagQueryResponse)
- roundTrip(ErrorResponse("errorMessage"))
+ roundTrip(ErrorResponse("errorMessage",Some(TestProblem)))
}
@Test
def testToXml {
val response = new FooResponse
response.toXmlString should equal("")
}
@Test
def testToI2b2 {
val expected = XmlUtil.stripWhitespace(
1.1
2.4
SHRINE
1.3-compatible
SHRINE
2011-04-08T16:21:12.251-04:00
DONE
)
val response = new FooResponse
response.toI2b2String should equal(expected.toString())
}
private final class FooResponse extends ShrineResponse {
protected override def i2b2MessageBody =
override def toXml = i2b2MessageBody
}
}
\ No newline at end of file
diff --git a/commons/util/src/main/scala/net/shrine/problem/Problem.scala b/commons/util/src/main/scala/net/shrine/problem/Problem.scala
index f38c4f9ea..244a62751 100644
--- a/commons/util/src/main/scala/net/shrine/problem/Problem.scala
+++ b/commons/util/src/main/scala/net/shrine/problem/Problem.scala
@@ -1,138 +1,150 @@
package net.shrine.problem
import java.net.{InetAddress, ConnectException}
import java.util.Date
import net.shrine.log.Loggable
import net.shrine.serialization.{XmlUnmarshaller, XmlMarshaller}
-import scala.xml.NodeSeq
+import scala.xml.{Node, NodeSeq}
/**
* Describes what information we have about a problem at the site in code where we discover it.
*
* @author david
* @since 8/6/15
*/
trait Problem {
def summary:String
def problemName = getClass.getName
def throwable:Option[Throwable] = None
def stamp:Stamp
def description = s"$summary ${stamp.pretty}"
def throwableDetail = throwable.map(x => x.getStackTrace.mkString(sys.props("line.separator")))
def details:String = s"$description ${throwableDetail.getOrElse("")}"
def toDigest:ProblemDigest = ProblemDigest(problemName,summary,description,details)
}
case class ProblemDigest(codec:String,summary:String,description:String,details:String) extends XmlMarshaller {
- override def toXml: NodeSeq = {
+ override def toXml: Node = {
{codec}
{summary}
{description}
{details}
}
}
object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable {
def apply(oldMessage:String):ProblemDigest = {
val ex = new IllegalStateException(s"'$oldMessage' detected, not in codec. Please report this problem and stack trace to Shrine dev.")
ex.fillInStackTrace()
warn(ex)
ProblemDigest("ProblemNotInCodec",oldMessage,"","")
}
override def fromXml(xml: NodeSeq): ProblemDigest = {
- def extractText(tagName:String) = (xml \ tagName).text
+ val problemNode = xml \ "problem"
+ require(problemNode.nonEmpty,s"No problem tag in $xml")
+
+ def extractText(tagName:String) = {
+ val t = (problemNode \ tagName).text
+ require(t.nonEmpty)
+ t
+ }
val codec = extractText("codec")
val summary = extractText("summary")
val description = extractText("description")
val details = extractText("details")
ProblemDigest(codec,summary,description,details)
}
}
case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) {
def pretty = s"at ${new Date(time)} on $host ${source.pretty}"
}
object Stamp {
def apply(source:ProblemSources.ProblemSource): Stamp = Stamp(InetAddress.getLocalHost,System.currentTimeMillis(),source)
}
abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem {
val stamp = Stamp(source)
}
trait ProblemHandler {
def handleProblem(problem:Problem)
}
/**
* An example problem handler
*/
object LoggingProblemHandler extends ProblemHandler with Loggable {
override def handleProblem(problem: Problem): Unit = {
problem.throwable.fold(error(problem.toString))(throwable =>
error(problem.toString,throwable)
)
}
}
object ProblemSources{
sealed trait ProblemSource {
//todo name without $
def pretty = getClass.getSimpleName
}
case object Adapter extends ProblemSource
case object Hub extends ProblemSource
case object Qep extends ProblemSource
case object Dsa extends ProblemSource
case object Unknown extends ProblemSource
def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown)
}
-case class ProblemNotInCodec(summary:String) extends AbstractProblem(ProblemSources.Unknown){
- override val throwable = {
- val x = new IllegalStateException(s"$summary , is not yet in the codec.")
- x.fillInStackTrace()
- Option(x)
- }
+case class ProblemNotInCodec(summary:String,t:Throwable) extends AbstractProblem(ProblemSources.Unknown){
+ override val throwable = Some(t)
override val description = s"${super.description} . This error is not yet in the codec. Please report the stack trace to the Shrine development team at TODO"
}
+object ProblemNotInCodec {
+
+ def apply(summary:String):ProblemNotInCodec = {
+ val x = new IllegalStateException(s"$summary , is not yet in the codec.")
+ x.fillInStackTrace()
+ new ProblemNotInCodec(summary,x)
+ }
+}
+
/**
* For "Failure querying node 'SITE NAME': java.net.ConnectException: Connection refused"
*
* This one is interesting because "Connection refused" is different from "Connection timed out" according to Keith's
* notes, but the only way to pick that up is to pull the text out of that contained exception. However, all four options
* are probably worth checking no matter what the exception's message.
*/
//todo NodeId is in protocol, which will be accessible from the hub code where this class should live
//case class CouldNotConnectToQueryNode(nodeId:NodeId,connectExcepition:ConnectException) extends Problem {
case class CouldNotConnectToNode(nodeName:String,connectException:ConnectException) extends AbstractProblem(ProblemSources.Hub) {
val summary = s"Could not connect to node $nodeName"
override def throwable = Some(connectException)
}
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
index 82372b68f..02d60db5f 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
@@ -1,31 +1,41 @@
package net.shrine.aggregation
-import net.shrine.protocol.ShrineResponse
+
import net.shrine.aggregation.BasicAggregator.{Invalid, Error, Valid}
+import net.shrine.problem.{ProblemSources, AbstractProblem}
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.BaseShrineResponse
/**
*
* @author Clint Gilbert
- * @date Sep 16, 2011
+ * @since Sep 16, 2011
*
- * @link http://cbmi.med.harvard.edu
+ * @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
- * @link http://www.gnu.org/licenses/lgpl.html
+ * @see http://www.gnu.org/licenses/lgpl.html
*
* Extends BasicAggregator to ignore Errors and Invalid responses
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] {
private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = {
//Filter out errors and invalid responses
makeResponseFrom(validResponses)
}
//Default implementation, just returns first valid response, or if there are none, an ErrorResponse
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = {
- validResponses.map(_.response).toSet.headOption.getOrElse(new ErrorResponse("No valid responses to aggregate"))
+
+
+ validResponses.map(_.response).toSet.headOption.getOrElse{
+ val problem = NoValidResponsesToAggregate()
+ ErrorResponse(problem.summary,Some(problem))
+ }
}
+}
+
+case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) {
+ override def summary: String = "No valid responses to aggregate"
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala
index 8af33b5d1..bd0f6d38c 100644
--- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala
+++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/ReadQueryResultAggregatorTest.scala
@@ -1,132 +1,132 @@
package net.shrine.aggregation
import scala.concurrent.duration.DurationInt
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.protocol.AggregatedReadQueryResultResponse
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.NodeId
import net.shrine.protocol.QueryResult
import net.shrine.protocol.ReadQueryResultResponse
import net.shrine.protocol.Result
import net.shrine.protocol.ResultOutputType
import net.shrine.protocol.BaseShrineResponse
/**
* @author clint
* @since Nov 7, 2012
*/
final class ReadQueryResultAggregatorTest extends ShouldMatchersForJUnit {
private val queryId = 12345L
import ResultOutputType._
private def asAggregatedResponse(resp: BaseShrineResponse) = resp.asInstanceOf[AggregatedReadQueryResultResponse]
private val setSize1 = 123L
private val setSize2 = 456L
private val totalSetSize = setSize1 + setSize2
private val queryResult1 = QueryResult(1L, 2L, Some(PATIENT_COUNT_XML), setSize1, None, None, None, QueryResult.StatusType.Finished, None)
private val queryResult2 = QueryResult(1L, 2L, Some(PATIENT_COUNT_XML), setSize2, None, None, None, QueryResult.StatusType.Finished, None)
private val response1 = ReadQueryResultResponse(queryId, queryResult1)
private val response2 = ReadQueryResultResponse(queryId, queryResult2)
import scala.concurrent.duration._
private val result1 = Result(NodeId("X"), 1.second, response1)
private val result2 = Result(NodeId("Y"), 1.second, response2)
private val errors = Seq(ErrorResponse("blarg"), ErrorResponse("glarg"))
@Test
def testAggregate {
val aggregator = new ReadQueryResultAggregator(queryId, true)
val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2), Nil))
val Seq(actualQueryResult1, actualQueryResult2, aggregatedQueryResult) = response.results
actualQueryResult1 should equal(queryResult1)
actualQueryResult2 should equal(queryResult2)
val expectedAggregatedResult = queryResult1.withSetSize(totalSetSize).withInstanceId(queryId).withDescription("Aggregated Count")
aggregatedQueryResult should equal(expectedAggregatedResult)
}
@Test
def testAggregateNoAggregatedResult {
val aggregator = new ReadQueryResultAggregator(queryId, false)
val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2), Nil))
val Seq(actualQueryResult1, actualQueryResult2) = response.results
actualQueryResult1 should equal(queryResult1)
actualQueryResult2 should equal(queryResult2)
}
@Test
def testAggregateNoResponses {
for (doAggregation <- Seq(true, false)) {
val aggregator = new ReadQueryResultAggregator(queryId, true)
val response = asAggregatedResponse(aggregator.aggregate(Nil, Nil))
response.queryId should equal(queryId)
response.results.isEmpty should be(true)
}
}
@Test
def testAggregateOnlyErrorResponses {
val aggregator = new ReadQueryResultAggregator(queryId, true)
val response = asAggregatedResponse(aggregator.aggregate(Nil, errors))
response.queryId should equal(queryId)
response.results.exists(qr => qr.problemDigest.exists(pd => pd.codec == classOf[ErrorResultProblem].getName)) should be (true)
}
@Test
def testAggregateSomeErrors {
val aggregator = new ReadQueryResultAggregator(queryId, true)
val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2), errors))
val Seq(actualQueryResult1, actualQueryResult2, aggregatedQueryResult, actualErrorQueryResults @ _*) = response.results
actualQueryResult1 should equal(queryResult1)
actualQueryResult2 should equal(queryResult2)
val expectedAggregatedResult = queryResult1.withSetSize(totalSetSize).withInstanceId(queryId).withDescription("Aggregated Count")
aggregatedQueryResult should equal(expectedAggregatedResult)
actualErrorQueryResults.exists(qr => qr.problemDigest.exists(pd => pd.codec == classOf[ErrorResultProblem].getName)) should be (true)
}
@Test
def testAggregateSomeDownstreamErrors {
val aggregator = new ReadQueryResultAggregator(queryId, true)
val result3 = Result(NodeId("A"), 1.second, errors.head)
val result4 = Result(NodeId("A"), 1.second, errors.last)
val response = asAggregatedResponse(aggregator.aggregate(Seq(result1, result2, result3, result4), Nil))
val Seq(actualQueryResult1, actualQueryResult2, aggregatedQueryResult, actualErrorQueryResults @ _*) = response.results
actualQueryResult1 should equal(queryResult1)
actualQueryResult2 should equal(queryResult2)
val expectedAggregatedResult = queryResult1.withSetSize(totalSetSize).withInstanceId(queryId).withDescription("Aggregated Count")
aggregatedQueryResult should equal(expectedAggregatedResult)
- actualErrorQueryResults.forall(qr => qr.description == Some("A"))
+ actualErrorQueryResults.forall(qr => qr.description.contains("A")) should be(true)
}
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala
index 78ad65cc2..93f13f256 100644
--- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala
+++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/aggregation/RunQueryAggregatorTest.scala
@@ -1,154 +1,154 @@
package net.shrine.aggregation
import scala.concurrent.duration.DurationInt
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.protocol.AggregatedRunQueryResponse
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.I2b2ResultEnvelope
import net.shrine.protocol.NodeId
import net.shrine.protocol.QueryResult
import net.shrine.protocol.Result
import net.shrine.protocol.ResultOutputType
import net.shrine.protocol.ResultOutputType.PATIENTSET
import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML
import net.shrine.protocol.RunQueryResponse
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.query.Term
import net.shrine.util.XmlDateHelper
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
/**
*
*
* @author Justin Quan
- * @link http://chip.org
+ * @see http://chip.org
* Date: 8/12/11
*/
final class RunQueryAggregatorTest extends ShouldMatchersForJUnit {
private val queryId = 1234L
private val queryName = "someQueryName"
private val now = XmlDateHelper.now
private val userId = "user"
private val groupId = "group"
private val requestQueryDef = QueryDefinition(queryName, Term("""\\i2b2\i2b2\Demographics\Age\0-9 years old\"""))
private val requestQueryDefString = requestQueryDef.toI2b2String
private val queryInstanceId = 9999L
import scala.concurrent.duration._
@Test
def testAggregate {
val qrCount = new QueryResult(1L, queryInstanceId, PATIENT_COUNT_XML, 10L, now, now, "Desc", QueryResult.StatusType.Finished)
val qrSet = new QueryResult(2L, queryInstanceId, PATIENTSET, 10L, now, now, "Desc", QueryResult.StatusType.Finished)
val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrCount)
val rqr2 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrSet)
val result2 = Result(NodeId("description1"), 1.second, rqr2)
val result1 = Result(NodeId("description2"), 1.second, rqr1)
val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true)
val actual = aggregator.aggregate(Vector(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse]
actual.queryId should equal(queryId)
actual.queryInstanceId should equal(-1L)
actual.results.size should equal(3)
actual.results.filter(_.resultTypeIs(PATIENT_COUNT_XML)).size should equal(2) //1 for the actual count result, 1 for the aggregated total count
actual.results.filter(_.resultTypeIs(PATIENTSET)).size should equal(1)
actual.results.filter(hasTotalCount).size should equal(1)
actual.results.filter(hasTotalCount).head.setSize should equal(20)
actual.queryName should equal(queryName)
}
@Test
def testAggCount {
val qrSet = new QueryResult(2L, queryInstanceId, PATIENTSET, 10L, now, now, "Desc", QueryResult.StatusType.Finished)
val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrSet)
val rqr2 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrSet)
val result1 = Result(NodeId("description1"), 1.second, rqr1)
val result2 = Result(NodeId("description2"), 1.second, rqr2)
val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true)
//TODO: test handling error responses
val actual = aggregator.aggregate(Vector(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse]
actual.results.filter(_.description.getOrElse("").equalsIgnoreCase("TOTAL COUNT")).head.setSize should equal(20)
}
@Test
def testHandleErrorResponse {
val qrCount = new QueryResult(1L, queryInstanceId, PATIENT_COUNT_XML, 10L, now, now, "Desc", QueryResult.StatusType.Finished)
val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qrCount)
val errorMessage = "error message"
- val errorResponse = new ErrorResponse(errorMessage)
+ val errorResponse = ErrorResponse(errorMessage)
val result1 = Result(NodeId("description1"), 1.second, rqr1)
val result2 = Result(NodeId("description2"), 1.second, errorResponse)
val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true)
val actual = aggregator.aggregate(Vector(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse]
actual.results.size should equal(3)
actual.results.filter(_.resultTypeIs(PATIENT_COUNT_XML)).head.setSize should equal(10)
actual.results.filter(_.statusType == QueryResult.StatusType.Error).head.statusMessage should equal(Some(errorMessage))
actual.results.filter(hasTotalCount).head.setSize should equal(10)
}
@Test
def testAggregateResponsesWithBreakdowns {
def toColumnTuple(i: Int) = ("x" + i, i.toLong)
val breakdowns1 = Map.empty ++ DefaultBreakdownResultOutputTypes.values.map { resultType =>
resultType -> I2b2ResultEnvelope(resultType, (1 to 10).map(toColumnTuple).toMap)
}
val breakdowns2 = Map.empty ++ DefaultBreakdownResultOutputTypes.values.map { resultType =>
resultType -> I2b2ResultEnvelope(resultType, (11 to 20).map(toColumnTuple).toMap)
}
val qr1 = new QueryResult(1L, queryInstanceId, Some(PATIENT_COUNT_XML), 10L, Some(now), Some(now), Some("Desc"), QueryResult.StatusType.Finished, None, breakdowns = breakdowns1)
val qr2 = new QueryResult(2L, queryInstanceId, Some(PATIENT_COUNT_XML), 20L, Some(now), Some(now), Some("Desc"), QueryResult.StatusType.Finished, None, breakdowns = breakdowns2)
val rqr1 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qr1)
val rqr2 = RunQueryResponse(queryId, now, userId, groupId, requestQueryDef, queryInstanceId, qr2)
val result1 = Result(NodeId("description2"), 1.second, rqr1)
val result2 = Result(NodeId("description1"), 1.second, rqr2)
val aggregator = new RunQueryAggregator(queryId, userId, groupId, requestQueryDef, true)
val actual = aggregator.aggregate(Seq(result1, result2), Nil).asInstanceOf[AggregatedRunQueryResponse]
actual.results.size should equal(3)
actual.results.filter(hasTotalCount).size should equal(1)
val Seq(actualQr1, actualQr2, actualQr3) = actual.results.filter(_.resultTypeIs(PATIENT_COUNT_XML))
actualQr1.setSize should equal(10)
actualQr2.setSize should equal(20)
actualQr3.setSize should equal(30)
actualQr1.breakdowns should equal(breakdowns1)
actualQr2.breakdowns should equal(breakdowns2)
actualQr3.breakdowns.isEmpty should be(true)
}
private def hasTotalCount(result: QueryResult) = result.description.getOrElse("").equalsIgnoreCase("TOTAL COUNT")
private def toQueryResultMap(results: QueryResult*) = Map.empty ++ (for {
result <- results
resultType <- result.resultType
} yield (resultType, result))
}
diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala
index 86bb821c2..700cc38e8 100644
--- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala
+++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/HubBroadcastServiceTest.scala
@@ -1,98 +1,100 @@
package net.shrine.broadcaster
+import net.shrine.problem.TestProblem
+
import scala.concurrent.Await
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.aggregation.Aggregator
import net.shrine.crypto.DefaultSignerVerifier
import net.shrine.crypto.TestKeystore
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.Credential
import net.shrine.protocol.DeleteQueryRequest
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.Failure
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import net.shrine.protocol.ShrineResponse
import net.shrine.protocol.SingleNodeResult
import net.shrine.protocol.Timeout
import net.shrine.broadcaster.dao.MockHubDao
/**
* @author clint
- * @date Nov 19, 2013
+ * @since Nov 19, 2013
*/
final class HubBroadcastAndAggregationServiceTest extends AbstractSquerylHubDaoTest with ShouldMatchersForJUnit {
import scala.concurrent.duration._
import MockBroadcasters._
- private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah"))
+ private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah",Some(TestProblem)))
private val results = "abcde".map(result)
private lazy val nullResultsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("X") -> null, NodeId("Y") -> null)
private lazy val resultsWithNullsByOrigin: Map[NodeId, SingleNodeResult] = {
results.collect { case r @ Result(origin, _, _) => origin -> r }.toMap ++ nullResultsByOrigin
}
private val broadcastMessage = {
val authn = AuthenticationInfo("domain", "username", Credential("asdasd", false))
import scala.concurrent.duration._
BroadcastMessage(authn, DeleteQueryRequest("projectId", 12345.milliseconds, authn, 12345L))
}
@Test
def testAggregateHandlesNullResults {
val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithNullsByOrigin)
val broadcastService = new HubBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster))
val aggregator: Aggregator = new Aggregator {
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = {
- ErrorResponse(results.size.toString)
+ ErrorResponse(results.size.toString,Some(TestProblem))
}
}
val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes)
mockBroadcaster.messageParam.signature.isDefined should be(false)
- aggregatedResult should equal(ErrorResponse(s"${results.size}"))
+ aggregatedResult should equal(ErrorResponse(s"${results.size}",Some(TestProblem)))
}
@Test
def testAggregateHandlesFailures {
def toResult(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah"))
def toFailure(description: Char) = Failure(NodeId(description.toString), new Exception with scala.util.control.NoStackTrace)
val failuresByOrigin: Map[NodeId, SingleNodeResult] = {
"UV".map(toFailure).map { case f @ Failure(origin, _) => origin -> f }.toMap
}
val timeoutsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("Z") -> Timeout(NodeId("Z")))
val resultsWithFailuresByOrigin: Map[NodeId, SingleNodeResult] = resultsWithNullsByOrigin ++ failuresByOrigin ++ timeoutsByOrigin
val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithFailuresByOrigin)
val broadcastService = new HubBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster))
val aggregator: Aggregator = new Aggregator {
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = {
- ErrorResponse(s"${results.size},${errors.size}")
+ ErrorResponse(s"${results.size},${errors.size}",Some(TestProblem))
}
}
val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes)
mockBroadcaster.messageParam.signature.isDefined should be(false)
- aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0"))
+ aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0",Some(TestProblem)))
}
}
diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala
index 1a02d0445..28f37e2e2 100644
--- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala
+++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationServiceTest.scala
@@ -1,100 +1,102 @@
package net.shrine.broadcaster
+import net.shrine.problem.TestProblem
+
import scala.concurrent.Await
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.aggregation.Aggregator
import net.shrine.crypto.DefaultSignerVerifier
import net.shrine.crypto.TestKeystore
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.Credential
import net.shrine.protocol.DeleteQueryRequest
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.Failure
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import net.shrine.protocol.ShrineResponse
import net.shrine.protocol.SingleNodeResult
import net.shrine.protocol.Timeout
import net.shrine.crypto.SigningCertStrategy
import net.shrine.broadcaster.dao.MockHubDao
/**
* @author clint
- * @date Nov 19, 2013
+ * @since Nov 19, 2013
*/
final class SigningBroadcastAndAggregationServiceTest extends ShouldMatchersForJUnit {
import scala.concurrent.duration._
import MockBroadcasters._
- private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah"))
+ private def result(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah",Some(TestProblem)))
private val results = "abcde".map(result)
private lazy val nullResultsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("X") -> null, NodeId("Y") -> null)
private lazy val resultsWithNullsByOrigin: Map[NodeId, SingleNodeResult] = {
results.collect { case r @ Result(origin, _, _) => origin -> r }.toMap ++ nullResultsByOrigin
}
private lazy val signer = new DefaultSignerVerifier(TestKeystore.certCollection)
private val broadcastMessage = {
val authn = AuthenticationInfo("domain", "username", Credential("asdasd", false))
import scala.concurrent.duration._
BroadcastMessage(authn, DeleteQueryRequest("projectId", 12345.milliseconds, authn, 12345L))
}
@Test
def testAggregateHandlesNullResults {
val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithNullsByOrigin)
val broadcastService = SigningBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster), signer, SigningCertStrategy.Attach)
val aggregator: Aggregator = new Aggregator {
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = {
- ErrorResponse(results.size.toString)
+ ErrorResponse(results.size.toString,Some(TestProblem))
}
}
val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes)
mockBroadcaster.messageParam.signature.isDefined should be(true)
- aggregatedResult should equal(ErrorResponse(s"${results.size}"))
+ aggregatedResult should equal(ErrorResponse(s"${results.size}",Some(TestProblem)))
}
@Test
def testAggregateHandlesFailures {
- def toResult(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah"))
+ def toResult(description: Char) = Result(NodeId(description.toString), 1.second, ErrorResponse("blah blah blah",Some(TestProblem)))
def toFailure(description: Char) = Failure(NodeId(description.toString), new Exception with scala.util.control.NoStackTrace)
val failuresByOrigin: Map[NodeId, SingleNodeResult] = {
"UV".map(toFailure).map { case f @ Failure(origin, _) => origin -> f }.toMap
}
val timeoutsByOrigin: Map[NodeId, SingleNodeResult] = Map(NodeId("Z") -> Timeout(NodeId("Z")))
val resultsWithFailuresByOrigin: Map[NodeId, SingleNodeResult] = resultsWithNullsByOrigin ++ failuresByOrigin ++ timeoutsByOrigin
val mockBroadcaster = MockAdapterClientBroadcaster(resultsWithFailuresByOrigin)
val broadcastService = SigningBroadcastAndAggregationService(InJvmBroadcasterClient(mockBroadcaster), signer, SigningCertStrategy.DontAttach)
val aggregator: Aggregator = new Aggregator {
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): ShrineResponse = {
- ErrorResponse(s"${results.size},${errors.size}")
+ ErrorResponse(s"${results.size},${errors.size}",Some(TestProblem))
}
}
val aggregatedResult = Await.result(broadcastService.sendAndAggregate(broadcastMessage, aggregator, true), 5.minutes)
mockBroadcaster.messageParam.signature.isDefined should be(true)
- aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0"))
+ aggregatedResult should equal(ErrorResponse(s"${results.size + failuresByOrigin.size + timeoutsByOrigin.size},0",Some(TestProblem)))
}
}