diff --git a/adapter/adapter-service/src/main/resources/reference.conf b/adapter/adapter-service/src/main/resources/reference.conf index 7a449beaa..1df54693e 100644 --- a/adapter/adapter-service/src/main/resources/reference.conf +++ b/adapter/adapter-service/src/main/resources/reference.conf @@ -1,12 +1,19 @@ shrine { adapter { + + obfucscation { + binSize = 5 //Round to the nearest binSize. Use 1 for no effect (to match SHRINE 1.21 and earlier). + sigma = 6.5 //Noise to inject. Use 0 for no effect. (Use 1.33 to match SHRINE 1.21 and earlier). + clamp = 10 //Maximum ammount of noise to inject. (Use 3 to match SHRINE 1.21 and earlier). + } + adapterLockoutAttemptsThreshold = 10 // Number of allowed queries with the same actual result that can exist before a researcher is locked out of the adapter. Set to '0' to never lock out. In 1.23 the default will change to 0. In 1.24 the lockout code and this config value will be removed botDefense { countsAndMilliseconds = [ {count = 10, milliseconds = 60000}, //allow up to 10 queries in one minute {count = 200, milliseconds = 36000000} //allow up to 4 queries in 10 hours ] } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala index 6e948f3e2..19e6d678b 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala @@ -1,298 +1,297 @@ package net.shrine.adapter import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import net.shrine.adapter.audit.AdapterAuditDb import scala.Option.option2Iterable import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq -import net.shrine.adapter.Obfuscator.obfuscateResults import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.Breakdown import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BroadcastMessage, ErrorResponse, HasQueryResults, HiveCredentials, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse} import net.shrine.protocol.query.QueryDefinition -import net.shrine.util.StackTrace import net.shrine.util.Tries.sequence import scala.concurrent.duration.Duration import net.shrine.client.Poster import net.shrine.problem.{AbstractProblem, ProblemSources} /** * @author clint * @since Nov 2, 2012 * */ object AbstractReadQueryResultAdapter { private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) } //noinspection RedundantBlock abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults]( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, getQueryId: Req => Long, getProjectId: Req => String, toResponse: (Long, QueryResult) => Rsp, breakdownTypes: Set[ResultOutputType], - collectAdapterAudit:Boolean + collectAdapterAudit:Boolean, + obfuscator:Obfuscator ) extends WithHiveCredentialsAdapter(hiveCredentials) { //TODO: Make this configurable private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors) //TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead? private lazy val executorService = Executors.newFixedThreadPool(numThreads) private lazy val executionContext = ExecutionContext.fromExecutorService(executorService) override def shutdown() { try { executorService.shutdown() executorService.awaitTermination(5, TimeUnit.SECONDS) } finally { executorService.shutdownNow() super.shutdown() } } import AbstractReadQueryResultAdapter._ override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { val req = message.request.asInstanceOf[Req] val queryId = getQueryId(req) def findShrineQueryRow = dao.findQueryByNetworkId(queryId) def findShrineQueryResults = dao.findResultsFor(queryId) findShrineQueryRow match { case None => { debug(s"Query $queryId not found in the Shrine DB") ErrorResponse(QueryNotFound(queryId)) } case Some(shrineQueryRow) => { findShrineQueryResults match { case None => { debug(s"Query $queryId found but its results are not available") //TODO: When precisely can this happen? Should we go back to the CRC here? ErrorResponse(QueryResultNotAvailable(queryId)) } case Some(shrineQueryResult) => { if (shrineQueryResult.isDone) { debug(s"Query $queryId is done and already stored, returning stored results") makeResponseFrom(queryId, shrineQueryResult) } else { debug(s"Query $queryId is incomplete, asking CRC for results") val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result) result } } } } } } private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = { shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(ErrorResponse(QueryNotFound(queryId))) } private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = { //NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to //retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in //parallel is quite possibly too clever, but may be faster than asking for them serially. //TODO: Review this. //Make requests for results in parallel val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult) //Gather all the results (block until they're all returned) val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime) countResponseAttempt match { //If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along //with any retrieved breakdowns before returning it. case Success(countResponse) => { //NB: Only store the result if needed, that is, if all results are done //TODO: REVIEW THIS storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts)) countResponse } case Failure(e) => ErrorResponse(CouldNotRetrieveQueryFromCrc(queryId,e)) } } private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = { def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString) def process(localResultId: Long): ShrineResponse = { delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId)) } implicit val executionContext = this.executionContext import scala.concurrent.blocking def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f))) val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt { process(shrineQueryResult.count.localId) } val futureBreakdownAttempts = Future.sequence(for { Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns } yield futureBlockingAttempt { process(localResultId) }) //Log errors retrieving count futureCountAttempt.collect { case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'") case Failure(e) => error(s"Error requesting count result from the CRC: ", e) } //Log errors retrieving breakdown for { breakdownResponseAttempts <- futureBreakdownAttempts } { breakdownResponseAttempts.collect { case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'") case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e) } } //"Filter" for non-ErrorResponses val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect { case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'")) } //"Filter" for non-ErrorResponses val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for { breakdownResponseAttempts <- futureBreakdownAttempts } yield { breakdownResponseAttempts.collect { case Success(resp: ReadResultResponse) => Try(resp) } } for { countResponseAttempt <- futureNonErrorCountAttempt breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts } yield { RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) } } private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = { val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime) //Log any failures (countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _)) //NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned val responseAttempt = for { countResponse: ReadResultResponse <- countResponseAttempt countQueryResult = countResponse.metadata breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts) } yield { val breakdownsByType = (for { breakdownResponse <- breakdownResponses resultType <- breakdownResponse.metadata.resultType } yield resultType -> breakdownResponse.data).toMap val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType) - val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns + val queryResultToReturn = if(doObfuscation) obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns toResponse(queryId, queryResultToReturn) } SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts) } private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = { val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten breakdownTypes -- successfulBreakdownTypes } private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val responseIsDone = response.results.forall(_.statusType.isDone) if (responseIsDone) { storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes) } } private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val rawResults = response.results - val obfuscatedResults = obfuscateResults(doObfuscation)(response.results) + val obfuscatedResults = obfuscator.obfuscateResults(doObfuscation)(response.results) for { shrineQuery <- dao.findQueryByNetworkId(queryId) queryResult <- rawResults.headOption obfuscatedQueryResult <- obfuscatedResults.headOption } { val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr) dao.inTransaction { dao.deleteQuery(queryId) dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns) } } } private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R] private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) { def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns] override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call } - private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _) + private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2) } case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Query not found" override def description:String = s"No query with id $queryId found on ${stamp.host.getHostName}" } case class QueryResultNotAvailable(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Query $queryId found but its results are not available yet" override def description:String = s"Query $queryId found but its results are not available yet on ${stamp.host.getHostName}" } case class CouldNotRetrieveQueryFromCrc(queryId:Long,x: Throwable) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Could not retrieve query $queryId from the CRC" override def description:String = s"Unhandled exception while retrieving query $queryId while retrieving it from the CRC on ${stamp.host.getHostName}" override def throwable = Some(x) } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala index 8b9345ab1..c3a03d4a8 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala @@ -1,146 +1,151 @@ package net.shrine.adapter import scala.collection.JavaConverters._ import com.typesafe.config.Config import net.shrine.adapter.dao.{AdapterDao, I2b2AdminDao} import net.shrine.adapter.dao.squeryl.{SquerylAdapterDao, SquerylI2b2AdminDao} import net.shrine.adapter.dao.squeryl.tables.Tables import net.shrine.adapter.service.{AdapterService, I2b2AdminService} import net.shrine.adapter.translators.{ExpressionTranslator, QueryDefinitionTranslator} import net.shrine.client.{EndpointConfig, Poster} import net.shrine.config.mappings.{AdapterMappings, AdapterMappingsSource, ClasspathFormatDetectingAdapterMappingsSource} import net.shrine.crypto.{DefaultSignerVerifier, KeyStoreCertCollection} import net.shrine.dao.squeryl.SquerylInitializer import net.shrine.protocol.{HiveCredentials, NodeId, RequestType, ResultOutputType} import net.shrine.config.{ConfigExtensions, DurationConfigParser} import scala.concurrent.duration.Duration /** * All the parts required for an adapter. * * @author david * @since 1.22 */ case class AdapterComponents( adapterService: AdapterService, i2b2AdminService: I2b2AdminService, adapterDao: AdapterDao, adapterMappings: AdapterMappings) object AdapterComponents { //todo try and trim this argument list back def apply( adapterConfig:Config, //config is "shrine.adapter" certCollection: KeyStoreCertCollection, squerylInitializer: SquerylInitializer, breakdownTypes: Set[ResultOutputType], crcHiveCredentials: HiveCredentials, signerVerifier: DefaultSignerVerifier, pmPoster: Poster, nodeId: NodeId ):AdapterComponents = { val crcEndpoint: EndpointConfig = adapterConfig.getConfigured("crcEndpoint",EndpointConfig(_)) val crcPoster: Poster = Poster(certCollection,crcEndpoint) val squerylAdapterTables: Tables = new Tables val adapterDao: AdapterDao = new SquerylAdapterDao(squerylInitializer, squerylAdapterTables)(breakdownTypes) //NB: Is i2b2HiveCredentials.projectId the right project id to use? val i2b2AdminDao: I2b2AdminDao = new SquerylI2b2AdminDao(crcHiveCredentials.projectId, squerylInitializer, squerylAdapterTables) val adapterMappingsFile = adapterConfig.getString("adapterMappingsFileName") val adapterMappingsSource: AdapterMappingsSource = ClasspathFormatDetectingAdapterMappingsSource(adapterMappingsFile) //NB: Fail fast val adapterMappings: AdapterMappings = adapterMappingsSource.load(adapterMappingsFile).get val expressionTranslator: ExpressionTranslator = ExpressionTranslator(adapterMappings) val queryDefinitionTranslator: QueryDefinitionTranslator = new QueryDefinitionTranslator(expressionTranslator) val doObfuscation = adapterConfig.getBoolean("setSizeObfuscation") val collectAdapterAudit = adapterConfig.getBoolean("audit.collectAdapterAudit") val botCountTimeThresholds: Seq[(Long, Duration)] = { import scala.concurrent.duration._ val countsAndMilliseconds: Seq[Config] = adapterConfig.getConfig("botDefense").getConfigList("countsAndMilliseconds").asScala countsAndMilliseconds.map(pairConfig => (pairConfig.getLong("count"),pairConfig.getLong("milliseconds").milliseconds)) } + val obfuscator:Obfuscator = adapterConfig.getConfigured("obfucscation",Obfuscator(_)) + val runQueryAdapter = RunQueryAdapter( poster = crcPoster, dao = adapterDao, hiveCredentials = crcHiveCredentials, conceptTranslator = queryDefinitionTranslator, adapterLockoutAttemptsThreshold = adapterConfig.getInt("adapterLockoutAttemptsThreshold"), doObfuscation = doObfuscation, runQueriesImmediately = adapterConfig.getOption("immediatelyRunIncomingQueries", _.getBoolean).getOrElse(true), //todo use reference.conf breakdownTypes = breakdownTypes, collectAdapterAudit = collectAdapterAudit, - botCountTimeThresholds = botCountTimeThresholds + botCountTimeThresholds = botCountTimeThresholds, + obfuscator = obfuscator ) val readInstanceResultsAdapter: Adapter = new ReadInstanceResultsAdapter( - crcPoster, - crcHiveCredentials, - adapterDao, - doObfuscation, - breakdownTypes, - collectAdapterAudit + poster = crcPoster, + hiveCredentials = crcHiveCredentials, + dao = adapterDao, + doObfuscation = doObfuscation, + breakdownTypes = breakdownTypes, + collectAdapterAudit = collectAdapterAudit, + obfuscator = obfuscator ) val readQueryResultAdapter: Adapter = new ReadQueryResultAdapter( crcPoster, crcHiveCredentials, adapterDao, doObfuscation, breakdownTypes, - collectAdapterAudit + collectAdapterAudit, + obfuscator = obfuscator ) val readPreviousQueriesAdapter: Adapter = new ReadPreviousQueriesAdapter(adapterDao) val deleteQueryAdapter: Adapter = new DeleteQueryAdapter(adapterDao) val renameQueryAdapter: Adapter = new RenameQueryAdapter(adapterDao) val readQueryDefinitionAdapter: Adapter = new ReadQueryDefinitionAdapter(adapterDao) val readTranslatedQueryDefinitionAdapter: Adapter = new ReadTranslatedQueryDefinitionAdapter(nodeId, queryDefinitionTranslator) val flagQueryAdapter: Adapter = new FlagQueryAdapter(adapterDao) val unFlagQueryAdapter: Adapter = new UnFlagQueryAdapter(adapterDao) val adapterMap = AdapterMap(Map( RequestType.QueryDefinitionRequest -> runQueryAdapter, RequestType.GetRequestXml -> readQueryDefinitionAdapter, RequestType.UserRequest -> readPreviousQueriesAdapter, RequestType.InstanceRequest -> readInstanceResultsAdapter, RequestType.MasterDeleteRequest -> deleteQueryAdapter, RequestType.MasterRenameRequest -> renameQueryAdapter, RequestType.GetQueryResult -> readQueryResultAdapter, RequestType.ReadTranslatedQueryDefinitionRequest -> readTranslatedQueryDefinitionAdapter, RequestType.FlagQueryRequest -> flagQueryAdapter, RequestType.UnFlagQueryRequest -> unFlagQueryAdapter)) AdapterComponents( adapterService = new AdapterService( nodeId = nodeId, signatureVerifier = signerVerifier, maxSignatureAge = adapterConfig.getConfigured("maxSignatureAge", DurationConfigParser(_)), adapterMap = adapterMap ), i2b2AdminService = new I2b2AdminService( dao = adapterDao, i2b2AdminDao = i2b2AdminDao, pmPoster = pmPoster, runQueryAdapter = runQueryAdapter ), adapterDao = adapterDao, adapterMappings = adapterMappings) } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala index 4176c1928..519cfc53a 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala @@ -1,79 +1,66 @@ package net.shrine.adapter +import com.typesafe.config.Config import net.shrine.protocol.QueryResult -import java.util.Random + +import scala.util.Random /** - * @author Bill Simons - * @author clint - * @date 4/21/11 - * @link http://cbmi.med.harvard.edu + * @author Ricardo De Lima + * @author Bill Simons + * @author clint + * @author dwalend + * @since August 18, 2009 + * @see http://cbmi.med.harvard.edu */ -object Obfuscator { +case class Obfuscator(binSize:Int,stdDev:Double,noiseClamp:Int) { + val random = new Random + + def obfuscateResults(doObfuscation: Boolean)(results: Seq[QueryResult]): Seq[QueryResult] = { + if (doObfuscation) results.map(obfuscate) else results + } + def obfuscate(result: QueryResult): QueryResult = { val withObfscSetSize = result.modifySetSize(obfuscate) - - if(withObfscSetSize.breakdowns.isEmpty) { withObfscSetSize } + + if (withObfscSetSize.breakdowns.isEmpty) { + withObfscSetSize + } else { val obfuscatedBreakdowns = result.breakdowns.mapValues(_.mapValues(obfuscate)) - + withObfscSetSize.withBreakdowns(obfuscatedBreakdowns) } } def obfuscate(l: Long): Long = { - import GaussianObfuscator._ - - val obfuscationAmount = determineObfuscationAmount(l) - - determineObfuscatedSetSize(l, obfuscationAmount) - } - - def obfuscateResults(doObfuscation: Boolean)(results: Seq[QueryResult]): Seq[QueryResult] = { - if(doObfuscation) results.map(obfuscate) else results - } - - /** - * @author clint - * @author Ricardo De Lima - * @date August 18, 2009 - * - * Harvard Medical School Center for BioMedical Informatics - * - * @link http://cbmi.med.harvard.edu - */ - object GaussianObfuscator { - private val stdDev = 1.33 - - private val mean = 0 - - private val rand = new Random - - val range = 3 - private val lower = (-range).toDouble - - private val upper = range.toDouble - - def determineObfuscationAmount(x: Long): Int = scala.math.round(gaussian(mean, stdDev)).toInt + def roundToNearest(i: Double, n: Double): Long = { + Math.round( + if ((i % n) >= n / 2) i + n - (i % n) //round up + else i - i % n //round down + ) + } - def determineObfuscatedSetSize(setSize: Long, obfuscationAmount: Int): Long = { - if (setSize <= 10) { -1L } - else { setSize + obfuscationAmount } + def clampedGaussian(i: Long, clamp: Long): Double = { + val noise = random.nextGaussian() * stdDev + //clamp it + if (noise > clamp) clamp + else if (noise < -clamp) -clamp + else noise } - /** - * Return a real number from a gaussian distribution with given mean and - * stddev - */ - def gaussian(mean: Double, stddev: Double): Double = { - def limitRange(v: Double): Double = { - val partiallyClamped = if (v < lower) lower else v + //bin + val binned = roundToNearest(l, binSize) - if (partiallyClamped > upper) upper else partiallyClamped - } + //add noise + val noised = binned + clampedGaussian(binned, noiseClamp) - limitRange(mean + (stddev * rand.nextGaussian)) - } + //bin again + roundToNearest(noised, binSize) } +} + +object Obfuscator { + def apply(config:Config): Obfuscator = Obfuscator(config.getInt("binSize"),config.getDouble("sigma"),config.getInt("clamp")) } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala index 56f5b443f..8b5c9ca30 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala @@ -1,38 +1,37 @@ package net.shrine.adapter -import xml.NodeSeq import net.shrine.adapter.dao.AdapterDao -import net.shrine.protocol.{HiveCredentials, ReadInstanceResultsResponse, ReadInstanceResultsRequest, BroadcastMessage, ShrineResponse, ErrorResponse, ResultOutputType} -import net.shrine.serialization.XmlMarshaller -import net.shrine.client.HttpClient import net.shrine.client.Poster +import net.shrine.protocol.{HiveCredentials, ReadInstanceResultsRequest, ReadInstanceResultsResponse, ResultOutputType} /** * @author Bill Simons * @since 4/14/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final class ReadInstanceResultsAdapter( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, breakdownTypes: Set[ResultOutputType], - collectAdapterAudit:Boolean + collectAdapterAudit:Boolean, + obfuscator: Obfuscator ) extends AbstractReadQueryResultAdapter[ReadInstanceResultsRequest, ReadInstanceResultsResponse]( - poster, - hiveCredentials, - dao, - doObfuscation, - _.shrineNetworkQueryId, - _.projectId, - ReadInstanceResultsResponse(_, _), - breakdownTypes, - collectAdapterAudit + poster = poster, + hiveCredentials = hiveCredentials, + dao = dao, + doObfuscation = doObfuscation, + getQueryId = _.shrineNetworkQueryId, + getProjectId = _.projectId, + toResponse = ReadInstanceResultsResponse(_, _), + breakdownTypes = breakdownTypes, + collectAdapterAudit = collectAdapterAudit, + obfuscator = obfuscator ) diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala index 275971731..533523239 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala @@ -1,33 +1,33 @@ package net.shrine.adapter import net.shrine.adapter.dao.AdapterDao -import net.shrine.protocol.{HiveCredentials, BroadcastMessage, ErrorResponse, ReadQueryResultRequest, ReadQueryResultResponse, ResultOutputType} -import net.shrine.serialization.XmlMarshaller -import net.shrine.client.HttpClient import net.shrine.client.Poster +import net.shrine.protocol.{HiveCredentials, ReadQueryResultRequest, ReadQueryResultResponse, ResultOutputType} /** * @author clint * @since Nov 2, 2012 * */ final class ReadQueryResultAdapter( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, breakdownTypes: Set[ResultOutputType], - collectAdapterAudit:Boolean + collectAdapterAudit:Boolean, + obfuscator: Obfuscator ) extends AbstractReadQueryResultAdapter[ReadQueryResultRequest, ReadQueryResultResponse]( - poster, - hiveCredentials, - dao, - doObfuscation, - _.queryId, - _.projectId, - ReadQueryResultResponse(_, _), - breakdownTypes, - collectAdapterAudit + poster = poster, + hiveCredentials = hiveCredentials, + dao = dao, + doObfuscation = doObfuscation, + getQueryId = _.queryId, + getProjectId = _.projectId, + toResponse = ReadQueryResultResponse(_, _), + breakdownTypes = breakdownTypes, + collectAdapterAudit = collectAdapterAudit, + obfuscator = obfuscator ) diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala index cc4c0561b..c3cbcea48 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala @@ -1,289 +1,284 @@ package net.shrine.adapter import net.shrine.adapter.audit.AdapterAuditDb import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, ErrorFromCrcException, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, MissingCrCXmlResultException, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineResponse} import net.shrine.client.Poster import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources} import scala.util.control.NonFatal import net.shrine.util.XmlDateHelper import scala.concurrent.duration.Duration import scala.xml.XML /** * @author Bill Simons * @author clint * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class RunQueryAdapter( poster: Poster, dao: AdapterDao, override val hiveCredentials: HiveCredentials, conceptTranslator: QueryDefinitionTranslator, adapterLockoutAttemptsThreshold: Int, //Set to 0 to disable lockout. todo remove in SHRINE 1.24 doObfuscation: Boolean, runQueriesImmediately: Boolean, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean, - botCountTimeThresholds:Seq[(Long,Duration)] + botCountTimeThresholds:Seq[(Long,Duration)], + obfuscator: Obfuscator ) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) { logStartup() import RunQueryAdapter._ override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = { try { request.mapQueryDefinition(conceptTranslator.translate) } catch { case NonFatal(e) => throw new AdapterMappingException(request,s"Error mapping query terms from network to local forms.", e) } } override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message) if (isLockedOut(message.networkAuthn)) { throw new AdapterLockoutException(message.networkAuthn,poster.url) } dao.checkIfBot(message.networkAuthn,botCountTimeThresholds) val runQueryReq = message.request.asInstanceOf[RunQueryRequest] //We need to use the network identity from the BroadcastMessage, since that will have the network username //(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching //of previous queries on deployed systems where the credentials in the identity param to this method and the authn //field of the incoming request are different, like the HMS Shrine deployment. //NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013 val authnToUse = message.networkAuthn.copy(credential = Credential("", isToken = false)) if (!runQueriesImmediately) { debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") storeQuery(authnToUse, message, runQueryReq) } else { debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse)) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(runQueryReq.networkQueryId,result) result } } private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = { //Use dummy ids for what we would have received from the CRC val masterId: Long = -1L val queryInstanceId: Long = -1L val resultId: Long = -1L //TODO: is this right?? Or maybe it's project id? val groupId = authnToUse.domain val invalidSetSize = -1L val now = XmlDateHelper.now val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing")) dao.inTransaction { val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None) val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult)) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult) } private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request) //NB: Pass through ErrorResponses received from the CRC. //See: https://open.med.harvard.edu/jira/browse/SHRINE-794 val result = super.processRequest(message) match { case e: ErrorResponse => e case rawRunQueryResponse: RawCrcRunQueryResponse => processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse) } if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(request,result) result } private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = { def isBreakdown(result: QueryResult) = result.resultType.exists(_.isBreakdown) val originalResults: Seq[QueryResult] = rawRunQueryResponse.results val (originalBreakdownResults, originalNonBreakDownResults): (Seq[QueryResult],Seq[QueryResult]) = originalResults.partition(isBreakdown) val originalBreakdownCountAttempts: Seq[(QueryResult, Try[QueryResult])] = attemptToRetrieveBreakdowns(request, originalBreakdownResults) val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess } val failedBreakdownCountAttemptsWithProblems = failedBreakdownCountAttempts.map { attempt => val originalResult: QueryResult = attempt._1 val queryResult:QueryResult = if (originalResult.problemDigest.isDefined) originalResult else { attempt._2 match { case Success(_) => originalResult case Failure(x) => //noinspection RedundantBlock { val problem:Problem = x match { case e: ErrorFromCrcException => ErrorFromCrcBreakdown(e) case e: MissingCrCXmlResultException => CannotInterpretCrcBreakdownXml(e) case NonFatal(e) => { val summary = s"Unexpected exception while interpreting breakdown response" ProblemNotYetEncoded(summary, e) } } LoggingProblemHandler.handleProblem(problem) originalResult.copy(problemDigest = Some(problem.toDigest)) } } } (queryResult,attempt._2) } logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttemptsWithProblems) val originalMergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = { val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns } withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _) } - val obfuscatedQueryResults = originalResults.map(Obfuscator.obfuscate) + val obfuscatedQueryResults = originalResults.map(obfuscator.obfuscate) val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown) - val obfuscatedMergedBreakdowns = obfuscateBreakdowns(originalMergedBreakdowns) + val obfuscatedMergedBreakdowns = originalMergedBreakdowns.mapValues(_.mapValues(obfuscator.obfuscate)) val failedBreakdownTypes = failedBreakdownCountAttemptsWithProblems.flatMap { case (qr, _) => qr.resultType } dao.storeResults( authn = authnToUse, masterId = rawRunQueryResponse.queryId.toString, networkQueryId = request.networkQueryId, queryDefinition = request.queryDefinition, rawQueryResults = originalResults, obfuscatedQueryResults = obfuscatedQueryResults, failedBreakdownTypes = failedBreakdownTypes, mergedBreakdowns = originalMergedBreakdowns, obfuscatedBreakdowns = obfuscatedMergedBreakdowns) // at this point the queryResult could be a mix of successes and failures. // SHRINE reports only the successes. See SHRINE-1567 for details val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns //TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here? //can failedBreakdownCountAttempts be mixed back in here? val resultWithBreakdowns: QueryResult = queryResults.head.withBreakdowns(breakdownsToReturn) if(debugEnabled) { def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data) val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}" debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)") debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)") debug(s"Full QueryResult: $resultWithBreakdowns") } //if any results had problems, this commented out code can turn it into an error QueryResult //See SHRINE-1619 //val problem: Option[ProblemDigest] = failedBreakdownCountAttemptsWithProblems.headOption.flatMap(x => x._1.problemDigest) //val queryResult = problem.fold(resultWithBreakdowns)(pd => QueryResult.errorResult(Some(pd.description),"Error with CRC",pd)) rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns) } private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = { def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString) Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes)) } private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = { originalCountQueryResult -> (for { countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId) } yield originalCountQueryResult.withSetSize(countData.metadata.setSize)) } private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = { breakdownResults.map { origBreakdownResult => origBreakdownResult -> (for { breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data) } yield origBreakdownResult.withBreakdown(breakdownData)) } } private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse, failures: Seq[(QueryResult, Try[QueryResult])]) { for { (origQueryResult, Failure(e)) <- failures } { error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e) } } private def isLockedOut(authn: AuthenticationInfo): Boolean = { adapterLockoutAttemptsThreshold match { case 0 => false case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold) } } private def logStartup(): Unit = { val message = { if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" } else { s"${getClass.getSimpleName} will queue queries for later execution" } } info(message) } } -object RunQueryAdapter { - private[adapter] def obfuscateBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Map[ResultOutputType, I2b2ResultEnvelope] = { - breakdowns.mapValues(_.mapValues(Obfuscator.obfuscate)) - } -} - case class ErrorFromCrcBreakdown(x:ErrorFromCrcException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "The CRC reported an error." override val description = "The CRC reported an internal error." } case class CannotInterpretCrcBreakdownXml(x:MissingCrCXmlResultException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "SHRINE cannot interpret the CRC response." override val description = "The CRC responded, but SHRINE could not interpret that response." } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala index 09fdfc874..4573e63c0 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala @@ -1,379 +1,380 @@ package net.shrine.adapter import scala.xml.NodeSeq import net.shrine.util.ShouldMatchersForJUnit import ObfuscatorTest.within3 import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, CrcRequest, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineRequest, ShrineResponse} import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML import net.shrine.protocol.query.{QueryDefinition, Term} import net.shrine.util.XmlDateHelper import net.shrine.util.XmlDateHelper.now import net.shrine.util.XmlGcEnrichments import net.shrine.client.Poster import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import net.shrine.problem.TestProblem import scala.util.Success /** * @author clint * @since Nov 8, 2012 */ //noinspection UnitMethodIsParameterless abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse]( makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter, makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest, extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false)) def doTestProcessRequestMissingQuery { val adapter = makeAdapter(dao, MockHttpClient) val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn))) response.isInstanceOf[ErrorResponse] should be(true) } def doTestProcessInvalidRequest { val adapter = makeAdapter(dao, MockHttpClient) intercept[ClassCastException] { //request must be a type of request we can handle adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest)) } } private val localMasterId = "alksjdkalsdjlasdjlkjsad" private val shrineNetworkQueryId = 123L private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn))) private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis private val instanceId = 999L private val setSize = 12345L private val obfSetSize = setSize + 1 private val queryExpr = Term("foo") private val topicId = "laskdjlkasd" private val fooQuery = QueryDefinition("some-query",queryExpr) def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables { val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val breakdownResultId = 98237943265436L val incompleteCountResult = QueryResult( resultId = countResultId, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Processing, statusMessage = None, breakdowns = breakdowns) val breakdownResult = breakdowns.head match { case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType) } val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil) final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient { override def post(input: String, url: String): HttpResponse = { def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished) def validateAuthnAndProjectId(req: ShrineRequest) { req.authn should equal(expectedHiveCredentials.toAuthenticationInfo) req.projectId should equal(expectedHiveCredentials.projectId) } val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match { case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => { validateAuthnAndProjectId(req) if (countQueryShouldWork) { ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize))) } else { ErrorResponse(TestProblem(summary = "Retrieving count result failed")) } } case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => { validateAuthnAndProjectId(req) ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2) } case _ => fail(s"Unknown input: $input") } HttpResponse.ok(response.toI2b2String) } } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials)) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) //The query shouldn't be 'done', since its status is PROCESSING dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing) //Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC val result = getResults.asInstanceOf[R] //Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns) val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType) if (!countQueryShouldWork) { result.isInstanceOf[ErrorResponse] should be(true) } else { val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) import ObfuscatorTest.within3 actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) within3(setSize, actualQueryResult.setSize) should be(true) actualQueryResult.description should be(Some("results from node X")) actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name)) actualQueryResult.breakdowns.foreach { case (rt, I2b2ResultEnvelope(_, data)) => { data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) } } } for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables { import ResultOutputType._ import XmlDateHelper.now val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResultId = 456L val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None) dao.inTransaction { val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult)) val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } val queryStartDate = now object MockHttpClient extends HttpClient { override def post(input: String, url: String): HttpResponse = ??? } val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults.isInstanceOf[ErrorResponse] should be(true) //The query shouldn't be 'done', since its status is QUEUED dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) //Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued val result = getResults result.isInstanceOf[ErrorResponse] should be(true) dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued) } def doTestProcessRequest = afterCreatingTables { val adapter = makeAdapter(dao, MockHttpClient) def getResults = doGetResults(adapter) getResults match { case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName) case x => fail(s"Got $x, not an ErrorResponse") } val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None) getResults match { case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryResultNotAvailable].getName) case x => fail(s"Got $x, not an ErrorResponse") } import ResultOutputType._ import XmlDateHelper.now val breakdowns = Map( PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)), PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L))) val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1)) val startDate = now val elapsed = 100L val endDate = { import XmlGcEnrichments._ import scala.concurrent.duration._ startDate + elapsed.milliseconds } val countResult = QueryResult( resultId = 456L, instanceId = instanceId, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = Option(startDate), endDate = Option(endDate), description = Some("results from node X"), statusType = QueryResult.StatusType.Finished, statusMessage = None, breakdowns = breakdowns ) val breakdownResults = breakdowns.map { case (resultType, data) => countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType) }.toSeq val queryStartDate = now val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults) getResults.isInstanceOf[ErrorResponse] should be(true) dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize) dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns) val result = getResults.asInstanceOf[R] val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result) actualNetworkQueryId should equal(shrineNetworkQueryId) actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML)) actualQueryResult.setSize should equal(obfSetSize) actualQueryResult.description should be(None) //TODO: This is probably wrong actualQueryResult.statusType should equal(QueryResult.StatusType.Finished) actualQueryResult.statusMessage should be(None) actualQueryResult.breakdowns should equal(obfscBreakdowns) for { startDate <- actualQueryResult.startDate endDate <- actualQueryResult.endDate } { val actualElapsed = toMillis(endDate) - toMillis(startDate) actualElapsed should equal(elapsed) } } } object AbstractQueryRetrievalTestCase { val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project") val doObfuscation = true def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar")))) new RunQueryAdapter( poster = poster, dao = dao, hiveCredentials = AbstractQueryRetrievalTestCase.hiveCredentials, conceptTranslator = translator, adapterLockoutAttemptsThreshold = 10000, doObfuscation = doObfuscation, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) } import scala.concurrent.duration._ final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) { override val requestType = null protected override def i2b2MessageBody: NodeSeq = override def toXml = } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala index ad8417d94..5bada7bfb 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala @@ -1,95 +1,97 @@ package net.shrine.adapter import org.junit.Test import net.shrine.protocol.QueryResult import net.shrine.protocol.ResultOutputType import net.shrine.protocol.I2b2ResultEnvelope import net.shrine.util.ShouldMatchersForJUnit import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * @author clint * @since Oct 22, 2012 */ object ObfuscatorTest { private def within(range: Long)(a: Long, b: Long) = scala.math.abs(b - a) <= range def within3 = within(3) _ def within1 = within(1) _ } final class ObfuscatorTest extends ShouldMatchersForJUnit { import ObfuscatorTest._ - + + val obfuscator = Obfuscator(1,1.3,3) + @Test def testObfuscateLong() { val l = 12345L - val obfuscated = Obfuscator.obfuscate(l) + val obfuscated = obfuscator.obfuscate(l) within3(l, obfuscated) should be(right = true) } @Test def testObfuscateQueryResult() { import DefaultBreakdownResultOutputTypes._ import ResultOutputType._ val breakdowns = Map( PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("x" -> 1, "y" -> 42)), PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("a" -> 123, "b" -> 456))) def queryResult(resultId: Long, setSize: Long) = QueryResult( resultId = resultId, instanceId = 123L, resultType = Some(PATIENT_COUNT_XML), setSize = setSize, startDate = None, endDate = None, description = None, statusType = QueryResult.StatusType.Finished, statusMessage = None, breakdowns = breakdowns ) val resultId1 = 12345L val setSize1 = 123L //No breakdowns { val noBreakdowns = queryResult(resultId1, setSize1).copy(breakdowns = Map.empty) noBreakdowns.setSize should equal(setSize1) noBreakdowns.breakdowns should equal(Map.empty) - val obfuscated = Obfuscator.obfuscate(noBreakdowns) + val obfuscated = obfuscator.obfuscate(noBreakdowns) within3(noBreakdowns.setSize, obfuscated.setSize) obfuscated.breakdowns should equal(Map.empty) } //breakdowns { - val QueryResult(_, _, _, obfscSetSize1, _, _, _, _, _, _, obfscBreakdowns) = Obfuscator.obfuscate(queryResult(resultId1, setSize1)) + val QueryResult(_, _, _, obfscSetSize1, _, _, _, _, _, _, obfscBreakdowns) = obfuscator.obfuscate(queryResult(resultId1, setSize1)) within3(setSize1, obfscSetSize1) should be(right = true) breakdowns.keySet should equal(obfscBreakdowns.keySet) for { (resultType, obfscEnv) <- obfscBreakdowns env <- breakdowns.get(resultType) } { env.data.keySet should equal(obfscEnv.data.keySet) for { (key, value) <- env.data obfscValue <- obfscEnv.data.get(key) } { within3(value, obfscValue) should be(right = true) } } } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala index 1d5a2864b..d192e2342 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala @@ -1,44 +1,45 @@ package net.shrine.adapter import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.client.Poster import net.shrine.protocol.ReadInstanceResultsRequest import net.shrine.protocol.ReadInstanceResultsResponse import net.shrine.adapter.dao.AdapterDao import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * @author clint * @since Nov 7, 2012 */ final class ReadInstanceResultsAdapterTest extends AbstractQueryRetrievalTestCase( (dao, httpClient) => new ReadInstanceResultsAdapter( Poster("", httpClient), AbstractQueryRetrievalTestCase.hiveCredentials, dao, true, DefaultBreakdownResultOutputTypes.toSet, - collectAdapterAudit = false + collectAdapterAudit = false, + obfuscator = Obfuscator(1,1.3,3) ), (queryId, authn) => ReadInstanceResultsRequest("some-project-id", 10.seconds, authn, queryId), ReadInstanceResultsResponse.unapply) { @Test def testProcessInvalidRequest = doTestProcessInvalidRequest @Test def testProcessRequest = doTestProcessRequest @Test def testProcessRequestMissingQuery = doTestProcessRequestMissingQuery @Test def testProcessRequestIncompleteQuery = doTestProcessRequestIncompleteQuery(true) @Test def testProcessRequestIncompleteQueryCountResultRetrievalFails = doTestProcessRequestIncompleteQuery(false) @Test def testProcessRequestQueuedQuery = doTestProcessRequestQueuedQuery } diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala index d9542ae44..cfbafd58c 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala @@ -1,45 +1,46 @@ package net.shrine.adapter import junit.framework.TestCase import net.shrine.util.ShouldMatchersForJUnit import net.shrine.protocol.ReadQueryResultRequest import net.shrine.protocol.ReadQueryResultResponse import org.junit.Test import scala.concurrent.duration._ import net.shrine.client.Poster import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * @author clint * @date Nov 7, 2012 */ final class ReadQueryResultAdapterTest extends AbstractQueryRetrievalTestCase( (dao, httpClient) => new ReadQueryResultAdapter( Poster("", httpClient), AbstractQueryRetrievalTestCase.hiveCredentials, dao, true, DefaultBreakdownResultOutputTypes.toSet, - collectAdapterAudit = false + collectAdapterAudit = false, + obfuscator = Obfuscator(1,1.3,3) ), (queryId, authn) => ReadQueryResultRequest("some-project-id", 10.seconds, authn, queryId), ReadQueryResultResponse.unapply) { @Test def testProcessInvalidRequest = doTestProcessInvalidRequest @Test def testProcessRequest = doTestProcessRequest @Test def testProcessRequestMissingQuery = doTestProcessRequestMissingQuery @Test def testProcessRequestIncompleteQuery = doTestProcessRequestIncompleteQuery(true) @Test def testProcessRequestIncompleteQueryCountResultRetrievalFails = doTestProcessRequestIncompleteQuery(false) @Test def testProcessRequestQueuedQuery = doTestProcessRequestQueuedQuery } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala index 9d574b9cf..07338becd 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala @@ -1,974 +1,951 @@ package net.shrine.adapter import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import ObfuscatorTest.within3 import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.adapter.translators.ExpressionTranslator import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.client.Poster import net.shrine.protocol.{AuthenticationInfo, BaseShrineResponse, BroadcastMessage, CrcRequest, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse} import net.shrine.protocol.RawCrcRunQueryResponse.toQueryResultMap import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_RACE_COUNT_XML import net.shrine.protocol.query.OccuranceLimited import net.shrine.protocol.query.Or import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term import net.shrine.util.XmlDateHelper import net.shrine.util.XmlUtil import scala.util.Success import net.shrine.dao.squeryl.SquerylEntryPoint import scala.concurrent.duration.Duration import net.shrine.adapter.dao.model.ShrineError import net.shrine.adapter.dao.model.QueryResultRow import net.shrine.problem.TestProblem /** * @author Bill Simons * @author Clint Gilbert * @since 4/19/11 * @see http://cbmi.med.harvard.edu */ final class RunQueryAdapterTest extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { private val queryDef = QueryDefinition("foo", Term("foo")) private val broadcastMessageId = 1234563789L private val queryId = 123L private val expectedNetworkQueryId = 999L private val expectedLocalMasterId = queryId.toString private val masterId = 99L private val instanceId = 456L private val resultId = 42L private val projectId = "projectId" private val setSize = 17L private val xmlResultId = 98765L private val userId = "userId" private val groupId = "groupId" private val topicId = "some-topic-id-123-foo" private val topicName = "Topic Name" private val justCounts = Set(PATIENT_COUNT_XML) private val now = XmlDateHelper.now private val countQueryResult = QueryResult(resultId, instanceId, Some(PATIENT_COUNT_XML), setSize, Some(now), Some(now), None, QueryResult.StatusType.Finished, None) private val dummyBreakdownData = Map("x" -> 99L, "y" -> 42L, "z" -> 3000L) private val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project") private val authn = AuthenticationInfo("some-domain", "username", Credential("jksafhkjaf", false)) private val adapterLockoutThreshold = 99 private val altI2b2ErrorXml = XmlUtil.stripWhitespace { 1.1 2.4 edu.harvard.i2b2.crc 1.5 i2b2 Hive i2b2_QueryTool 0.2 i2b2 Hive 1 i2b2 Log information DONE Query result instance id 3126 not found }.toString private val otherNetworkId: Long = 12345L @Test def testProcessRawCrcRunQueryResponseCountQueryOnly: Unit = afterCreatingTables{ val outputTypes = Set(PATIENT_COUNT_XML) val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("network" -> Set("local1a", "local1b")))) val adapter = new RunQueryAdapter( Poster("crc-url", null), dao, hiveCredentials, translator, adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(1,1.3,3) ) val request = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), outputTypes, queryDef) val networkAuthn = AuthenticationInfo("some-domain", "username", Credential("sadasdasdasd", false)) val broadcastMessage = BroadcastMessage(queryId, networkAuthn, request) val rawRunQueryResponse = RawCrcRunQueryResponse( queryId = queryId, createDate = XmlDateHelper.now, userId = request.authn.username, groupId = request.authn.domain, requestXml = request.queryDefinition, queryInstanceId = otherNetworkId, singleNodeResults = toQueryResultMap(Seq(countQueryResult))) val resp = adapter.processRawCrcRunQueryResponse(networkAuthn, request, rawRunQueryResponse).asInstanceOf[RunQueryResponse] resp should not be (null) //Validate the response resp.createDate should not be(null) resp.groupId should be(request.authn.domain) resp.userId should be(request.authn.username) resp.queryId should be(queryId) resp.queryInstanceId should be(otherNetworkId) resp.requestXml should equal(request.queryDefinition) (countQueryResult eq resp.singleNodeResult) should be(false) within3(resp.singleNodeResult.setSize, countQueryResult.setSize) should be(true) resp.singleNodeResult.resultType.get should equal(PATIENT_COUNT_XML) resp.singleNodeResult.breakdowns should equal(Map.empty) //validate the DB val expectedNetworkTerm = queryDef.expr.get.asInstanceOf[Term] //We should have one row in the shrine_query table, for the query just performed val Seq(queryRow) = list(queryRows) { queryRow.dateCreated should not be (null) queryRow.domain should equal(request.authn.domain) queryRow.name should equal(queryDef.name) queryRow.localId should equal(expectedLocalMasterId) queryRow.networkId should equal(expectedNetworkQueryId) queryRow.username should equal(authn.username) queryRow.queryDefinition.expr.get should equal(expectedNetworkTerm) } //We should have one row in the count_result table, with the right obfuscated value, which is within the expected amount from the original count val Seq(countRow) = list(countResultRows) { countRow.creationDate should not be (null) countRow.originalValue should equal(countQueryResult.setSize) within3(countRow.obfuscatedValue, countRow.originalValue) should be(true) } } @Test def testProcessRawCrcRunQueryResponseCountAndBreakdownQuery: Unit = afterCreatingTables { val allBreakdownTypes = DefaultBreakdownResultOutputTypes.toSet val breakdownTypes = Seq(PATIENT_GENDER_COUNT_XML) val outputTypes = Set(PATIENT_COUNT_XML) ++ breakdownTypes val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("network" -> Set("local1a", "local1b")))) val request = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), outputTypes, queryDef) val networkAuthn = AuthenticationInfo("some-domain", "username", Credential("sadasdasdasd", false)) val broadcastMessage = BroadcastMessage(queryId, networkAuthn, request) val breakdownQueryResults = breakdownTypes.zipWithIndex.map { case (rt, i) => countQueryResult.withId(resultId + i + 1).withResultType(rt) } val singleNodeResults = toQueryResultMap(countQueryResult +: breakdownQueryResults) val rawRunQueryResponse = RawCrcRunQueryResponse( queryId = queryId, createDate = XmlDateHelper.now, userId = request.authn.username, groupId = request.authn.domain, requestXml = request.queryDefinition, queryInstanceId = otherNetworkId, singleNodeResults = singleNodeResults) //Set up our mock CRC val poster = Poster("crc-url", new HttpClient { def post(input: String, url: String): HttpResponse = HttpResponse.ok { (RunQueryRequest.fromI2b2String(allBreakdownTypes)(input) orElse ReadResultRequest.fromI2b2String(allBreakdownTypes)(input)).get match { case runQueryReq: RunQueryRequest => rawRunQueryResponse.toI2b2String case readResultReq: ReadResultRequest => ReadResultResponse(xmlResultId = 42L, metadata = breakdownQueryResults.head, data = I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, dummyBreakdownData)).toI2b2String case _ => sys.error(s"Unknown request: '$input'") //Fail loudly } } }) val adapter = RunQueryAdapter( poster = poster, dao = dao, hiveCredentials = hiveCredentials, conceptTranslator = translator, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(1,1.3,3) ) val resp = adapter.processRawCrcRunQueryResponse(networkAuthn, request, rawRunQueryResponse).asInstanceOf[RunQueryResponse] resp should not be (null) //Validate the response resp.createDate should not be(null) resp.groupId should be(request.authn.domain) resp.userId should be(request.authn.username) resp.queryId should be(queryId) resp.queryInstanceId should be(otherNetworkId) resp.requestXml should equal(request.queryDefinition) (countQueryResult eq resp.singleNodeResult) should be(false) within3(resp.singleNodeResult.setSize, countQueryResult.setSize) should be(true) resp.singleNodeResult.resultType.get should equal(PATIENT_COUNT_XML) resp.singleNodeResult.breakdowns.keySet should equal(Set(PATIENT_GENDER_COUNT_XML)) val breakdownEnvelope = resp.singleNodeResult.breakdowns.values.head breakdownEnvelope.resultType should equal(PATIENT_GENDER_COUNT_XML) breakdownEnvelope.data.keySet should equal(dummyBreakdownData.keySet) //All breakdowns are obfuscated for { (key, value) <- breakdownEnvelope.data } { within3(value, dummyBreakdownData(key)) should be(true) } //validate the DB val expectedNetworkTerm = queryDef.expr.get.asInstanceOf[Term] //We should have one row in the shrine_query table, for the query just performed val Seq(queryRow) = list(queryRows) { queryRow.dateCreated should not be (null) queryRow.domain should equal(request.authn.domain) queryRow.name should equal(queryDef.name) queryRow.localId should equal(expectedLocalMasterId) queryRow.networkId should equal(expectedNetworkQueryId) queryRow.username should equal(authn.username) queryRow.queryDefinition.expr.get should equal(expectedNetworkTerm) } //We should have one row in the count_result table, with the right obfuscated value, which is within the expected amount from the original count val Seq(countRow) = list(countResultRows) { countRow.creationDate should not be (null) countRow.originalValue should equal(countQueryResult.setSize) within3(countRow.obfuscatedValue, countRow.originalValue) should be(true) } val breakdownRows @ Seq(xRow, yRow, zRow) = list(breakdownResultRows) breakdownRows.map(_.dataKey).toSet should equal(dummyBreakdownData.keySet) within3(xRow.obfuscatedValue, xRow.originalValue) should be(true) xRow.originalValue should be(dummyBreakdownData(xRow.dataKey)) within3(yRow.obfuscatedValue, yRow.originalValue) should be(true) yRow.originalValue should be(dummyBreakdownData(yRow.dataKey)) within3(zRow.obfuscatedValue, zRow.originalValue) should be(true) zRow.originalValue should be(dummyBreakdownData(zRow.dataKey)) } //NB: See https://open.med.harvard.edu/jira/browse/SHRINE-745 @Test def testParseAltErrorXml { val adapter = RunQueryAdapter( poster = Poster("crc-url", null), dao = null, hiveCredentials = hiveCredentials, conceptTranslator = null, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = false, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) val resp: ErrorResponse = adapter.parseShrineErrorResponseWithFallback(altI2b2ErrorXml).asInstanceOf[ErrorResponse] resp should not be (null) resp.errorMessage should be("Query result instance id 3126 not found") } @Test def testParseErrorXml { val xml = { 1.1 2.4 edu.harvard.i2b2.crc 1.4 i2b2 Hive i2b2web 1.4 i2b2 Hive 1 Demo Log information Message error connecting Project Management cell admin 0 0 CRC_QRY_runQueryInstance_fromQueryDefinition Age 0 1 0 0 1 2 Age \\i2b2\i2b2\Demographics\Age\ concept_dimension concept_path \i2b2\Demographics\Age\ T concept_cd false }.toString val adapter = RunQueryAdapter( poster = Poster("crc-url", null), dao = null, hiveCredentials = hiveCredentials, conceptTranslator = null, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) val resp = adapter.parseShrineErrorResponseWithFallback(xml).asInstanceOf[ErrorResponse] resp should not be (null) resp.errorMessage should not be ("") } - @Test - def testObfuscateBreakdowns { - val breakdown1 = I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map.empty) - val breakdown2 = I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("foo" -> 123, "bar" -> 345)) - val breakdown3 = I2b2ResultEnvelope(PATIENT_RACE_COUNT_XML, Map("x" -> 999, "y" -> 888)) - - val original = Map.empty ++ Seq(breakdown1, breakdown2, breakdown3).map(env => (env.resultType, env)) - - val obfuscated = RunQueryAdapter.obfuscateBreakdowns(original) - - original.keySet should equal(obfuscated.keySet) - - original.keySet.forall(resultType => original(resultType).data.keySet == obfuscated(resultType).data.keySet) should be(true) - - val localTerms = Set("local1a", "local1b") - - for { - (resultType, origBreakdown) <- original - - mappings = Map("network" -> localTerms) - - translator = new QueryDefinitionTranslator(new ExpressionTranslator(mappings)) - obfscBreakdown <- obfuscated.get(resultType) - key <- origBreakdown.data.keySet - } { - (origBreakdown eq obfscBreakdown) should be(false) - - ObfuscatorTest.within3(origBreakdown.data(key), obfscBreakdown.data(key)) should be(true) - } - } - @Test def testTranslateNetworkToLocalDoesntLeakCredentialsViaException: Unit = { val mappings = Map.empty[String, Set[String]] val translator = new QueryDefinitionTranslator(new ExpressionTranslator(mappings)) val adapter = RunQueryAdapter( poster = Poster("crc-url", MockHttpClient), dao = null, hiveCredentials = null, conceptTranslator = translator, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) val queryDefinition = QueryDefinition("foo", Term("blah")) val authn = AuthenticationInfo("d", "u", Credential("p", false)) val req = RunQueryRequest("projectId", Duration.Inf, authn, otherNetworkId, None, None, Set.empty, queryDef) try { adapter.translateNetworkToLocal(req) fail("Expected an AdapterMappingException") } catch { case e: AdapterMappingException => { e.getMessage.contains(authn.rawToString) should be(false) e.getMessage.contains(AuthenticationInfo.elided.toString) should be(true) } } } @Test def testTranslateQueryDefinitionXml { val localTerms = Set("local1a", "local1b") val mappings = Map("network" -> localTerms) val translator = new QueryDefinitionTranslator(new ExpressionTranslator(mappings)) val adapter = RunQueryAdapter( poster = Poster("crc-url", MockHttpClient), dao = null, hiveCredentials = null, conceptTranslator = translator, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) val queryDefinition = QueryDefinition("10-17 years old@14:39:20", OccuranceLimited(1, Term("network"))) val newDef = adapter.conceptTranslator.translate(queryDefinition) val expected = QueryDefinition("10-17 years old@14:39:20", Or(Term("local1a"), Term("local1b"))) newDef should equal(expected) } @Test def testQueuedRegularCountQuery: Unit = afterCreatingTables { val adapter = RunQueryAdapter( poster = Poster("crc-url", MockHttpClient), dao = dao, hiveCredentials = null, conceptTranslator = null, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = false, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", false)) import scala.concurrent.duration._ val req = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), Set(PATIENT_COUNT_XML), queryDef) val broadcastMessage = BroadcastMessage(queryId, networkAuthn, req) val resp = adapter.processRequest(broadcastMessage).asInstanceOf[RunQueryResponse] resp.groupId should equal(networkAuthn.domain) resp.createDate should not be (null) // :\ resp.queryId should equal(-1L) resp.queryInstanceId should equal(-1L) resp.requestXml should equal(queryDef) resp.userId should equal(networkAuthn.username) resp.singleNodeResult.breakdowns should equal(Map.empty) resp.singleNodeResult.description.isDefined should be(true) resp.singleNodeResult.elapsed should equal(Some(0L)) resp.singleNodeResult.endDate.isDefined should be(true) resp.singleNodeResult.startDate.isDefined should be(true) resp.singleNodeResult.instanceId should equal(-1L) resp.singleNodeResult.isError should be(false) resp.singleNodeResult.resultId should equal(-1L) resp.singleNodeResult.resultType should be(Some(PATIENT_COUNT_XML)) resp.singleNodeResult.setSize should equal(-1L) resp.singleNodeResult.statusMessage.isDefined should be(true) resp.singleNodeResult.statusType should be(QueryResult.StatusType.Held) resp.singleNodeResult.endDate.isDefined should be(true) val Some(storedQuery) = dao.findQueryByNetworkId(expectedNetworkQueryId) storedQuery.dateCreated should not be (null) // :\ storedQuery.domain should equal(networkAuthn.domain) storedQuery.isFlagged should equal(false) storedQuery.localId should equal(-1L.toString) storedQuery.name should equal(queryDef.name) storedQuery.networkId should equal(expectedNetworkQueryId) storedQuery.queryDefinition should equal(queryDef) storedQuery.username should equal(networkAuthn.username) } private def doTestRegularCountQuery(status: QueryResult.StatusType, count: Long) = afterCreatingTables { require(!status.isError) val countQueryResultToUse = countQueryResult.copy(statusType = status, setSize = count) val outputTypes = justCounts val resp = doQuery(outputTypes) { import RawCrcRunQueryResponse.toQueryResultMap RawCrcRunQueryResponse(queryId, now, userId, groupId, queryDef, instanceId, toQueryResultMap(Seq(countQueryResultToUse))).toI2b2String }.asInstanceOf[RunQueryResponse] doBasicRunQueryResponseTest(resp) val firstResult = resp.results.head resp.results should equal(Seq(firstResult)) val Some(savedQuery) = dao.findResultsFor(expectedNetworkQueryId) savedQuery.wasRun should equal(true) savedQuery.isFlagged should equal(false) savedQuery.networkQueryId should equal(expectedNetworkQueryId) savedQuery.breakdowns should equal(Nil) savedQuery.count.creationDate should not be (null) savedQuery.count.localId should equal(countQueryResultToUse.resultId) //savedQuery.count.resultId should equal(resultId) TODO: REVISIT savedQuery.count.statusType should equal(status) if (status.isDone && !status.isError) { savedQuery.count.data.get.startDate should not be (null) savedQuery.count.data.get.endDate should not be (null) savedQuery.count.data.get.originalValue should be(count) ObfuscatorTest.within3(savedQuery.count.data.get.obfuscatedValue, count) should be(true) } else { savedQuery.count.data should be(None) } } @Test def testRegularCountQuery = doTestRegularCountQuery(QueryResult.StatusType.Finished, countQueryResult.setSize) @Test def testRegularCountQueryComesBackProcessing = doTestRegularCountQuery(QueryResult.StatusType.Processing, -1L) @Test def testRegularCountQueryComesBackQueued = doTestRegularCountQuery(QueryResult.StatusType.Queued, -1L) @Test def testRegularCountQueryComesBackError = afterCreatingTables { val errorQueryResult = QueryResult.errorResult(Some("some-description"), "some-status-message",TestProblem()) val outputTypes = justCounts val resp = doQuery(outputTypes) { import RawCrcRunQueryResponse.toQueryResultMap RawCrcRunQueryResponse(queryId, now, userId, groupId, queryDef, instanceId, toQueryResultMap(Seq(errorQueryResult))).toI2b2String } doBasicRunQueryResponseTest(resp) //TODO: Why are status and description messages from CRC dropped when unmarshalling QueryResults? //resp.results should equal(Seq(errorQueryResult)) resp.asInstanceOf[RunQueryResponse].results.head.statusType should be(QueryResult.StatusType.Error) dao.findResultsFor(expectedNetworkQueryId) should be(None) val Some(savedQueryRow) = dao.findQueryByNetworkId(expectedNetworkQueryId) val Seq(queryResultRow: QueryResultRow) = { import SquerylEntryPoint._ implicit val breakdownTypes = DefaultBreakdownResultOutputTypes.toSet inTransaction { from(tables.queryResults) { row => where(row.queryId === savedQueryRow.id). select(row.toQueryResultRow) }.toSeq } } val Seq(errorRow: ShrineError) = { import SquerylEntryPoint._ inTransaction { from(tables.errorResults) { row => where(row.resultId === queryResultRow.id). select(row.toShrineError) }.toSeq } } errorRow should not be (null) //TODO: ErrorMessage //errorRow.message should equal(errorQueryResult.statusMessage) } private def doTestBreakdownsAreObfuscated(result: QueryResult): Unit = { result.breakdowns.values.map(_.data).foreach { actualBreakdowns => actualBreakdowns.keySet should equal(dummyBreakdownData.keySet) for { breakdownName <- actualBreakdowns.keySet } { within3(actualBreakdowns(breakdownName), dummyBreakdownData(breakdownName)) should be(true) } } } @Test def testGetBreakdownsWithRegularCountQuery { val breakdowns = DefaultBreakdownResultOutputTypes.values.map(breakdownFor) val resp = doTestGetBreakdowns(breakdowns) val firstResult = resp.results.head firstResult.resultType should equal(Some(PATIENT_COUNT_XML)) firstResult.setSize should equal(setSize) firstResult.description should equal(None) firstResult.breakdowns.keySet should equal(DefaultBreakdownResultOutputTypes.toSet) //NB: Verify that breakdowns are obfuscated doTestBreakdownsAreObfuscated(firstResult) resp.results.size should equal(1) } @Test def testGetBreakdownsSomeFailures { val resultTypesExpectedToSucceed = Seq(PATIENT_AGE_COUNT_XML, PATIENT_GENDER_COUNT_XML) val breakdowns = resultTypesExpectedToSucceed.map(breakdownFor) val resp = doTestGetBreakdowns(breakdowns) val firstResult = resp.results.head firstResult.resultType should equal(Some(PATIENT_COUNT_XML)) firstResult.setSize should equal(setSize) firstResult.description should equal(None) firstResult.breakdowns.keySet should equal(resultTypesExpectedToSucceed.toSet) //NB: Verify that breakdowns are obfuscated doTestBreakdownsAreObfuscated(firstResult) resp.results.size should equal(1) } @Test def testErrorResponsesArePassedThrough: Unit = { val errorResponse = ErrorResponse(TestProblem(summary = "blarg!")) val resp = doQuery(Set(PATIENT_COUNT_XML)) { errorResponse.toI2b2String } resp should equal(errorResponse) } private def breakdownFor(resultType: ResultOutputType) = I2b2ResultEnvelope(resultType, dummyBreakdownData) private def doTestGetBreakdowns(successfulBreakdowns: Seq[I2b2ResultEnvelope]): RunQueryResponse = { val outputTypes = justCounts ++ DefaultBreakdownResultOutputTypes.toSet val resp = doQueryThatReturnsSpecifiedBreakdowns(outputTypes, successfulBreakdowns) doBasicRunQueryResponseTest(resp) resp } private def doBasicRunQueryResponseTest(r: BaseShrineResponse) { val resp = r.asInstanceOf[RunQueryResponse] resp.createDate should equal(now) resp.groupId should equal(groupId) resp.queryId should equal(queryId) resp.queryInstanceId should equal(instanceId) resp.queryName should equal(queryDef.name) resp.requestXml should equal(queryDef) } private def doQueryThatReturnsSpecifiedBreakdowns(outputTypes: Set[ResultOutputType], successfulBreakdowns: Seq[I2b2ResultEnvelope]): RunQueryResponse = afterCreatingTablesReturn { val breakdownQueryResults = DefaultBreakdownResultOutputTypes.values.zipWithIndex.map { case (rt, i) => countQueryResult.withId(resultId + i + 1).withResultType(rt) } //Need this rigamarole to ensure that resultIds line up such that the type of breakdown the adapter asks for //(PATIENT_AGE_COUNT_XML, etc) is what the mock HttpClient actually returns. Here, we build up maps of QueryResults //and I2b2ResultEnvelopes, keyed on resultIds generated in the previous expression, to use to look up values to use //to build ReadResultResponses val successfulBreakdownsByType = successfulBreakdowns.map(e => e.resultType -> e).toMap val successfulBreakdownTypes = successfulBreakdownsByType.keySet val breakdownQueryResultsByResultId = breakdownQueryResults.collect { case qr if successfulBreakdownTypes(qr.resultType.get) => qr.resultId -> qr }.toMap val breakdownsToBeReturnedByResultId = breakdownQueryResultsByResultId.map { case (resultId, queryResult) => (resultId, successfulBreakdownsByType(queryResult.resultType.get)) } val expectedLocalTerm = Term("bar") val httpClient = new HttpClient { override def post(input: String, url: String): HttpResponse = { val resp = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match { case Success(req: RunQueryRequest) => { //NB: Terms should be translated req.queryDefinition.expr.get should equal(expectedLocalTerm) //Credentials should be "translated" req.authn.username should equal(hiveCredentials.username) req.authn.domain should equal(hiveCredentials.domain) //I2b2 Project ID should be translated req.projectId should equal(hiveCredentials.projectId) val queryResultMap = RawCrcRunQueryResponse.toQueryResultMap(countQueryResult +: breakdownQueryResults) RawCrcRunQueryResponse(queryId, now, "userId", "groupId", queryDef, instanceId, queryResultMap) } //NB: return a ReadResultResponse with new breakdown data each time, but will throw if the asked-for breakdown //is not one of the ones passed to the enclosing method, simulating an error calling the CRC case Success(req: ReadResultRequest) => { val resultId = req.localResultId.toLong ReadResultResponse(xmlResultId, breakdownQueryResultsByResultId(resultId), breakdownsToBeReturnedByResultId(resultId)) } case _ => ??? //fail loudly } HttpResponse.ok(resp.toI2b2String) } } val result = doQuery(outputTypes, dao, httpClient) validateDb(successfulBreakdowns, breakdownQueryResultsByResultId) result.asInstanceOf[RunQueryResponse] } private def validateDb(breakdownsReturned: Seq[I2b2ResultEnvelope], breakdownQueryResultsByResultId: Map[Long, QueryResult]) { val expectedNetworkTerm = Term("foo") //We should have one row in the shrine_query table, for the query just performed val queryRow = first(queryRows) { queryRow.dateCreated should not be (null) queryRow.domain should equal(authn.domain) queryRow.name should equal(queryDef.name) queryRow.localId should equal(expectedLocalMasterId) queryRow.networkId should equal(expectedNetworkQueryId) queryRow.username should equal(authn.username) queryRow.queryDefinition.expr.get should equal(expectedNetworkTerm) } list(queryRows).size should equal(1) //We should have one row in the count_result table, with the right obfuscated value, which is within the expected amount from the original count val countRow = first(countResultRows) { countRow.creationDate should not be (null) countRow.originalValue should equal(countQueryResult.setSize) within3(countRow.obfuscatedValue, countQueryResult.setSize) should be(true) within3(countRow.obfuscatedValue, countRow.originalValue) should be(true) } list(countResultRows).size should equal(1) //We should have 5 rows in the query_result table, one for the count result and one for each of the 4 requested breakdown types val queryResults = list(queryResultRows) { val countQueryResultRow = queryResults.find(_.resultType == PATIENT_COUNT_XML).get countQueryResultRow.localId should equal(countQueryResult.resultId) countQueryResultRow.queryId should equal(queryRow.id) val resultIdsByResultType = breakdownQueryResultsByResultId.map { case (resultId, queryResult) => queryResult.resultType.get -> resultId }.toMap for (breakdownType <- DefaultBreakdownResultOutputTypes.values) { val breakdownQueryResultRow = queryResults.find(_.resultType == breakdownType).get breakdownQueryResultRow.queryId should equal(queryRow.id) //We'll have a result id if this breakdown type didn't fail if (resultIdsByResultType.contains(breakdownQueryResultRow.resultType)) { breakdownQueryResultRow.localId should equal(resultIdsByResultType(breakdownQueryResultRow.resultType)) } } } queryResults.size should equal(5) val returnedBreakdownTypes = breakdownsReturned.map(_.resultType).toSet val notReturnedBreakdownTypes = DefaultBreakdownResultOutputTypes.toSet -- returnedBreakdownTypes val errorResults = list(errorResultRows) //We should have a row in the error_result table for each breakdown that COULD NOT be retrieved { for { queryResult <- queryResults if notReturnedBreakdownTypes.contains(queryResult.resultType) resultType = queryResult.resultType resultId = queryResult.id } { errorResults.find(_.resultId == resultId).isDefined should be(true) } } errorResults.size should equal(notReturnedBreakdownTypes.size) //We should have properly-obfuscated rows in the breakdown_result table for each of the breakdown types that COULD be retrieved val breakdownResults = list(breakdownResultRows) val bdrs = breakdownResults.toIndexedSeq { for { queryResult <- queryResults if returnedBreakdownTypes.contains(queryResult.resultType) resultType = queryResult.resultType resultId = queryResult.id } { //Find all the rows for a particular breakdown type val rowsWithType = breakdownResults.filter(_.resultId == resultId) //Combining the rows should give the expected dummy data rowsWithType.map(row => row.dataKey -> row.originalValue).toMap should equal(dummyBreakdownData) for (breakdownRow <- rowsWithType) { within3(breakdownRow.obfuscatedValue, dummyBreakdownData(breakdownRow.dataKey)) should be(true) } } } } private def doQuery(outputTypes: Set[ResultOutputType])(i2b2XmlToReturn: => String): BaseShrineResponse = { doQuery(outputTypes, dao, MockHttpClient(i2b2XmlToReturn)) } private def doQuery(outputTypes: Set[ResultOutputType], adapterDao: AdapterDao, httpClient: HttpClient): BaseShrineResponse = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar")))) //NB: Don't obfuscate, for simpler testing val adapter = RunQueryAdapter( poster = Poster("crc-url", httpClient), dao = adapterDao, hiveCredentials = hiveCredentials, conceptTranslator = translator, adapterLockoutAttemptsThreshold = adapterLockoutThreshold, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(1,1.3,3) ) import scala.concurrent.duration._ val req = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), outputTypes, queryDef) val networkAuthn = AuthenticationInfo("some-domain", "username", Credential("sadasdasdasd", false)) val broadcastMessage = BroadcastMessage(queryId, networkAuthn, req) adapter.processRequest(broadcastMessage) } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala index 24889b1b9..b693cc480 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala @@ -1,189 +1,190 @@ package net.shrine.adapter.service import org.junit.Test -import net.shrine.adapter.HasI2b2AdminDao -import net.shrine.protocol.{HiveCredentials, ReadI2b2AdminPreviousQueriesRequest, ReadI2b2AdminQueryingUsersRequest, ReadI2b2AdminQueryingUsersResponse, I2b2AdminUserWithRole, ErrorResponse, RunHeldQueryRequest, RunQueryResponse, RunQueryRequest, ResultOutputType, QueryResult, BroadcastMessage, AuthenticationInfo, Credential, DefaultBreakdownResultOutputTypes} +import net.shrine.adapter.{HasI2b2AdminDao, Obfuscator, RunQueryAdapter} +import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2AdminUserWithRole, QueryResult, ReadI2b2AdminPreviousQueriesRequest, ReadI2b2AdminQueryingUsersRequest, ReadI2b2AdminQueryingUsersResponse, ResultOutputType, RunHeldQueryRequest, RunQueryRequest, RunQueryResponse} import net.shrine.client.Poster -import net.shrine.adapter.RunQueryAdapter import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.protocol.query.Term + import scala.util.Success import net.shrine.util.XmlDateHelper import net.shrine.protocol.query.QueryDefinition /** * @author clint * @since Apr 12, 2013 * * NB: Ideally we would extend JerseyTest here, but since we have to extend AbstractDependencyInjectionSpringContextTests, * we get into a diamond-problem when extending JerseyTest as well, even when both of them are extended by shim traits. * * We work around this issue by mising in JerseyTestCOmponent, which brings in a JerseyTest by composition, and ensures * that it is set up and torn down properly. */ final class I2b2AdminResourceEndToEndJaxrsTest extends AbstractI2b2AdminResourceJaxrsTest with HasI2b2AdminDao { private[this] val dummyUrl = "http://example.com" private[this] val dummyText = "This is dummy text" private[this] val dummyMasterId = 873456L private[this] val dummyInstanceId = 99L private[this] val dummyResultId = 42L private[this] val dummySetSize = 12345L private[this] val networkAuthn = AuthenticationInfo("network-domain", "network-username", Credential("network-password", false)) private lazy val runQueryAdapter: RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("n1" -> Set("l1")))) val poster = new Poster(dummyUrl, new HttpClient { override def post(input: String, url: String): HttpResponse = { RunQueryRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match { case Success(req) => { val queryResult = QueryResult(dummyResultId, dummyInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), dummySetSize, Some(XmlDateHelper.now), Some(XmlDateHelper.now), Some("desc"), QueryResult.StatusType.Finished, Some("status")) val resp = RunQueryResponse(dummyMasterId, XmlDateHelper.now, networkAuthn.username, networkAuthn.domain, req.queryDefinition, 123L, queryResult) HttpResponse.ok(resp.toI2b2String) } case _ => ??? } } }) RunQueryAdapter( poster = poster, dao = dao, hiveCredentials = HiveCredentials("d", "u", "pwd", "pid"), conceptTranslator = translator, adapterLockoutAttemptsThreshold = 1000, doObfuscation = false, runQueriesImmediately = true, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) } override def makeHandler = new I2b2AdminService(dao, i2b2AdminDao, Poster(dummyUrl, AlwaysAuthenticatesMockPmHttpClient), runQueryAdapter) @Test def testReadQueryDefinition = afterLoadingTestData { doTestReadQueryDefinition(networkQueryId1, Some((queryName1, queryDef1))) } @Test def testReadQueryDefinitionUnknownQueryId = afterLoadingTestData { doTestReadQueryDefinition(87134682364L, None) } import ReadI2b2AdminPreviousQueriesRequest.{Username, Category, SortOrder} import Username._ @Test def testReadI2b2AdminPreviousQueries = afterLoadingTestData { val searchString = queryName1 val maxResults = 123 val sortOrder = ReadI2b2AdminPreviousQueriesRequest.SortOrder.Ascending val categoryToSearchWithin = ReadI2b2AdminPreviousQueriesRequest.Category.All val searchStrategy = ReadI2b2AdminPreviousQueriesRequest.Strategy.Exact val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, searchString, maxResults, None, sortOrder, searchStrategy, categoryToSearchWithin) doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster1)) } @Test def testReadI2b2AdminPreviousQueriesNoResultsExpected = afterLoadingTestData { //A request that won't return anything val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, "askjdhakfgkafgkasf", 123, None) doTestReadI2b2AdminPreviousQueries(request, Nil) } @Test def testReadI2b2AdminPreviousQueriesExcludeUser: Unit = afterLoadingTestData { val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, Except(authn2.username), "", 10, None) doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster2, queryMaster1)) } @Test def testReadI2b2AdminPreviousQueriesOnlyFlagged: Unit = afterLoadingTestData { val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, "", 10, None, categoryToSearchWithin = Category.Flagged) doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster4, queryMaster1)) } @Test def testReadPreviousQueriesOnlyFlaggedExcludingUser: Unit = afterLoadingTestData { val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, Except(authn.username), "", 10, None, categoryToSearchWithin = Category.Flagged) doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster4)) } @Test def testReadPreviousQueriesExcludingUserWithSearchString: Unit = afterLoadingTestData { val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, queryName1, 10, None, categoryToSearchWithin = Category.Flagged) doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster1)) } @Test def testReadI2b2QueryingUsers = afterLoadingTestData { val request = ReadI2b2AdminQueryingUsersRequest(projectId, waitTime, authn, "foo") val ReadI2b2AdminQueryingUsersResponse(users) = adminClient.readI2b2AdminQueryingUsers(request) users.toSet should equal(Set(I2b2AdminUserWithRole(shrineProjectId, authn.username, "USER"), I2b2AdminUserWithRole(shrineProjectId, authn2.username, "USER"))) } @Test def testReadI2b2QueryingUsersNoResultsExpected = afterCreatingTables { val request = ReadI2b2AdminQueryingUsersRequest(projectId, waitTime, authn, "foo") val ReadI2b2AdminQueryingUsersResponse(users) = adminClient.readI2b2AdminQueryingUsers(request) //DB is empty, so no users will be returned users should equal(Nil) } @Test def testRunHeldQueryUnknownQuery = afterCreatingTables { val request = RunHeldQueryRequest(projectId, waitTime, authn, 12345L) val resp = adminClient.runHeldQuery(request) resp.isInstanceOf[ErrorResponse] should be(true) } @Test def testRunHeldQueryKnownQuery = afterCreatingTables { val networkQueryId = 12345L val request = RunHeldQueryRequest(projectId, waitTime, authn, networkQueryId) val queryName = "aslkdjasljkd" val queryExpr = Term("n1") val runQueryReq = RunQueryRequest(projectId, waitTime, authn, networkQueryId, None, None, Set(ResultOutputType.PATIENT_COUNT_XML), QueryDefinition(queryName, queryExpr)) runQueryAdapter.copy(runQueriesImmediately = false).processRequest(BroadcastMessage(networkAuthn, runQueryReq)) val resp = adminClient.runHeldQuery(request) val runQueryResp = resp.asInstanceOf[RunQueryResponse] runQueryResp.createDate should not be(null) runQueryResp.groupId should be(networkAuthn.domain) runQueryResp.userId should equal(networkAuthn.username) runQueryResp.queryId should equal(dummyMasterId) runQueryResp.singleNodeResult.setSize should equal(dummySetSize) runQueryResp.singleNodeResult.resultType should equal(Some(ResultOutputType.PATIENT_COUNT_XML)) //TODO runQueryResp.requestXml.name should equal(queryName) runQueryResp.requestXml.expr.get should equal(Term("l1")) } } diff --git a/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala b/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala index a4374634d..7f7658625 100644 --- a/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala +++ b/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala @@ -1,339 +1,336 @@ package net.shrine.integration import java.net.URL import net.shrine.log.Loggable import scala.concurrent.Future import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit -import net.shrine.adapter.AdapterMap -import net.shrine.adapter.DeleteQueryAdapter +import net.shrine.adapter.{AdapterMap, DeleteQueryAdapter, FlagQueryAdapter, Obfuscator, ReadQueryResultAdapter, RunQueryAdapter, UnFlagQueryAdapter} import net.shrine.adapter.client.AdapterClient import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.adapter.service.AdapterRequestHandler import net.shrine.adapter.service.AdapterService import net.shrine.broadcaster.AdapterClientBroadcaster import net.shrine.broadcaster.NodeHandle import net.shrine.crypto.DefaultSignerVerifier import net.shrine.crypto.TestKeystore -import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, Credential, DeleteQueryRequest, DeleteQueryResponse, NodeId, Result, RunQueryRequest, CertId, RequestType, FlagQueryRequest, FlagQueryResponse, RawCrcRunQueryResponse, ResultOutputType, QueryResult, RunQueryResponse, AggregatedRunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse, DefaultBreakdownResultOutputTypes} +import net.shrine.protocol.{AggregatedRunQueryResponse, AuthenticationInfo, BroadcastMessage, CertId, Credential, DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, HiveCredentials, NodeId, QueryResult, RawCrcRunQueryResponse, RequestType, Result, ResultOutputType, RunQueryRequest, RunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse} import net.shrine.qep.QepService import net.shrine.broadcaster.SigningBroadcastAndAggregationService import net.shrine.broadcaster.InJvmBroadcasterClient -import net.shrine.adapter.FlagQueryAdapter import net.shrine.protocol.query.Term -import net.shrine.adapter.RunQueryAdapter import net.shrine.client.Poster import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import net.shrine.util.XmlDateHelper -import net.shrine.adapter.ReadQueryResultAdapter import net.shrine.protocol.query.QueryDefinition -import net.shrine.adapter.UnFlagQueryAdapter import net.shrine.crypto.SigningCertStrategy /** * @author clint * @since Nov 27, 2013 * * An in-JVM simulation of a Shrine network with one hub and 4 downstream adapters. * * The hub and adapters are wired up with mock AdapterClients that do in-JVM communication via method calls * instead of remotely. * * The adapters are configured to respond with valid results for DeleteQueryRequests * only. Other requests could be handled, but that would not provide benefit to offset the effort of wiring * up more and more-complex Adapters. * * The test network is queried, and the final result, as well as the state of each adapter, is inspected to * ensure that the right messages were sent between elements of the system. * */ final class NetworkSimulationTest extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { private val certCollection = TestKeystore.certCollection private lazy val myCertId: CertId = certCollection.myCertId.get private lazy val signerVerifier = new DefaultSignerVerifier(certCollection) private val domain = "test-domain" private val username = "test-username" private val password = "test-password" import NetworkSimulationTest._ import scala.concurrent.duration._ private def deleteQueryAdapter: DeleteQueryAdapter = new DeleteQueryAdapter(dao) private def flagQueryAdapter: FlagQueryAdapter = new FlagQueryAdapter(dao) private def unFlagQueryAdapter: UnFlagQueryAdapter = new UnFlagQueryAdapter(dao) private def mockPoster = Poster("http://example.com", new HttpClient { override def post(input: String, url: String): HttpResponse = ??? }) private val hiveCredentials = HiveCredentials("d", "u", "pwd", "pid") private def queuesQueriesRunQueryAdapter: RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("n1" -> Set("l1")))) RunQueryAdapter( poster = mockPoster, dao = dao, hiveCredentials = hiveCredentials, conceptTranslator = translator, adapterLockoutAttemptsThreshold = 10000, doObfuscation = false, runQueriesImmediately = false, breakdownTypes = DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false, - botCountTimeThresholds = Seq.empty + botCountTimeThresholds = Seq.empty, + obfuscator = Obfuscator(5,6.5,10) ) } //todo this looks unused private def immediatelyRunsQueriesRunQueryAdapter(setSize: Long): RunQueryAdapter = { val mockCrcPoster = Poster("http://example.com", new HttpClient { override def post(input: String, url: String): HttpResponse = { val req = RunQueryRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input).get val now = XmlDateHelper.now val queryResult = QueryResult(1L, 42L, Some(ResultOutputType.PATIENT_COUNT_XML), setSize, Some(now), Some(now), Some("desc"), QueryResult.StatusType.Finished, Some("status")) val mockCrcXml = RawCrcRunQueryResponse(req.networkQueryId, XmlDateHelper.now, req.authn.username, req.projectId, req.queryDefinition, 42L, Map(ResultOutputType.PATIENT_COUNT_XML -> Seq(queryResult))).toI2b2String HttpResponse.ok(mockCrcXml) } }) queuesQueriesRunQueryAdapter.copy(poster = mockCrcPoster, runQueriesImmediately = true) } private def readQueryResultAdapter(setSize: Long): ReadQueryResultAdapter = { new ReadQueryResultAdapter( mockPoster, hiveCredentials, dao, doObfuscation = false, DefaultBreakdownResultOutputTypes.toSet, - collectAdapterAudit = false + collectAdapterAudit = false, + obfuscator = Obfuscator(5,6.5,10) ) } private lazy val adaptersByNodeId: Seq[(NodeId, MockAdapterRequestHandler)] = { import NodeName._ import RequestType.{ MasterDeleteRequest => MasterDeleteRequestRT, FlagQueryRequest => FlagQueryRequestRT, QueryDefinitionRequest => RunQueryRT, GetQueryResult => ReadQueryResultRT, UnFlagQueryRequest => UnFlagQueryRequestRT } (for { (childName, setSize) <- Seq((A, 1L), (B, 2L), (C, 3L), (D, 4L)) } yield { val nodeId = NodeId(childName.name) val maxSignatureAge = 1.hour val adapterMap = AdapterMap(Map( MasterDeleteRequestRT -> deleteQueryAdapter, FlagQueryRequestRT -> flagQueryAdapter, UnFlagQueryRequestRT -> unFlagQueryAdapter, RunQueryRT -> queuesQueriesRunQueryAdapter, ReadQueryResultRT -> readQueryResultAdapter(setSize))) nodeId -> MockAdapterRequestHandler(new AdapterService(nodeId, signerVerifier, maxSignatureAge, adapterMap)) }) } private lazy val shrineService: QepService = { val destinations: Set[NodeHandle] = { (for { (nodeId, adapterRequestHandler) <- adaptersByNodeId } yield { NodeHandle(nodeId, MockAdapterClient(nodeId, adapterRequestHandler)) }).toSet } QepService( "example.com", MockAuditDao, MockAuthenticator, MockQueryAuthorizationService, true, SigningBroadcastAndAggregationService(InJvmBroadcasterClient(AdapterClientBroadcaster(destinations, MockHubDao)), signerVerifier, SigningCertStrategy.Attach), 1.hour, DefaultBreakdownResultOutputTypes.toSet, false) } @Test def testSimulatedNetwork = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val masterId = 12345L import scala.concurrent.duration._ val req = DeleteQueryRequest("some-project-id", 1.second, authn, masterId) val resp = shrineService.deleteQuery(req, true) for { (nodeId, mockAdapter) <- adaptersByNodeId } { mockAdapter.lastMessage.networkAuthn.domain should equal(authn.domain) mockAdapter.lastMessage.networkAuthn.username should equal(authn.username) mockAdapter.lastMessage.request should equal(req) mockAdapter.lastResult.response should equal(DeleteQueryResponse(masterId)) } resp should equal(DeleteQueryResponse(masterId)) } @Test def testQueueQuery = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val topicId = "askldjlkas" val topicName = "Topic Name" val queryName = "lsadj3028940" import scala.concurrent.duration._ val runQueryReq = RunQueryRequest("some-project-id", 1.second, authn, 12345L, Some(topicId), Some(topicName), Set(ResultOutputType.PATIENT_COUNT_XML), QueryDefinition(queryName, Term("n1"))) val aggregatedRunQueryResp = shrineService.runQuery(runQueryReq, true).asInstanceOf[AggregatedRunQueryResponse] var broadcastMessageId: Option[Long] = None //Broadcast the original run query request; all nodes should queue the query for { (nodeId, mockAdapter) <- adaptersByNodeId } { broadcastMessageId = Option(mockAdapter.lastMessage.requestId) mockAdapter.lastMessage.networkAuthn.domain should equal(authn.domain) mockAdapter.lastMessage.networkAuthn.username should equal(authn.username) val lastReq = mockAdapter.lastMessage.request.asInstanceOf[RunQueryRequest] lastReq.authn should equal(runQueryReq.authn) lastReq.requestType should equal(runQueryReq.requestType) lastReq.waitTime should equal(runQueryReq.waitTime) //todo what to do with this check? lastReq.networkQueryId should equal(mockAdapter.lastMessage.requestId) lastReq.outputTypes should equal(runQueryReq.outputTypes) lastReq.projectId should equal(runQueryReq.projectId) lastReq.queryDefinition should equal(runQueryReq.queryDefinition) lastReq.topicId should equal(runQueryReq.topicId) val runQueryResp = mockAdapter.lastResult.response.asInstanceOf[RunQueryResponse] runQueryResp.queryId should equal(-1L) runQueryResp.singleNodeResult.statusType should equal(QueryResult.StatusType.Held) runQueryResp.singleNodeResult.setSize should equal(-1L) } aggregatedRunQueryResp.queryId should equal(broadcastMessageId.get) aggregatedRunQueryResp.results.map(_.setSize) should equal(Seq(-1L, -1L, -1L, -1L, -4L)) } @Test def testFlagQuery = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val masterId = 12345L import scala.concurrent.duration._ val networkQueryId = 9999L val name = "some query" val expr = Term("foo") val fooQuery = QueryDefinition(name,expr) dao.insertQuery(masterId.toString, networkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(false) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(None) val req = FlagQueryRequest("some-project-id", 1.second, authn, networkQueryId, Some("foo")) val resp = shrineService.flagQuery(req, true) resp should equal(FlagQueryResponse) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(true) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(Some("foo")) } @Test def testUnFlagQuery = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val masterId = 12345L import scala.concurrent.duration._ val networkQueryId = 9999L val flagMsg = Some("foo") val name = "some query" val expr = Term("foo") val fooQuery = QueryDefinition(name,expr) dao.insertQuery(masterId.toString, networkQueryId, authn, fooQuery, isFlagged = true, hasBeenRun = true, flagMessage = flagMsg) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(true) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(flagMsg) val req = UnFlagQueryRequest("some-project-id", 1.second, authn, networkQueryId) val resp = shrineService.unFlagQuery(req, true) resp should equal(UnFlagQueryResponse) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(false) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(None) } } object NetworkSimulationTest { private final case class MockAdapterClient(nodeId: NodeId, adapter: AdapterRequestHandler) extends AdapterClient with Loggable { import scala.concurrent.ExecutionContext.Implicits.global override def query(message: BroadcastMessage): Future[Result] = Future.successful { debug(s"Invoking Adapter $nodeId with $message") val result = adapter.handleRequest(message) debug(s"Got result from $nodeId: $result") result } override def url: Option[URL] = ??? } private final case class MockAdapterRequestHandler(delegate: AdapterRequestHandler) extends AdapterRequestHandler { @volatile var lastMessage: BroadcastMessage = _ @volatile var lastResult: Result = _ override def handleRequest(request: BroadcastMessage): Result = { lastMessage = request val result = delegate.handleRequest(request) lastResult = result result } } }