diff --git a/adapter/adapter-service/src/main/resources/reference.conf b/adapter/adapter-service/src/main/resources/reference.conf
index 7a449beaa..1df54693e 100644
--- a/adapter/adapter-service/src/main/resources/reference.conf
+++ b/adapter/adapter-service/src/main/resources/reference.conf
@@ -1,12 +1,19 @@
shrine {
adapter {
+
+ obfucscation {
+ binSize = 5 //Round to the nearest binSize. Use 1 for no effect (to match SHRINE 1.21 and earlier).
+ sigma = 6.5 //Noise to inject. Use 0 for no effect. (Use 1.33 to match SHRINE 1.21 and earlier).
+ clamp = 10 //Maximum ammount of noise to inject. (Use 3 to match SHRINE 1.21 and earlier).
+ }
+
adapterLockoutAttemptsThreshold = 10 // Number of allowed queries with the same actual result that can exist before a researcher is locked out of the adapter. Set to '0' to never lock out. In 1.23 the default will change to 0. In 1.24 the lockout code and this config value will be removed
botDefense {
countsAndMilliseconds = [
{count = 10, milliseconds = 60000}, //allow up to 10 queries in one minute
{count = 200, milliseconds = 36000000} //allow up to 4 queries in 10 hours
]
}
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
index 6e948f3e2..19e6d678b 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
@@ -1,298 +1,297 @@
package net.shrine.adapter
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import net.shrine.adapter.audit.AdapterAuditDb
import scala.Option.option2Iterable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
-import net.shrine.adapter.Obfuscator.obfuscateResults
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.model.Breakdown
import net.shrine.adapter.dao.model.ShrineQueryResult
import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BroadcastMessage, ErrorResponse, HasQueryResults, HiveCredentials, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse}
import net.shrine.protocol.query.QueryDefinition
-import net.shrine.util.StackTrace
import net.shrine.util.Tries.sequence
import scala.concurrent.duration.Duration
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, ProblemSources}
/**
* @author clint
* @since Nov 2, 2012
*
*/
object AbstractReadQueryResultAdapter {
private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
}
//noinspection RedundantBlock
abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults](
poster: Poster,
override val hiveCredentials: HiveCredentials,
dao: AdapterDao,
doObfuscation: Boolean,
getQueryId: Req => Long,
getProjectId: Req => String,
toResponse: (Long, QueryResult) => Rsp,
breakdownTypes: Set[ResultOutputType],
- collectAdapterAudit:Boolean
+ collectAdapterAudit:Boolean,
+ obfuscator:Obfuscator
) extends WithHiveCredentialsAdapter(hiveCredentials) {
//TODO: Make this configurable
private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors)
//TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead?
private lazy val executorService = Executors.newFixedThreadPool(numThreads)
private lazy val executionContext = ExecutionContext.fromExecutorService(executorService)
override def shutdown() {
try {
executorService.shutdown()
executorService.awaitTermination(5, TimeUnit.SECONDS)
} finally {
executorService.shutdownNow()
super.shutdown()
}
}
import AbstractReadQueryResultAdapter._
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
val req = message.request.asInstanceOf[Req]
val queryId = getQueryId(req)
def findShrineQueryRow = dao.findQueryByNetworkId(queryId)
def findShrineQueryResults = dao.findResultsFor(queryId)
findShrineQueryRow match {
case None => {
debug(s"Query $queryId not found in the Shrine DB")
ErrorResponse(QueryNotFound(queryId))
}
case Some(shrineQueryRow) => {
findShrineQueryResults match {
case None => {
debug(s"Query $queryId found but its results are not available")
//TODO: When precisely can this happen? Should we go back to the CRC here?
ErrorResponse(QueryResultNotAvailable(queryId))
}
case Some(shrineQueryResult) => {
if (shrineQueryResult.isDone) {
debug(s"Query $queryId is done and already stored, returning stored results")
makeResponseFrom(queryId, shrineQueryResult)
} else {
debug(s"Query $queryId is incomplete, asking CRC for results")
val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message)
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result)
result
}
}
}
}
}
}
private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = {
shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(ErrorResponse(QueryNotFound(queryId)))
}
private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = {
//NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to
//retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in
//parallel is quite possibly too clever, but may be faster than asking for them serially.
//TODO: Review this.
//Make requests for results in parallel
val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult)
//Gather all the results (block until they're all returned)
val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime)
countResponseAttempt match {
//If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along
//with any retrieved breakdowns before returning it.
case Success(countResponse) => {
//NB: Only store the result if needed, that is, if all results are done
//TODO: REVIEW THIS
storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts))
countResponse
}
case Failure(e) => ErrorResponse(CouldNotRetrieveQueryFromCrc(queryId,e))
}
}
private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = {
def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString)
def process(localResultId: Long): ShrineResponse = {
delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId))
}
implicit val executionContext = this.executionContext
import scala.concurrent.blocking
def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f)))
val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt {
process(shrineQueryResult.count.localId)
}
val futureBreakdownAttempts = Future.sequence(for {
Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns
} yield futureBlockingAttempt {
process(localResultId)
})
//Log errors retrieving count
futureCountAttempt.collect {
case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting count result from the CRC: ", e)
}
//Log errors retrieving breakdown
for {
breakdownResponseAttempts <- futureBreakdownAttempts
} {
breakdownResponseAttempts.collect {
case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e)
}
}
//"Filter" for non-ErrorResponses
val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect {
case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast
case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'"))
}
//"Filter" for non-ErrorResponses
val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for {
breakdownResponseAttempts <- futureBreakdownAttempts
} yield {
breakdownResponseAttempts.collect {
case Success(resp: ReadResultResponse) => Try(resp)
}
}
for {
countResponseAttempt <- futureNonErrorCountAttempt
breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts
} yield {
RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts)
}
}
private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = {
val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime)
//Log any failures
(countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _))
//NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned
val responseAttempt = for {
countResponse: ReadResultResponse <- countResponseAttempt
countQueryResult = countResponse.metadata
breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts)
} yield {
val breakdownsByType = (for {
breakdownResponse <- breakdownResponses
resultType <- breakdownResponse.metadata.resultType
} yield resultType -> breakdownResponse.data).toMap
val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType)
- val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns
+ val queryResultToReturn = if(doObfuscation) obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns
toResponse(queryId, queryResultToReturn)
}
SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts)
}
private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = {
val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten
breakdownTypes -- successfulBreakdownTypes
}
private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val responseIsDone = response.results.forall(_.statusType.isDone)
if (responseIsDone) {
storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes)
}
}
private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val rawResults = response.results
- val obfuscatedResults = obfuscateResults(doObfuscation)(response.results)
+ val obfuscatedResults = obfuscator.obfuscateResults(doObfuscation)(response.results)
for {
shrineQuery <- dao.findQueryByNetworkId(queryId)
queryResult <- rawResults.headOption
obfuscatedQueryResult <- obfuscatedResults.headOption
} {
val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr)
dao.inTransaction {
dao.deleteQuery(queryId)
dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns)
}
}
}
private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R]
private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) {
def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns]
override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call
}
- private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _)
+ private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2)
}
case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Query not found"
override def description:String = s"No query with id $queryId found on ${stamp.host.getHostName}"
}
case class QueryResultNotAvailable(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Query $queryId found but its results are not available yet"
override def description:String = s"Query $queryId found but its results are not available yet on ${stamp.host.getHostName}"
}
case class CouldNotRetrieveQueryFromCrc(queryId:Long,x: Throwable) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Could not retrieve query $queryId from the CRC"
override def description:String = s"Unhandled exception while retrieving query $queryId while retrieving it from the CRC on ${stamp.host.getHostName}"
override def throwable = Some(x)
}
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala
index 8b9345ab1..c3a03d4a8 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterComponents.scala
@@ -1,146 +1,151 @@
package net.shrine.adapter
import scala.collection.JavaConverters._
import com.typesafe.config.Config
import net.shrine.adapter.dao.{AdapterDao, I2b2AdminDao}
import net.shrine.adapter.dao.squeryl.{SquerylAdapterDao, SquerylI2b2AdminDao}
import net.shrine.adapter.dao.squeryl.tables.Tables
import net.shrine.adapter.service.{AdapterService, I2b2AdminService}
import net.shrine.adapter.translators.{ExpressionTranslator, QueryDefinitionTranslator}
import net.shrine.client.{EndpointConfig, Poster}
import net.shrine.config.mappings.{AdapterMappings, AdapterMappingsSource, ClasspathFormatDetectingAdapterMappingsSource}
import net.shrine.crypto.{DefaultSignerVerifier, KeyStoreCertCollection}
import net.shrine.dao.squeryl.SquerylInitializer
import net.shrine.protocol.{HiveCredentials, NodeId, RequestType, ResultOutputType}
import net.shrine.config.{ConfigExtensions, DurationConfigParser}
import scala.concurrent.duration.Duration
/**
* All the parts required for an adapter.
*
* @author david
* @since 1.22
*/
case class AdapterComponents(
adapterService: AdapterService,
i2b2AdminService: I2b2AdminService,
adapterDao: AdapterDao,
adapterMappings: AdapterMappings)
object AdapterComponents {
//todo try and trim this argument list back
def apply(
adapterConfig:Config, //config is "shrine.adapter"
certCollection: KeyStoreCertCollection,
squerylInitializer: SquerylInitializer,
breakdownTypes: Set[ResultOutputType],
crcHiveCredentials: HiveCredentials,
signerVerifier: DefaultSignerVerifier,
pmPoster: Poster,
nodeId: NodeId
):AdapterComponents = {
val crcEndpoint: EndpointConfig = adapterConfig.getConfigured("crcEndpoint",EndpointConfig(_))
val crcPoster: Poster = Poster(certCollection,crcEndpoint)
val squerylAdapterTables: Tables = new Tables
val adapterDao: AdapterDao = new SquerylAdapterDao(squerylInitializer, squerylAdapterTables)(breakdownTypes)
//NB: Is i2b2HiveCredentials.projectId the right project id to use?
val i2b2AdminDao: I2b2AdminDao = new SquerylI2b2AdminDao(crcHiveCredentials.projectId, squerylInitializer, squerylAdapterTables)
val adapterMappingsFile = adapterConfig.getString("adapterMappingsFileName")
val adapterMappingsSource: AdapterMappingsSource = ClasspathFormatDetectingAdapterMappingsSource(adapterMappingsFile)
//NB: Fail fast
val adapterMappings: AdapterMappings = adapterMappingsSource.load(adapterMappingsFile).get
val expressionTranslator: ExpressionTranslator = ExpressionTranslator(adapterMappings)
val queryDefinitionTranslator: QueryDefinitionTranslator = new QueryDefinitionTranslator(expressionTranslator)
val doObfuscation = adapterConfig.getBoolean("setSizeObfuscation")
val collectAdapterAudit = adapterConfig.getBoolean("audit.collectAdapterAudit")
val botCountTimeThresholds: Seq[(Long, Duration)] = {
import scala.concurrent.duration._
val countsAndMilliseconds: Seq[Config] = adapterConfig.getConfig("botDefense").getConfigList("countsAndMilliseconds").asScala
countsAndMilliseconds.map(pairConfig => (pairConfig.getLong("count"),pairConfig.getLong("milliseconds").milliseconds))
}
+ val obfuscator:Obfuscator = adapterConfig.getConfigured("obfucscation",Obfuscator(_))
+
val runQueryAdapter = RunQueryAdapter(
poster = crcPoster,
dao = adapterDao,
hiveCredentials = crcHiveCredentials,
conceptTranslator = queryDefinitionTranslator,
adapterLockoutAttemptsThreshold = adapterConfig.getInt("adapterLockoutAttemptsThreshold"),
doObfuscation = doObfuscation,
runQueriesImmediately = adapterConfig.getOption("immediatelyRunIncomingQueries", _.getBoolean).getOrElse(true), //todo use reference.conf
breakdownTypes = breakdownTypes,
collectAdapterAudit = collectAdapterAudit,
- botCountTimeThresholds = botCountTimeThresholds
+ botCountTimeThresholds = botCountTimeThresholds,
+ obfuscator = obfuscator
)
val readInstanceResultsAdapter: Adapter = new ReadInstanceResultsAdapter(
- crcPoster,
- crcHiveCredentials,
- adapterDao,
- doObfuscation,
- breakdownTypes,
- collectAdapterAudit
+ poster = crcPoster,
+ hiveCredentials = crcHiveCredentials,
+ dao = adapterDao,
+ doObfuscation = doObfuscation,
+ breakdownTypes = breakdownTypes,
+ collectAdapterAudit = collectAdapterAudit,
+ obfuscator = obfuscator
)
val readQueryResultAdapter: Adapter = new ReadQueryResultAdapter(
crcPoster,
crcHiveCredentials,
adapterDao,
doObfuscation,
breakdownTypes,
- collectAdapterAudit
+ collectAdapterAudit,
+ obfuscator = obfuscator
)
val readPreviousQueriesAdapter: Adapter = new ReadPreviousQueriesAdapter(adapterDao)
val deleteQueryAdapter: Adapter = new DeleteQueryAdapter(adapterDao)
val renameQueryAdapter: Adapter = new RenameQueryAdapter(adapterDao)
val readQueryDefinitionAdapter: Adapter = new ReadQueryDefinitionAdapter(adapterDao)
val readTranslatedQueryDefinitionAdapter: Adapter = new ReadTranslatedQueryDefinitionAdapter(nodeId, queryDefinitionTranslator)
val flagQueryAdapter: Adapter = new FlagQueryAdapter(adapterDao)
val unFlagQueryAdapter: Adapter = new UnFlagQueryAdapter(adapterDao)
val adapterMap = AdapterMap(Map(
RequestType.QueryDefinitionRequest -> runQueryAdapter,
RequestType.GetRequestXml -> readQueryDefinitionAdapter,
RequestType.UserRequest -> readPreviousQueriesAdapter,
RequestType.InstanceRequest -> readInstanceResultsAdapter,
RequestType.MasterDeleteRequest -> deleteQueryAdapter,
RequestType.MasterRenameRequest -> renameQueryAdapter,
RequestType.GetQueryResult -> readQueryResultAdapter,
RequestType.ReadTranslatedQueryDefinitionRequest -> readTranslatedQueryDefinitionAdapter,
RequestType.FlagQueryRequest -> flagQueryAdapter,
RequestType.UnFlagQueryRequest -> unFlagQueryAdapter))
AdapterComponents(
adapterService = new AdapterService(
nodeId = nodeId,
signatureVerifier = signerVerifier,
maxSignatureAge = adapterConfig.getConfigured("maxSignatureAge", DurationConfigParser(_)),
adapterMap = adapterMap
),
i2b2AdminService = new I2b2AdminService(
dao = adapterDao,
i2b2AdminDao = i2b2AdminDao,
pmPoster = pmPoster,
runQueryAdapter = runQueryAdapter
),
adapterDao = adapterDao,
adapterMappings = adapterMappings)
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala
index 4176c1928..519cfc53a 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Obfuscator.scala
@@ -1,79 +1,66 @@
package net.shrine.adapter
+import com.typesafe.config.Config
import net.shrine.protocol.QueryResult
-import java.util.Random
+
+import scala.util.Random
/**
- * @author Bill Simons
- * @author clint
- * @date 4/21/11
- * @link http://cbmi.med.harvard.edu
+ * @author Ricardo De Lima
+ * @author Bill Simons
+ * @author clint
+ * @author dwalend
+ * @since August 18, 2009
+ * @see http://cbmi.med.harvard.edu
*/
-object Obfuscator {
+case class Obfuscator(binSize:Int,stdDev:Double,noiseClamp:Int) {
+ val random = new Random
+
+ def obfuscateResults(doObfuscation: Boolean)(results: Seq[QueryResult]): Seq[QueryResult] = {
+ if (doObfuscation) results.map(obfuscate) else results
+ }
+
def obfuscate(result: QueryResult): QueryResult = {
val withObfscSetSize = result.modifySetSize(obfuscate)
-
- if(withObfscSetSize.breakdowns.isEmpty) { withObfscSetSize }
+
+ if (withObfscSetSize.breakdowns.isEmpty) {
+ withObfscSetSize
+ }
else {
val obfuscatedBreakdowns = result.breakdowns.mapValues(_.mapValues(obfuscate))
-
+
withObfscSetSize.withBreakdowns(obfuscatedBreakdowns)
}
}
def obfuscate(l: Long): Long = {
- import GaussianObfuscator._
-
- val obfuscationAmount = determineObfuscationAmount(l)
-
- determineObfuscatedSetSize(l, obfuscationAmount)
- }
-
- def obfuscateResults(doObfuscation: Boolean)(results: Seq[QueryResult]): Seq[QueryResult] = {
- if(doObfuscation) results.map(obfuscate) else results
- }
-
- /**
- * @author clint
- * @author Ricardo De Lima
- * @date August 18, 2009
- *
- * Harvard Medical School Center for BioMedical Informatics
- *
- * @link http://cbmi.med.harvard.edu
- */
- object GaussianObfuscator {
- private val stdDev = 1.33
-
- private val mean = 0
-
- private val rand = new Random
-
- val range = 3
- private val lower = (-range).toDouble
-
- private val upper = range.toDouble
-
- def determineObfuscationAmount(x: Long): Int = scala.math.round(gaussian(mean, stdDev)).toInt
+ def roundToNearest(i: Double, n: Double): Long = {
+ Math.round(
+ if ((i % n) >= n / 2) i + n - (i % n) //round up
+ else i - i % n //round down
+ )
+ }
- def determineObfuscatedSetSize(setSize: Long, obfuscationAmount: Int): Long = {
- if (setSize <= 10) { -1L }
- else { setSize + obfuscationAmount }
+ def clampedGaussian(i: Long, clamp: Long): Double = {
+ val noise = random.nextGaussian() * stdDev
+ //clamp it
+ if (noise > clamp) clamp
+ else if (noise < -clamp) -clamp
+ else noise
}
- /**
- * Return a real number from a gaussian distribution with given mean and
- * stddev
- */
- def gaussian(mean: Double, stddev: Double): Double = {
- def limitRange(v: Double): Double = {
- val partiallyClamped = if (v < lower) lower else v
+ //bin
+ val binned = roundToNearest(l, binSize)
- if (partiallyClamped > upper) upper else partiallyClamped
- }
+ //add noise
+ val noised = binned + clampedGaussian(binned, noiseClamp)
- limitRange(mean + (stddev * rand.nextGaussian))
- }
+ //bin again
+ roundToNearest(noised, binSize)
}
+}
+
+object Obfuscator {
+ def apply(config:Config): Obfuscator = Obfuscator(config.getInt("binSize"),config.getDouble("sigma"),config.getInt("clamp"))
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala
index 56f5b443f..8b5c9ca30 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadInstanceResultsAdapter.scala
@@ -1,38 +1,37 @@
package net.shrine.adapter
-import xml.NodeSeq
import net.shrine.adapter.dao.AdapterDao
-import net.shrine.protocol.{HiveCredentials, ReadInstanceResultsResponse, ReadInstanceResultsRequest, BroadcastMessage, ShrineResponse, ErrorResponse, ResultOutputType}
-import net.shrine.serialization.XmlMarshaller
-import net.shrine.client.HttpClient
import net.shrine.client.Poster
+import net.shrine.protocol.{HiveCredentials, ReadInstanceResultsRequest, ReadInstanceResultsResponse, ResultOutputType}
/**
* @author Bill Simons
* @since 4/14/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
final class ReadInstanceResultsAdapter(
poster: Poster,
override val hiveCredentials: HiveCredentials,
dao: AdapterDao,
doObfuscation: Boolean,
breakdownTypes: Set[ResultOutputType],
- collectAdapterAudit:Boolean
+ collectAdapterAudit:Boolean,
+ obfuscator: Obfuscator
) extends AbstractReadQueryResultAdapter[ReadInstanceResultsRequest, ReadInstanceResultsResponse](
- poster,
- hiveCredentials,
- dao,
- doObfuscation,
- _.shrineNetworkQueryId,
- _.projectId,
- ReadInstanceResultsResponse(_, _),
- breakdownTypes,
- collectAdapterAudit
+ poster = poster,
+ hiveCredentials = hiveCredentials,
+ dao = dao,
+ doObfuscation = doObfuscation,
+ getQueryId = _.shrineNetworkQueryId,
+ getProjectId = _.projectId,
+ toResponse = ReadInstanceResultsResponse(_, _),
+ breakdownTypes = breakdownTypes,
+ collectAdapterAudit = collectAdapterAudit,
+ obfuscator = obfuscator
)
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala
index 275971731..533523239 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/ReadQueryResultAdapter.scala
@@ -1,33 +1,33 @@
package net.shrine.adapter
import net.shrine.adapter.dao.AdapterDao
-import net.shrine.protocol.{HiveCredentials, BroadcastMessage, ErrorResponse, ReadQueryResultRequest, ReadQueryResultResponse, ResultOutputType}
-import net.shrine.serialization.XmlMarshaller
-import net.shrine.client.HttpClient
import net.shrine.client.Poster
+import net.shrine.protocol.{HiveCredentials, ReadQueryResultRequest, ReadQueryResultResponse, ResultOutputType}
/**
* @author clint
* @since Nov 2, 2012
*
*/
final class ReadQueryResultAdapter(
poster: Poster,
override val hiveCredentials: HiveCredentials,
dao: AdapterDao,
doObfuscation: Boolean,
breakdownTypes: Set[ResultOutputType],
- collectAdapterAudit:Boolean
+ collectAdapterAudit:Boolean,
+ obfuscator: Obfuscator
) extends AbstractReadQueryResultAdapter[ReadQueryResultRequest, ReadQueryResultResponse](
- poster,
- hiveCredentials,
- dao,
- doObfuscation,
- _.queryId,
- _.projectId,
- ReadQueryResultResponse(_, _),
- breakdownTypes,
- collectAdapterAudit
+ poster = poster,
+ hiveCredentials = hiveCredentials,
+ dao = dao,
+ doObfuscation = doObfuscation,
+ getQueryId = _.queryId,
+ getProjectId = _.projectId,
+ toResponse = ReadQueryResultResponse(_, _),
+ breakdownTypes = breakdownTypes,
+ collectAdapterAudit = collectAdapterAudit,
+ obfuscator = obfuscator
)
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
index cc4c0561b..c3cbcea48 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
@@ -1,289 +1,284 @@
package net.shrine.adapter
import net.shrine.adapter.audit.AdapterAuditDb
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, ErrorFromCrcException, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, MissingCrCXmlResultException, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineResponse}
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources}
import scala.util.control.NonFatal
import net.shrine.util.XmlDateHelper
import scala.concurrent.duration.Duration
import scala.xml.XML
/**
* @author Bill Simons
* @author clint
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
final case class RunQueryAdapter(
poster: Poster,
dao: AdapterDao,
override val hiveCredentials: HiveCredentials,
conceptTranslator: QueryDefinitionTranslator,
adapterLockoutAttemptsThreshold: Int, //Set to 0 to disable lockout. todo remove in SHRINE 1.24
doObfuscation: Boolean,
runQueriesImmediately: Boolean,
breakdownTypes: Set[ResultOutputType],
collectAdapterAudit:Boolean,
- botCountTimeThresholds:Seq[(Long,Duration)]
+ botCountTimeThresholds:Seq[(Long,Duration)],
+ obfuscator: Obfuscator
) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) {
logStartup()
import RunQueryAdapter._
override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call
override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = {
try { request.mapQueryDefinition(conceptTranslator.translate) }
catch {
case NonFatal(e) => throw new AdapterMappingException(request,s"Error mapping query terms from network to local forms.", e)
}
}
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message)
if (isLockedOut(message.networkAuthn)) {
throw new AdapterLockoutException(message.networkAuthn,poster.url)
}
dao.checkIfBot(message.networkAuthn,botCountTimeThresholds)
val runQueryReq = message.request.asInstanceOf[RunQueryRequest]
//We need to use the network identity from the BroadcastMessage, since that will have the network username
//(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching
//of previous queries on deployed systems where the credentials in the identity param to this method and the authn
//field of the incoming request are different, like the HMS Shrine deployment.
//NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013
val authnToUse = message.networkAuthn.copy(credential = Credential("", isToken = false))
if (!runQueriesImmediately) {
debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}")
storeQuery(authnToUse, message, runQueryReq)
} else {
debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}")
val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse))
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(runQueryReq.networkQueryId,result)
result
}
}
private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = {
//Use dummy ids for what we would have received from the CRC
val masterId: Long = -1L
val queryInstanceId: Long = -1L
val resultId: Long = -1L
//TODO: is this right?? Or maybe it's project id?
val groupId = authnToUse.domain
val invalidSetSize = -1L
val now = XmlDateHelper.now
val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing"))
dao.inTransaction {
val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None)
val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult))
//NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in
//AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete
//queries, will work.
val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head
dao.insertCountResult(countQueryResultId, -1L, -1L)
}
RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult)
}
private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = {
if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request)
//NB: Pass through ErrorResponses received from the CRC.
//See: https://open.med.harvard.edu/jira/browse/SHRINE-794
val result = super.processRequest(message) match {
case e: ErrorResponse => e
case rawRunQueryResponse: RawCrcRunQueryResponse => processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse)
}
if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(request,result)
result
}
private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = {
def isBreakdown(result: QueryResult) = result.resultType.exists(_.isBreakdown)
val originalResults: Seq[QueryResult] = rawRunQueryResponse.results
val (originalBreakdownResults, originalNonBreakDownResults): (Seq[QueryResult],Seq[QueryResult]) = originalResults.partition(isBreakdown)
val originalBreakdownCountAttempts: Seq[(QueryResult, Try[QueryResult])] = attemptToRetrieveBreakdowns(request, originalBreakdownResults)
val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess }
val failedBreakdownCountAttemptsWithProblems = failedBreakdownCountAttempts.map { attempt =>
val originalResult: QueryResult = attempt._1
val queryResult:QueryResult = if (originalResult.problemDigest.isDefined) originalResult
else {
attempt._2 match {
case Success(_) => originalResult
case Failure(x) => //noinspection RedundantBlock
{
val problem:Problem = x match {
case e: ErrorFromCrcException => ErrorFromCrcBreakdown(e)
case e: MissingCrCXmlResultException => CannotInterpretCrcBreakdownXml(e)
case NonFatal(e) => {
val summary = s"Unexpected exception while interpreting breakdown response"
ProblemNotYetEncoded(summary, e)
}
}
LoggingProblemHandler.handleProblem(problem)
originalResult.copy(problemDigest = Some(problem.toDigest))
}
}
}
(queryResult,attempt._2)
}
logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttemptsWithProblems)
val originalMergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = {
val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns }
withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _)
}
- val obfuscatedQueryResults = originalResults.map(Obfuscator.obfuscate)
+ val obfuscatedQueryResults = originalResults.map(obfuscator.obfuscate)
val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown)
- val obfuscatedMergedBreakdowns = obfuscateBreakdowns(originalMergedBreakdowns)
+ val obfuscatedMergedBreakdowns = originalMergedBreakdowns.mapValues(_.mapValues(obfuscator.obfuscate))
val failedBreakdownTypes = failedBreakdownCountAttemptsWithProblems.flatMap { case (qr, _) => qr.resultType }
dao.storeResults(
authn = authnToUse,
masterId = rawRunQueryResponse.queryId.toString,
networkQueryId = request.networkQueryId,
queryDefinition = request.queryDefinition,
rawQueryResults = originalResults,
obfuscatedQueryResults = obfuscatedQueryResults,
failedBreakdownTypes = failedBreakdownTypes,
mergedBreakdowns = originalMergedBreakdowns,
obfuscatedBreakdowns = obfuscatedMergedBreakdowns)
// at this point the queryResult could be a mix of successes and failures.
// SHRINE reports only the successes. See SHRINE-1567 for details
val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults
val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns
//TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here?
//can failedBreakdownCountAttempts be mixed back in here?
val resultWithBreakdowns: QueryResult = queryResults.head.withBreakdowns(breakdownsToReturn)
if(debugEnabled) {
def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data)
val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}"
debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)")
debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)")
debug(s"Full QueryResult: $resultWithBreakdowns")
}
//if any results had problems, this commented out code can turn it into an error QueryResult
//See SHRINE-1619
//val problem: Option[ProblemDigest] = failedBreakdownCountAttemptsWithProblems.headOption.flatMap(x => x._1.problemDigest)
//val queryResult = problem.fold(resultWithBreakdowns)(pd => QueryResult.errorResult(Some(pd.description),"Error with CRC",pd))
rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns)
}
private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = {
def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString)
Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes))
}
private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = {
originalCountQueryResult -> (for {
countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId)
} yield originalCountQueryResult.withSetSize(countData.metadata.setSize))
}
private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = {
breakdownResults.map { origBreakdownResult =>
origBreakdownResult -> (for {
breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data)
} yield origBreakdownResult.withBreakdown(breakdownData))
}
}
private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse,
failures: Seq[(QueryResult, Try[QueryResult])]) {
for {
(origQueryResult, Failure(e)) <- failures
} {
error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e)
}
}
private def isLockedOut(authn: AuthenticationInfo): Boolean = {
adapterLockoutAttemptsThreshold match {
case 0 => false
case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold)
}
}
private def logStartup(): Unit = {
val message = {
if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" }
else { s"${getClass.getSimpleName} will queue queries for later execution" }
}
info(message)
}
}
-object RunQueryAdapter {
- private[adapter] def obfuscateBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Map[ResultOutputType, I2b2ResultEnvelope] = {
- breakdowns.mapValues(_.mapValues(Obfuscator.obfuscate))
- }
-}
-
case class ErrorFromCrcBreakdown(x:ErrorFromCrcException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "The CRC reported an error."
override val description = "The CRC reported an internal error."
}
case class CannotInterpretCrcBreakdownXml(x:MissingCrCXmlResultException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "SHRINE cannot interpret the CRC response."
override val description = "The CRC responded, but SHRINE could not interpret that response."
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
index 09fdfc874..4573e63c0 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AbstractQueryRetrievalTestCase.scala
@@ -1,379 +1,380 @@
package net.shrine.adapter
import scala.xml.NodeSeq
import net.shrine.util.ShouldMatchersForJUnit
import ObfuscatorTest.within3
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, CrcRequest, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineRequest, ShrineResponse}
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML
import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML
import net.shrine.protocol.query.{QueryDefinition, Term}
import net.shrine.util.XmlDateHelper
import net.shrine.util.XmlDateHelper.now
import net.shrine.util.XmlGcEnrichments
import net.shrine.client.Poster
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.adapter.translators.ExpressionTranslator
import net.shrine.problem.TestProblem
import scala.util.Success
/**
* @author clint
* @since Nov 8, 2012
*/
//noinspection UnitMethodIsParameterless
abstract class AbstractQueryRetrievalTestCase[R <: BaseShrineResponse](
makeAdapter: (AdapterDao, HttpClient) => WithHiveCredentialsAdapter,
makeRequest: (Long, AuthenticationInfo) => BaseShrineRequest,
extractor: R => Option[(Long, QueryResult)]) extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit {
private val authn = AuthenticationInfo("some-domain", "some-user", Credential("alskdjlkasd", false))
def doTestProcessRequestMissingQuery {
val adapter = makeAdapter(dao, MockHttpClient)
val response = adapter.processRequest(BroadcastMessage(0L, authn, makeRequest(-1L, authn)))
response.isInstanceOf[ErrorResponse] should be(true)
}
def doTestProcessInvalidRequest {
val adapter = makeAdapter(dao, MockHttpClient)
intercept[ClassCastException] {
//request must be a type of request we can handle
adapter.processRequest(BroadcastMessage(0L, authn, new AbstractQueryRetrievalTestCase.BogusRequest))
}
}
private val localMasterId = "alksjdkalsdjlasdjlkjsad"
private val shrineNetworkQueryId = 123L
private def doGetResults(adapter: Adapter) = adapter.processRequest(BroadcastMessage(shrineNetworkQueryId, authn, makeRequest(shrineNetworkQueryId, authn)))
private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis
private val instanceId = 999L
private val setSize = 12345L
private val obfSetSize = setSize + 1
private val queryExpr = Term("foo")
private val topicId = "laskdjlkasd"
private val fooQuery = QueryDefinition("some-query",queryExpr)
def doTestProcessRequestIncompleteQuery(countQueryShouldWork: Boolean = true): Unit = afterCreatingTables {
val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None)
import ResultOutputType._
import XmlDateHelper.now
val breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)))
val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1))
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResultId = 456L
val breakdownResultId = 98237943265436L
val incompleteCountResult = QueryResult(
resultId = countResultId,
instanceId = instanceId,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = Option(startDate),
endDate = Option(endDate),
description = Some("results from node X"),
statusType = QueryResult.StatusType.Processing,
statusMessage = None,
breakdowns = breakdowns)
val breakdownResult = breakdowns.head match {
case (resultType, data) => incompleteCountResult.withId(breakdownResultId).withBreakdowns(Map(resultType -> data)).withResultType(resultType)
}
val queryStartDate = now
val idsByResultType = dao.insertQueryResults(dbQueryId, incompleteCountResult :: breakdownResult :: Nil)
final class MightWorkMockHttpClient(expectedHiveCredentials: HiveCredentials) extends HttpClient {
override def post(input: String, url: String): HttpResponse = {
def makeFinished(queryResult: QueryResult) = queryResult.copy(statusType = QueryResult.StatusType.Finished)
def validateAuthnAndProjectId(req: ShrineRequest) {
req.authn should equal(expectedHiveCredentials.toAuthenticationInfo)
req.projectId should equal(expectedHiveCredentials.projectId)
}
val response = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match {
case Success(req: ReadResultRequest) if req.localResultId == countResultId.toString => {
validateAuthnAndProjectId(req)
if (countQueryShouldWork) {
ReadResultResponse(123L, makeFinished(incompleteCountResult), I2b2ResultEnvelope(PATIENT_COUNT_XML, Map(PATIENT_COUNT_XML.name -> incompleteCountResult.setSize)))
} else {
ErrorResponse(TestProblem(summary = "Retrieving count result failed"))
}
}
case Success(req: ReadResultRequest) if req.localResultId == breakdownResultId.toString => {
validateAuthnAndProjectId(req)
ReadResultResponse(123L, makeFinished(breakdownResult), breakdowns.head._2)
}
case _ => fail(s"Unknown input: $input")
}
HttpResponse.ok(response.toI2b2String)
}
}
val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, new MightWorkMockHttpClient(AbstractQueryRetrievalTestCase.hiveCredentials))
def getResults = doGetResults(adapter)
getResults.isInstanceOf[ErrorResponse] should be(true)
dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize)
dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns)
//The query shouldn't be 'done', since its status is PROCESSING
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Processing)
//Now, calling processRequest (via getResults) should cause the query to be re-retrieved from the CRC
val result = getResults.asInstanceOf[R]
//Which should cause the query to be re-stored with a 'done' status (since that's what our mock CRC returns)
val expectedStatusType = if (countQueryShouldWork) QueryResult.StatusType.Finished else QueryResult.StatusType.Processing
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(expectedStatusType)
if (!countQueryShouldWork) {
result.isInstanceOf[ErrorResponse] should be(true)
} else {
val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result)
actualNetworkQueryId should equal(shrineNetworkQueryId)
import ObfuscatorTest.within3
actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML))
within3(setSize, actualQueryResult.setSize) should be(true)
actualQueryResult.description should be(Some("results from node X"))
actualQueryResult.statusType should equal(QueryResult.StatusType.Finished)
actualQueryResult.statusMessage should be(Some(QueryResult.StatusType.Finished.name))
actualQueryResult.breakdowns.foreach {
case (rt, I2b2ResultEnvelope(_, data)) => {
data.forall { case (key, value) => within3(value, breakdowns.get(rt).get.data.get(key).get) }
}
}
for {
startDate <- actualQueryResult.startDate
endDate <- actualQueryResult.endDate
} {
val actualElapsed = toMillis(endDate) - toMillis(startDate)
actualElapsed should equal(elapsed)
}
}
}
def doTestProcessRequestQueuedQuery: Unit = afterCreatingTables {
import ResultOutputType._
import XmlDateHelper.now
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResultId = 456L
val incompleteCountResult = QueryResult(-1L, -1L, Some(PATIENT_COUNT_XML), -1L, Option(startDate), Option(endDate), Some("results from node X"), QueryResult.StatusType.Queued, None)
dao.inTransaction {
val insertedQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None)
//NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in
//AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete
//queries, will work.
val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(incompleteCountResult))
val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head
dao.insertCountResult(countQueryResultId, -1L, -1L)
}
val queryStartDate = now
object MockHttpClient extends HttpClient {
override def post(input: String, url: String): HttpResponse = ???
}
val adapter: WithHiveCredentialsAdapter = makeAdapter(dao, MockHttpClient)
def getResults = doGetResults(adapter)
getResults.isInstanceOf[ErrorResponse] should be(true)
//The query shouldn't be 'done', since its status is QUEUED
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued)
//Now, calling processRequest (via getResults) should NOT cause the query to be re-retrieved from the CRC, because the query was previously queued
val result = getResults
result.isInstanceOf[ErrorResponse] should be(true)
dao.findResultsFor(shrineNetworkQueryId).get.count.statusType should be(QueryResult.StatusType.Queued)
}
def doTestProcessRequest = afterCreatingTables {
val adapter = makeAdapter(dao, MockHttpClient)
def getResults = doGetResults(adapter)
getResults match {
case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryNotFound].getName)
case x => fail(s"Got $x, not an ErrorResponse")
}
val dbQueryId = dao.insertQuery(localMasterId, shrineNetworkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = false, flagMessage = None)
getResults match {
case errorResponse:ErrorResponse => errorResponse.problemDigest.codec should be (classOf[QueryResultNotAvailable].getName)
case x => fail(s"Got $x, not an ErrorResponse")
}
import ResultOutputType._
import XmlDateHelper.now
val breakdowns = Map(
PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("a" -> 1L, "b" -> 2L)),
PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("x" -> 3L, "y" -> 4L)))
val obfscBreakdowns = breakdowns.mapValues(_.mapValues(_ + 1))
val startDate = now
val elapsed = 100L
val endDate = {
import XmlGcEnrichments._
import scala.concurrent.duration._
startDate + elapsed.milliseconds
}
val countResult = QueryResult(
resultId = 456L,
instanceId = instanceId,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = Option(startDate),
endDate = Option(endDate),
description = Some("results from node X"),
statusType = QueryResult.StatusType.Finished,
statusMessage = None,
breakdowns = breakdowns
)
val breakdownResults = breakdowns.map {
case (resultType, data) =>
countResult.withBreakdowns(Map(resultType -> data)).withResultType(resultType)
}.toSeq
val queryStartDate = now
val idsByResultType = dao.insertQueryResults(dbQueryId, countResult +: breakdownResults)
getResults.isInstanceOf[ErrorResponse] should be(true)
dao.insertCountResult(idsByResultType(PATIENT_COUNT_XML).head, setSize, obfSetSize)
dao.insertBreakdownResults(idsByResultType, breakdowns, obfscBreakdowns)
val result = getResults.asInstanceOf[R]
val Some((actualNetworkQueryId, actualQueryResult)) = extractor(result)
actualNetworkQueryId should equal(shrineNetworkQueryId)
actualQueryResult.resultType should equal(Some(PATIENT_COUNT_XML))
actualQueryResult.setSize should equal(obfSetSize)
actualQueryResult.description should be(None) //TODO: This is probably wrong
actualQueryResult.statusType should equal(QueryResult.StatusType.Finished)
actualQueryResult.statusMessage should be(None)
actualQueryResult.breakdowns should equal(obfscBreakdowns)
for {
startDate <- actualQueryResult.startDate
endDate <- actualQueryResult.endDate
} {
val actualElapsed = toMillis(endDate) - toMillis(startDate)
actualElapsed should equal(elapsed)
}
}
}
object AbstractQueryRetrievalTestCase {
val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project")
val doObfuscation = true
def runQueryAdapter(dao: AdapterDao, poster: Poster): RunQueryAdapter = {
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar"))))
new RunQueryAdapter(
poster = poster,
dao = dao,
hiveCredentials = AbstractQueryRetrievalTestCase.hiveCredentials,
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = 10000,
doObfuscation = doObfuscation,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
}
import scala.concurrent.duration._
final class BogusRequest extends ShrineRequest("fooProject", 1.second, null) {
override val requestType = null
protected override def i2b2MessageBody: NodeSeq =
override def toXml =
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala
index ad8417d94..5bada7bfb 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ObfuscatorTest.scala
@@ -1,95 +1,97 @@
package net.shrine.adapter
import org.junit.Test
import net.shrine.protocol.QueryResult
import net.shrine.protocol.ResultOutputType
import net.shrine.protocol.I2b2ResultEnvelope
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
/**
* @author clint
* @since Oct 22, 2012
*/
object ObfuscatorTest {
private def within(range: Long)(a: Long, b: Long) = scala.math.abs(b - a) <= range
def within3 = within(3) _
def within1 = within(1) _
}
final class ObfuscatorTest extends ShouldMatchersForJUnit {
import ObfuscatorTest._
-
+
+ val obfuscator = Obfuscator(1,1.3,3)
+
@Test
def testObfuscateLong() {
val l = 12345L
- val obfuscated = Obfuscator.obfuscate(l)
+ val obfuscated = obfuscator.obfuscate(l)
within3(l, obfuscated) should be(right = true)
}
@Test
def testObfuscateQueryResult() {
import DefaultBreakdownResultOutputTypes._
import ResultOutputType._
val breakdowns = Map(
PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("x" -> 1, "y" -> 42)),
PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("a" -> 123, "b" -> 456)))
def queryResult(resultId: Long, setSize: Long) = QueryResult(
resultId = resultId,
instanceId = 123L,
resultType = Some(PATIENT_COUNT_XML),
setSize = setSize,
startDate = None,
endDate = None,
description = None,
statusType = QueryResult.StatusType.Finished,
statusMessage = None,
breakdowns = breakdowns
)
val resultId1 = 12345L
val setSize1 = 123L
//No breakdowns
{
val noBreakdowns = queryResult(resultId1, setSize1).copy(breakdowns = Map.empty)
noBreakdowns.setSize should equal(setSize1)
noBreakdowns.breakdowns should equal(Map.empty)
- val obfuscated = Obfuscator.obfuscate(noBreakdowns)
+ val obfuscated = obfuscator.obfuscate(noBreakdowns)
within3(noBreakdowns.setSize, obfuscated.setSize)
obfuscated.breakdowns should equal(Map.empty)
}
//breakdowns
{
- val QueryResult(_, _, _, obfscSetSize1, _, _, _, _, _, _, obfscBreakdowns) = Obfuscator.obfuscate(queryResult(resultId1, setSize1))
+ val QueryResult(_, _, _, obfscSetSize1, _, _, _, _, _, _, obfscBreakdowns) = obfuscator.obfuscate(queryResult(resultId1, setSize1))
within3(setSize1, obfscSetSize1) should be(right = true)
breakdowns.keySet should equal(obfscBreakdowns.keySet)
for {
(resultType, obfscEnv) <- obfscBreakdowns
env <- breakdowns.get(resultType)
} {
env.data.keySet should equal(obfscEnv.data.keySet)
for {
(key, value) <- env.data
obfscValue <- obfscEnv.data.get(key)
} {
within3(value, obfscValue) should be(right = true)
}
}
}
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala
index 1d5a2864b..d192e2342 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadInstanceResultsAdapterTest.scala
@@ -1,44 +1,45 @@
package net.shrine.adapter
import scala.concurrent.duration.DurationInt
import org.junit.Test
import net.shrine.client.Poster
import net.shrine.protocol.ReadInstanceResultsRequest
import net.shrine.protocol.ReadInstanceResultsResponse
import net.shrine.adapter.dao.AdapterDao
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
/**
* @author clint
* @since Nov 7, 2012
*/
final class ReadInstanceResultsAdapterTest extends
AbstractQueryRetrievalTestCase(
(dao, httpClient) => new ReadInstanceResultsAdapter(
Poster("", httpClient),
AbstractQueryRetrievalTestCase.hiveCredentials,
dao,
true,
DefaultBreakdownResultOutputTypes.toSet,
- collectAdapterAudit = false
+ collectAdapterAudit = false,
+ obfuscator = Obfuscator(1,1.3,3)
),
(queryId, authn) => ReadInstanceResultsRequest("some-project-id", 10.seconds, authn, queryId),
ReadInstanceResultsResponse.unapply) {
@Test
def testProcessInvalidRequest = doTestProcessInvalidRequest
@Test
def testProcessRequest = doTestProcessRequest
@Test
def testProcessRequestMissingQuery = doTestProcessRequestMissingQuery
@Test
def testProcessRequestIncompleteQuery = doTestProcessRequestIncompleteQuery(true)
@Test
def testProcessRequestIncompleteQueryCountResultRetrievalFails = doTestProcessRequestIncompleteQuery(false)
@Test
def testProcessRequestQueuedQuery = doTestProcessRequestQueuedQuery
}
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala
index d9542ae44..cfbafd58c 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/ReadQueryResultAdapterTest.scala
@@ -1,45 +1,46 @@
package net.shrine.adapter
import junit.framework.TestCase
import net.shrine.util.ShouldMatchersForJUnit
import net.shrine.protocol.ReadQueryResultRequest
import net.shrine.protocol.ReadQueryResultResponse
import org.junit.Test
import scala.concurrent.duration._
import net.shrine.client.Poster
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
/**
* @author clint
* @date Nov 7, 2012
*/
final class ReadQueryResultAdapterTest extends
AbstractQueryRetrievalTestCase(
(dao, httpClient) => new ReadQueryResultAdapter(
Poster("", httpClient),
AbstractQueryRetrievalTestCase.hiveCredentials,
dao,
true,
DefaultBreakdownResultOutputTypes.toSet,
- collectAdapterAudit = false
+ collectAdapterAudit = false,
+ obfuscator = Obfuscator(1,1.3,3)
),
(queryId, authn) => ReadQueryResultRequest("some-project-id", 10.seconds, authn, queryId),
ReadQueryResultResponse.unapply) {
@Test
def testProcessInvalidRequest = doTestProcessInvalidRequest
@Test
def testProcessRequest = doTestProcessRequest
@Test
def testProcessRequestMissingQuery = doTestProcessRequestMissingQuery
@Test
def testProcessRequestIncompleteQuery = doTestProcessRequestIncompleteQuery(true)
@Test
def testProcessRequestIncompleteQueryCountResultRetrievalFails = doTestProcessRequestIncompleteQuery(false)
@Test
def testProcessRequestQueuedQuery = doTestProcessRequestQueuedQuery
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala
index 9d574b9cf..07338becd 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/RunQueryAdapterTest.scala
@@ -1,974 +1,951 @@
package net.shrine.adapter
import scala.concurrent.duration.DurationInt
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
import ObfuscatorTest.within3
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.adapter.translators.ExpressionTranslator
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.client.Poster
import net.shrine.protocol.{AuthenticationInfo, BaseShrineResponse, BroadcastMessage, CrcRequest, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse}
import net.shrine.protocol.RawCrcRunQueryResponse.toQueryResultMap
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_AGE_COUNT_XML
import net.shrine.protocol.ResultOutputType.PATIENT_COUNT_XML
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML
import net.shrine.protocol.DefaultBreakdownResultOutputTypes.PATIENT_RACE_COUNT_XML
import net.shrine.protocol.query.OccuranceLimited
import net.shrine.protocol.query.Or
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.query.Term
import net.shrine.util.XmlDateHelper
import net.shrine.util.XmlUtil
import scala.util.Success
import net.shrine.dao.squeryl.SquerylEntryPoint
import scala.concurrent.duration.Duration
import net.shrine.adapter.dao.model.ShrineError
import net.shrine.adapter.dao.model.QueryResultRow
import net.shrine.problem.TestProblem
/**
* @author Bill Simons
* @author Clint Gilbert
* @since 4/19/11
* @see http://cbmi.med.harvard.edu
*/
final class RunQueryAdapterTest extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit {
private val queryDef = QueryDefinition("foo", Term("foo"))
private val broadcastMessageId = 1234563789L
private val queryId = 123L
private val expectedNetworkQueryId = 999L
private val expectedLocalMasterId = queryId.toString
private val masterId = 99L
private val instanceId = 456L
private val resultId = 42L
private val projectId = "projectId"
private val setSize = 17L
private val xmlResultId = 98765L
private val userId = "userId"
private val groupId = "groupId"
private val topicId = "some-topic-id-123-foo"
private val topicName = "Topic Name"
private val justCounts = Set(PATIENT_COUNT_XML)
private val now = XmlDateHelper.now
private val countQueryResult = QueryResult(resultId, instanceId, Some(PATIENT_COUNT_XML), setSize, Some(now), Some(now), None, QueryResult.StatusType.Finished, None)
private val dummyBreakdownData = Map("x" -> 99L, "y" -> 42L, "z" -> 3000L)
private val hiveCredentials = HiveCredentials("some-hive-domain", "hive-username", "hive-password", "hive-project")
private val authn = AuthenticationInfo("some-domain", "username", Credential("jksafhkjaf", false))
private val adapterLockoutThreshold = 99
private val altI2b2ErrorXml = XmlUtil.stripWhitespace {
1.1
2.4
edu.harvard.i2b2.crc
1.5
i2b2 Hive
i2b2_QueryTool
0.2
i2b2 Hive
1
i2b2
Log information
DONE
Query result instance id 3126 not found
}.toString
private val otherNetworkId: Long = 12345L
@Test
def testProcessRawCrcRunQueryResponseCountQueryOnly: Unit = afterCreatingTables{
val outputTypes = Set(PATIENT_COUNT_XML)
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("network" -> Set("local1a", "local1b"))))
val adapter = new RunQueryAdapter(
Poster("crc-url", null),
dao,
hiveCredentials,
translator,
adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(1,1.3,3)
)
val request = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), outputTypes, queryDef)
val networkAuthn = AuthenticationInfo("some-domain", "username", Credential("sadasdasdasd", false))
val broadcastMessage = BroadcastMessage(queryId, networkAuthn, request)
val rawRunQueryResponse = RawCrcRunQueryResponse(
queryId = queryId,
createDate = XmlDateHelper.now,
userId = request.authn.username,
groupId = request.authn.domain,
requestXml = request.queryDefinition,
queryInstanceId = otherNetworkId,
singleNodeResults = toQueryResultMap(Seq(countQueryResult)))
val resp = adapter.processRawCrcRunQueryResponse(networkAuthn, request, rawRunQueryResponse).asInstanceOf[RunQueryResponse]
resp should not be (null)
//Validate the response
resp.createDate should not be(null)
resp.groupId should be(request.authn.domain)
resp.userId should be(request.authn.username)
resp.queryId should be(queryId)
resp.queryInstanceId should be(otherNetworkId)
resp.requestXml should equal(request.queryDefinition)
(countQueryResult eq resp.singleNodeResult) should be(false)
within3(resp.singleNodeResult.setSize, countQueryResult.setSize) should be(true)
resp.singleNodeResult.resultType.get should equal(PATIENT_COUNT_XML)
resp.singleNodeResult.breakdowns should equal(Map.empty)
//validate the DB
val expectedNetworkTerm = queryDef.expr.get.asInstanceOf[Term]
//We should have one row in the shrine_query table, for the query just performed
val Seq(queryRow) = list(queryRows)
{
queryRow.dateCreated should not be (null)
queryRow.domain should equal(request.authn.domain)
queryRow.name should equal(queryDef.name)
queryRow.localId should equal(expectedLocalMasterId)
queryRow.networkId should equal(expectedNetworkQueryId)
queryRow.username should equal(authn.username)
queryRow.queryDefinition.expr.get should equal(expectedNetworkTerm)
}
//We should have one row in the count_result table, with the right obfuscated value, which is within the expected amount from the original count
val Seq(countRow) = list(countResultRows)
{
countRow.creationDate should not be (null)
countRow.originalValue should equal(countQueryResult.setSize)
within3(countRow.obfuscatedValue, countRow.originalValue) should be(true)
}
}
@Test
def testProcessRawCrcRunQueryResponseCountAndBreakdownQuery: Unit = afterCreatingTables {
val allBreakdownTypes = DefaultBreakdownResultOutputTypes.toSet
val breakdownTypes = Seq(PATIENT_GENDER_COUNT_XML)
val outputTypes = Set(PATIENT_COUNT_XML) ++ breakdownTypes
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("network" -> Set("local1a", "local1b"))))
val request = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), outputTypes, queryDef)
val networkAuthn = AuthenticationInfo("some-domain", "username", Credential("sadasdasdasd", false))
val broadcastMessage = BroadcastMessage(queryId, networkAuthn, request)
val breakdownQueryResults = breakdownTypes.zipWithIndex.map {
case (rt, i) =>
countQueryResult.withId(resultId + i + 1).withResultType(rt)
}
val singleNodeResults = toQueryResultMap(countQueryResult +: breakdownQueryResults)
val rawRunQueryResponse = RawCrcRunQueryResponse(
queryId = queryId,
createDate = XmlDateHelper.now,
userId = request.authn.username,
groupId = request.authn.domain,
requestXml = request.queryDefinition,
queryInstanceId = otherNetworkId,
singleNodeResults = singleNodeResults)
//Set up our mock CRC
val poster = Poster("crc-url", new HttpClient {
def post(input: String, url: String): HttpResponse = HttpResponse.ok {
(RunQueryRequest.fromI2b2String(allBreakdownTypes)(input) orElse ReadResultRequest.fromI2b2String(allBreakdownTypes)(input)).get match {
case runQueryReq: RunQueryRequest => rawRunQueryResponse.toI2b2String
case readResultReq: ReadResultRequest => ReadResultResponse(xmlResultId = 42L, metadata = breakdownQueryResults.head, data = I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, dummyBreakdownData)).toI2b2String
case _ => sys.error(s"Unknown request: '$input'") //Fail loudly
}
}
})
val adapter = RunQueryAdapter(
poster = poster,
dao = dao,
hiveCredentials = hiveCredentials,
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(1,1.3,3)
)
val resp = adapter.processRawCrcRunQueryResponse(networkAuthn, request, rawRunQueryResponse).asInstanceOf[RunQueryResponse]
resp should not be (null)
//Validate the response
resp.createDate should not be(null)
resp.groupId should be(request.authn.domain)
resp.userId should be(request.authn.username)
resp.queryId should be(queryId)
resp.queryInstanceId should be(otherNetworkId)
resp.requestXml should equal(request.queryDefinition)
(countQueryResult eq resp.singleNodeResult) should be(false)
within3(resp.singleNodeResult.setSize, countQueryResult.setSize) should be(true)
resp.singleNodeResult.resultType.get should equal(PATIENT_COUNT_XML)
resp.singleNodeResult.breakdowns.keySet should equal(Set(PATIENT_GENDER_COUNT_XML))
val breakdownEnvelope = resp.singleNodeResult.breakdowns.values.head
breakdownEnvelope.resultType should equal(PATIENT_GENDER_COUNT_XML)
breakdownEnvelope.data.keySet should equal(dummyBreakdownData.keySet)
//All breakdowns are obfuscated
for {
(key, value) <- breakdownEnvelope.data
} {
within3(value, dummyBreakdownData(key)) should be(true)
}
//validate the DB
val expectedNetworkTerm = queryDef.expr.get.asInstanceOf[Term]
//We should have one row in the shrine_query table, for the query just performed
val Seq(queryRow) = list(queryRows)
{
queryRow.dateCreated should not be (null)
queryRow.domain should equal(request.authn.domain)
queryRow.name should equal(queryDef.name)
queryRow.localId should equal(expectedLocalMasterId)
queryRow.networkId should equal(expectedNetworkQueryId)
queryRow.username should equal(authn.username)
queryRow.queryDefinition.expr.get should equal(expectedNetworkTerm)
}
//We should have one row in the count_result table, with the right obfuscated value, which is within the expected amount from the original count
val Seq(countRow) = list(countResultRows)
{
countRow.creationDate should not be (null)
countRow.originalValue should equal(countQueryResult.setSize)
within3(countRow.obfuscatedValue, countRow.originalValue) should be(true)
}
val breakdownRows @ Seq(xRow, yRow, zRow) = list(breakdownResultRows)
breakdownRows.map(_.dataKey).toSet should equal(dummyBreakdownData.keySet)
within3(xRow.obfuscatedValue, xRow.originalValue) should be(true)
xRow.originalValue should be(dummyBreakdownData(xRow.dataKey))
within3(yRow.obfuscatedValue, yRow.originalValue) should be(true)
yRow.originalValue should be(dummyBreakdownData(yRow.dataKey))
within3(zRow.obfuscatedValue, zRow.originalValue) should be(true)
zRow.originalValue should be(dummyBreakdownData(zRow.dataKey))
}
//NB: See https://open.med.harvard.edu/jira/browse/SHRINE-745
@Test
def testParseAltErrorXml {
val adapter = RunQueryAdapter(
poster = Poster("crc-url", null),
dao = null,
hiveCredentials = hiveCredentials,
conceptTranslator = null,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = false,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
val resp: ErrorResponse = adapter.parseShrineErrorResponseWithFallback(altI2b2ErrorXml).asInstanceOf[ErrorResponse]
resp should not be (null)
resp.errorMessage should be("Query result instance id 3126 not found")
}
@Test
def testParseErrorXml {
val xml = {
1.1
2.4
edu.harvard.i2b2.crc
1.4
i2b2 Hive
i2b2web
1.4
i2b2 Hive
1
Demo
Log information
Message error connecting Project Management cell
admin
0
0
CRC_QRY_runQueryInstance_fromQueryDefinition
Age
0
1
0
0
1
-
2
Age
\\i2b2\i2b2\Demographics\Age\
concept_dimension
concept_path
\i2b2\Demographics\Age\
T
concept_cd
false
}.toString
val adapter = RunQueryAdapter(
poster = Poster("crc-url", null),
dao = null,
hiveCredentials = hiveCredentials,
conceptTranslator = null,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
val resp = adapter.parseShrineErrorResponseWithFallback(xml).asInstanceOf[ErrorResponse]
resp should not be (null)
resp.errorMessage should not be ("")
}
- @Test
- def testObfuscateBreakdowns {
- val breakdown1 = I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map.empty)
- val breakdown2 = I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("foo" -> 123, "bar" -> 345))
- val breakdown3 = I2b2ResultEnvelope(PATIENT_RACE_COUNT_XML, Map("x" -> 999, "y" -> 888))
-
- val original = Map.empty ++ Seq(breakdown1, breakdown2, breakdown3).map(env => (env.resultType, env))
-
- val obfuscated = RunQueryAdapter.obfuscateBreakdowns(original)
-
- original.keySet should equal(obfuscated.keySet)
-
- original.keySet.forall(resultType => original(resultType).data.keySet == obfuscated(resultType).data.keySet) should be(true)
-
- val localTerms = Set("local1a", "local1b")
-
- for {
- (resultType, origBreakdown) <- original
-
- mappings = Map("network" -> localTerms)
-
- translator = new QueryDefinitionTranslator(new ExpressionTranslator(mappings))
- obfscBreakdown <- obfuscated.get(resultType)
- key <- origBreakdown.data.keySet
- } {
- (origBreakdown eq obfscBreakdown) should be(false)
-
- ObfuscatorTest.within3(origBreakdown.data(key), obfscBreakdown.data(key)) should be(true)
- }
- }
-
@Test
def testTranslateNetworkToLocalDoesntLeakCredentialsViaException: Unit = {
val mappings = Map.empty[String, Set[String]]
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(mappings))
val adapter = RunQueryAdapter(
poster = Poster("crc-url", MockHttpClient),
dao = null,
hiveCredentials = null,
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
val queryDefinition = QueryDefinition("foo", Term("blah"))
val authn = AuthenticationInfo("d", "u", Credential("p", false))
val req = RunQueryRequest("projectId", Duration.Inf, authn, otherNetworkId, None, None, Set.empty, queryDef)
try {
adapter.translateNetworkToLocal(req)
fail("Expected an AdapterMappingException")
} catch {
case e: AdapterMappingException => {
e.getMessage.contains(authn.rawToString) should be(false)
e.getMessage.contains(AuthenticationInfo.elided.toString) should be(true)
}
}
}
@Test
def testTranslateQueryDefinitionXml {
val localTerms = Set("local1a", "local1b")
val mappings = Map("network" -> localTerms)
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(mappings))
val adapter = RunQueryAdapter(
poster = Poster("crc-url", MockHttpClient),
dao = null,
hiveCredentials = null,
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
val queryDefinition = QueryDefinition("10-17 years old@14:39:20", OccuranceLimited(1, Term("network")))
val newDef = adapter.conceptTranslator.translate(queryDefinition)
val expected = QueryDefinition("10-17 years old@14:39:20", Or(Term("local1a"), Term("local1b")))
newDef should equal(expected)
}
@Test
def testQueuedRegularCountQuery: Unit = afterCreatingTables {
val adapter = RunQueryAdapter(
poster = Poster("crc-url", MockHttpClient),
dao = dao,
hiveCredentials = null,
conceptTranslator = null,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = false,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", false))
import scala.concurrent.duration._
val req = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), Set(PATIENT_COUNT_XML), queryDef)
val broadcastMessage = BroadcastMessage(queryId, networkAuthn, req)
val resp = adapter.processRequest(broadcastMessage).asInstanceOf[RunQueryResponse]
resp.groupId should equal(networkAuthn.domain)
resp.createDate should not be (null) // :\
resp.queryId should equal(-1L)
resp.queryInstanceId should equal(-1L)
resp.requestXml should equal(queryDef)
resp.userId should equal(networkAuthn.username)
resp.singleNodeResult.breakdowns should equal(Map.empty)
resp.singleNodeResult.description.isDefined should be(true)
resp.singleNodeResult.elapsed should equal(Some(0L))
resp.singleNodeResult.endDate.isDefined should be(true)
resp.singleNodeResult.startDate.isDefined should be(true)
resp.singleNodeResult.instanceId should equal(-1L)
resp.singleNodeResult.isError should be(false)
resp.singleNodeResult.resultId should equal(-1L)
resp.singleNodeResult.resultType should be(Some(PATIENT_COUNT_XML))
resp.singleNodeResult.setSize should equal(-1L)
resp.singleNodeResult.statusMessage.isDefined should be(true)
resp.singleNodeResult.statusType should be(QueryResult.StatusType.Held)
resp.singleNodeResult.endDate.isDefined should be(true)
val Some(storedQuery) = dao.findQueryByNetworkId(expectedNetworkQueryId)
storedQuery.dateCreated should not be (null) // :\
storedQuery.domain should equal(networkAuthn.domain)
storedQuery.isFlagged should equal(false)
storedQuery.localId should equal(-1L.toString)
storedQuery.name should equal(queryDef.name)
storedQuery.networkId should equal(expectedNetworkQueryId)
storedQuery.queryDefinition should equal(queryDef)
storedQuery.username should equal(networkAuthn.username)
}
private def doTestRegularCountQuery(status: QueryResult.StatusType, count: Long) = afterCreatingTables {
require(!status.isError)
val countQueryResultToUse = countQueryResult.copy(statusType = status, setSize = count)
val outputTypes = justCounts
val resp = doQuery(outputTypes) {
import RawCrcRunQueryResponse.toQueryResultMap
RawCrcRunQueryResponse(queryId, now, userId, groupId, queryDef, instanceId, toQueryResultMap(Seq(countQueryResultToUse))).toI2b2String
}.asInstanceOf[RunQueryResponse]
doBasicRunQueryResponseTest(resp)
val firstResult = resp.results.head
resp.results should equal(Seq(firstResult))
val Some(savedQuery) = dao.findResultsFor(expectedNetworkQueryId)
savedQuery.wasRun should equal(true)
savedQuery.isFlagged should equal(false)
savedQuery.networkQueryId should equal(expectedNetworkQueryId)
savedQuery.breakdowns should equal(Nil)
savedQuery.count.creationDate should not be (null)
savedQuery.count.localId should equal(countQueryResultToUse.resultId)
//savedQuery.count.resultId should equal(resultId) TODO: REVISIT
savedQuery.count.statusType should equal(status)
if (status.isDone && !status.isError) {
savedQuery.count.data.get.startDate should not be (null)
savedQuery.count.data.get.endDate should not be (null)
savedQuery.count.data.get.originalValue should be(count)
ObfuscatorTest.within3(savedQuery.count.data.get.obfuscatedValue, count) should be(true)
} else {
savedQuery.count.data should be(None)
}
}
@Test
def testRegularCountQuery = doTestRegularCountQuery(QueryResult.StatusType.Finished, countQueryResult.setSize)
@Test
def testRegularCountQueryComesBackProcessing = doTestRegularCountQuery(QueryResult.StatusType.Processing, -1L)
@Test
def testRegularCountQueryComesBackQueued = doTestRegularCountQuery(QueryResult.StatusType.Queued, -1L)
@Test
def testRegularCountQueryComesBackError = afterCreatingTables {
val errorQueryResult = QueryResult.errorResult(Some("some-description"), "some-status-message",TestProblem())
val outputTypes = justCounts
val resp = doQuery(outputTypes) {
import RawCrcRunQueryResponse.toQueryResultMap
RawCrcRunQueryResponse(queryId, now, userId, groupId, queryDef, instanceId, toQueryResultMap(Seq(errorQueryResult))).toI2b2String
}
doBasicRunQueryResponseTest(resp)
//TODO: Why are status and description messages from CRC dropped when unmarshalling QueryResults?
//resp.results should equal(Seq(errorQueryResult))
resp.asInstanceOf[RunQueryResponse].results.head.statusType should be(QueryResult.StatusType.Error)
dao.findResultsFor(expectedNetworkQueryId) should be(None)
val Some(savedQueryRow) = dao.findQueryByNetworkId(expectedNetworkQueryId)
val Seq(queryResultRow: QueryResultRow) = {
import SquerylEntryPoint._
implicit val breakdownTypes = DefaultBreakdownResultOutputTypes.toSet
inTransaction {
from(tables.queryResults) { row =>
where(row.queryId === savedQueryRow.id).
select(row.toQueryResultRow)
}.toSeq
}
}
val Seq(errorRow: ShrineError) = {
import SquerylEntryPoint._
inTransaction {
from(tables.errorResults) { row =>
where(row.resultId === queryResultRow.id).
select(row.toShrineError)
}.toSeq
}
}
errorRow should not be (null)
//TODO: ErrorMessage
//errorRow.message should equal(errorQueryResult.statusMessage)
}
private def doTestBreakdownsAreObfuscated(result: QueryResult): Unit = {
result.breakdowns.values.map(_.data).foreach { actualBreakdowns =>
actualBreakdowns.keySet should equal(dummyBreakdownData.keySet)
for {
breakdownName <- actualBreakdowns.keySet
} {
within3(actualBreakdowns(breakdownName), dummyBreakdownData(breakdownName)) should be(true)
}
}
}
@Test
def testGetBreakdownsWithRegularCountQuery {
val breakdowns = DefaultBreakdownResultOutputTypes.values.map(breakdownFor)
val resp = doTestGetBreakdowns(breakdowns)
val firstResult = resp.results.head
firstResult.resultType should equal(Some(PATIENT_COUNT_XML))
firstResult.setSize should equal(setSize)
firstResult.description should equal(None)
firstResult.breakdowns.keySet should equal(DefaultBreakdownResultOutputTypes.toSet)
//NB: Verify that breakdowns are obfuscated
doTestBreakdownsAreObfuscated(firstResult)
resp.results.size should equal(1)
}
@Test
def testGetBreakdownsSomeFailures {
val resultTypesExpectedToSucceed = Seq(PATIENT_AGE_COUNT_XML, PATIENT_GENDER_COUNT_XML)
val breakdowns = resultTypesExpectedToSucceed.map(breakdownFor)
val resp = doTestGetBreakdowns(breakdowns)
val firstResult = resp.results.head
firstResult.resultType should equal(Some(PATIENT_COUNT_XML))
firstResult.setSize should equal(setSize)
firstResult.description should equal(None)
firstResult.breakdowns.keySet should equal(resultTypesExpectedToSucceed.toSet)
//NB: Verify that breakdowns are obfuscated
doTestBreakdownsAreObfuscated(firstResult)
resp.results.size should equal(1)
}
@Test
def testErrorResponsesArePassedThrough: Unit = {
val errorResponse = ErrorResponse(TestProblem(summary = "blarg!"))
val resp = doQuery(Set(PATIENT_COUNT_XML)) {
errorResponse.toI2b2String
}
resp should equal(errorResponse)
}
private def breakdownFor(resultType: ResultOutputType) = I2b2ResultEnvelope(resultType, dummyBreakdownData)
private def doTestGetBreakdowns(successfulBreakdowns: Seq[I2b2ResultEnvelope]): RunQueryResponse = {
val outputTypes = justCounts ++ DefaultBreakdownResultOutputTypes.toSet
val resp = doQueryThatReturnsSpecifiedBreakdowns(outputTypes, successfulBreakdowns)
doBasicRunQueryResponseTest(resp)
resp
}
private def doBasicRunQueryResponseTest(r: BaseShrineResponse) {
val resp = r.asInstanceOf[RunQueryResponse]
resp.createDate should equal(now)
resp.groupId should equal(groupId)
resp.queryId should equal(queryId)
resp.queryInstanceId should equal(instanceId)
resp.queryName should equal(queryDef.name)
resp.requestXml should equal(queryDef)
}
private def doQueryThatReturnsSpecifiedBreakdowns(outputTypes: Set[ResultOutputType], successfulBreakdowns: Seq[I2b2ResultEnvelope]): RunQueryResponse = afterCreatingTablesReturn {
val breakdownQueryResults = DefaultBreakdownResultOutputTypes.values.zipWithIndex.map {
case (rt, i) =>
countQueryResult.withId(resultId + i + 1).withResultType(rt)
}
//Need this rigamarole to ensure that resultIds line up such that the type of breakdown the adapter asks for
//(PATIENT_AGE_COUNT_XML, etc) is what the mock HttpClient actually returns. Here, we build up maps of QueryResults
//and I2b2ResultEnvelopes, keyed on resultIds generated in the previous expression, to use to look up values to use
//to build ReadResultResponses
val successfulBreakdownsByType = successfulBreakdowns.map(e => e.resultType -> e).toMap
val successfulBreakdownTypes = successfulBreakdownsByType.keySet
val breakdownQueryResultsByResultId = breakdownQueryResults.collect { case qr if successfulBreakdownTypes(qr.resultType.get) => qr.resultId -> qr }.toMap
val breakdownsToBeReturnedByResultId = breakdownQueryResultsByResultId.map {
case (resultId, queryResult) => (resultId, successfulBreakdownsByType(queryResult.resultType.get))
}
val expectedLocalTerm = Term("bar")
val httpClient = new HttpClient {
override def post(input: String, url: String): HttpResponse = {
val resp = CrcRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match {
case Success(req: RunQueryRequest) => {
//NB: Terms should be translated
req.queryDefinition.expr.get should equal(expectedLocalTerm)
//Credentials should be "translated"
req.authn.username should equal(hiveCredentials.username)
req.authn.domain should equal(hiveCredentials.domain)
//I2b2 Project ID should be translated
req.projectId should equal(hiveCredentials.projectId)
val queryResultMap = RawCrcRunQueryResponse.toQueryResultMap(countQueryResult +: breakdownQueryResults)
RawCrcRunQueryResponse(queryId, now, "userId", "groupId", queryDef, instanceId, queryResultMap)
}
//NB: return a ReadResultResponse with new breakdown data each time, but will throw if the asked-for breakdown
//is not one of the ones passed to the enclosing method, simulating an error calling the CRC
case Success(req: ReadResultRequest) => {
val resultId = req.localResultId.toLong
ReadResultResponse(xmlResultId, breakdownQueryResultsByResultId(resultId), breakdownsToBeReturnedByResultId(resultId))
}
case _ => ??? //fail loudly
}
HttpResponse.ok(resp.toI2b2String)
}
}
val result = doQuery(outputTypes, dao, httpClient)
validateDb(successfulBreakdowns, breakdownQueryResultsByResultId)
result.asInstanceOf[RunQueryResponse]
}
private def validateDb(breakdownsReturned: Seq[I2b2ResultEnvelope], breakdownQueryResultsByResultId: Map[Long, QueryResult]) {
val expectedNetworkTerm = Term("foo")
//We should have one row in the shrine_query table, for the query just performed
val queryRow = first(queryRows)
{
queryRow.dateCreated should not be (null)
queryRow.domain should equal(authn.domain)
queryRow.name should equal(queryDef.name)
queryRow.localId should equal(expectedLocalMasterId)
queryRow.networkId should equal(expectedNetworkQueryId)
queryRow.username should equal(authn.username)
queryRow.queryDefinition.expr.get should equal(expectedNetworkTerm)
}
list(queryRows).size should equal(1)
//We should have one row in the count_result table, with the right obfuscated value, which is within the expected amount from the original count
val countRow = first(countResultRows)
{
countRow.creationDate should not be (null)
countRow.originalValue should equal(countQueryResult.setSize)
within3(countRow.obfuscatedValue, countQueryResult.setSize) should be(true)
within3(countRow.obfuscatedValue, countRow.originalValue) should be(true)
}
list(countResultRows).size should equal(1)
//We should have 5 rows in the query_result table, one for the count result and one for each of the 4 requested breakdown types
val queryResults = list(queryResultRows)
{
val countQueryResultRow = queryResults.find(_.resultType == PATIENT_COUNT_XML).get
countQueryResultRow.localId should equal(countQueryResult.resultId)
countQueryResultRow.queryId should equal(queryRow.id)
val resultIdsByResultType = breakdownQueryResultsByResultId.map { case (resultId, queryResult) => queryResult.resultType.get -> resultId }.toMap
for (breakdownType <- DefaultBreakdownResultOutputTypes.values) {
val breakdownQueryResultRow = queryResults.find(_.resultType == breakdownType).get
breakdownQueryResultRow.queryId should equal(queryRow.id)
//We'll have a result id if this breakdown type didn't fail
if (resultIdsByResultType.contains(breakdownQueryResultRow.resultType)) {
breakdownQueryResultRow.localId should equal(resultIdsByResultType(breakdownQueryResultRow.resultType))
}
}
}
queryResults.size should equal(5)
val returnedBreakdownTypes = breakdownsReturned.map(_.resultType).toSet
val notReturnedBreakdownTypes = DefaultBreakdownResultOutputTypes.toSet -- returnedBreakdownTypes
val errorResults = list(errorResultRows)
//We should have a row in the error_result table for each breakdown that COULD NOT be retrieved
{
for {
queryResult <- queryResults
if notReturnedBreakdownTypes.contains(queryResult.resultType)
resultType = queryResult.resultType
resultId = queryResult.id
} {
errorResults.find(_.resultId == resultId).isDefined should be(true)
}
}
errorResults.size should equal(notReturnedBreakdownTypes.size)
//We should have properly-obfuscated rows in the breakdown_result table for each of the breakdown types that COULD be retrieved
val breakdownResults = list(breakdownResultRows)
val bdrs = breakdownResults.toIndexedSeq
{
for {
queryResult <- queryResults
if returnedBreakdownTypes.contains(queryResult.resultType)
resultType = queryResult.resultType
resultId = queryResult.id
} {
//Find all the rows for a particular breakdown type
val rowsWithType = breakdownResults.filter(_.resultId == resultId)
//Combining the rows should give the expected dummy data
rowsWithType.map(row => row.dataKey -> row.originalValue).toMap should equal(dummyBreakdownData)
for (breakdownRow <- rowsWithType) {
within3(breakdownRow.obfuscatedValue, dummyBreakdownData(breakdownRow.dataKey)) should be(true)
}
}
}
}
private def doQuery(outputTypes: Set[ResultOutputType])(i2b2XmlToReturn: => String): BaseShrineResponse = {
doQuery(outputTypes, dao, MockHttpClient(i2b2XmlToReturn))
}
private def doQuery(outputTypes: Set[ResultOutputType], adapterDao: AdapterDao, httpClient: HttpClient): BaseShrineResponse = {
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("foo" -> Set("bar"))))
//NB: Don't obfuscate, for simpler testing
val adapter = RunQueryAdapter(
poster = Poster("crc-url", httpClient),
dao = adapterDao,
hiveCredentials = hiveCredentials,
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = adapterLockoutThreshold,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(1,1.3,3)
)
import scala.concurrent.duration._
val req = RunQueryRequest(projectId, 1.second, authn, expectedNetworkQueryId, Option(topicId), Option(topicName), outputTypes, queryDef)
val networkAuthn = AuthenticationInfo("some-domain", "username", Credential("sadasdasdasd", false))
val broadcastMessage = BroadcastMessage(queryId, networkAuthn, req)
adapter.processRequest(broadcastMessage)
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala
index 24889b1b9..b693cc480 100644
--- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala
+++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/service/I2b2AdminResourceEndToEndJaxrsTest.scala
@@ -1,189 +1,190 @@
package net.shrine.adapter.service
import org.junit.Test
-import net.shrine.adapter.HasI2b2AdminDao
-import net.shrine.protocol.{HiveCredentials, ReadI2b2AdminPreviousQueriesRequest, ReadI2b2AdminQueryingUsersRequest, ReadI2b2AdminQueryingUsersResponse, I2b2AdminUserWithRole, ErrorResponse, RunHeldQueryRequest, RunQueryResponse, RunQueryRequest, ResultOutputType, QueryResult, BroadcastMessage, AuthenticationInfo, Credential, DefaultBreakdownResultOutputTypes}
+import net.shrine.adapter.{HasI2b2AdminDao, Obfuscator, RunQueryAdapter}
+import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, DefaultBreakdownResultOutputTypes, ErrorResponse, HiveCredentials, I2b2AdminUserWithRole, QueryResult, ReadI2b2AdminPreviousQueriesRequest, ReadI2b2AdminQueryingUsersRequest, ReadI2b2AdminQueryingUsersResponse, ResultOutputType, RunHeldQueryRequest, RunQueryRequest, RunQueryResponse}
import net.shrine.client.Poster
-import net.shrine.adapter.RunQueryAdapter
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.adapter.translators.ExpressionTranslator
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.protocol.query.Term
+
import scala.util.Success
import net.shrine.util.XmlDateHelper
import net.shrine.protocol.query.QueryDefinition
/**
* @author clint
* @since Apr 12, 2013
*
* NB: Ideally we would extend JerseyTest here, but since we have to extend AbstractDependencyInjectionSpringContextTests,
* we get into a diamond-problem when extending JerseyTest as well, even when both of them are extended by shim traits.
*
* We work around this issue by mising in JerseyTestCOmponent, which brings in a JerseyTest by composition, and ensures
* that it is set up and torn down properly.
*/
final class I2b2AdminResourceEndToEndJaxrsTest extends AbstractI2b2AdminResourceJaxrsTest with HasI2b2AdminDao {
private[this] val dummyUrl = "http://example.com"
private[this] val dummyText = "This is dummy text"
private[this] val dummyMasterId = 873456L
private[this] val dummyInstanceId = 99L
private[this] val dummyResultId = 42L
private[this] val dummySetSize = 12345L
private[this] val networkAuthn = AuthenticationInfo("network-domain", "network-username", Credential("network-password", false))
private lazy val runQueryAdapter: RunQueryAdapter = {
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("n1" -> Set("l1"))))
val poster = new Poster(dummyUrl, new HttpClient {
override def post(input: String, url: String): HttpResponse = {
RunQueryRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input) match {
case Success(req) => {
val queryResult = QueryResult(dummyResultId, dummyInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), dummySetSize, Some(XmlDateHelper.now), Some(XmlDateHelper.now), Some("desc"), QueryResult.StatusType.Finished, Some("status"))
val resp = RunQueryResponse(dummyMasterId, XmlDateHelper.now, networkAuthn.username, networkAuthn.domain, req.queryDefinition, 123L, queryResult)
HttpResponse.ok(resp.toI2b2String)
}
case _ => ???
}
}
})
RunQueryAdapter(
poster = poster,
dao = dao,
hiveCredentials = HiveCredentials("d", "u", "pwd", "pid"),
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = 1000,
doObfuscation = false,
runQueriesImmediately = true,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
}
override def makeHandler = new I2b2AdminService(dao, i2b2AdminDao, Poster(dummyUrl, AlwaysAuthenticatesMockPmHttpClient), runQueryAdapter)
@Test
def testReadQueryDefinition = afterLoadingTestData {
doTestReadQueryDefinition(networkQueryId1, Some((queryName1, queryDef1)))
}
@Test
def testReadQueryDefinitionUnknownQueryId = afterLoadingTestData {
doTestReadQueryDefinition(87134682364L, None)
}
import ReadI2b2AdminPreviousQueriesRequest.{Username, Category, SortOrder}
import Username._
@Test
def testReadI2b2AdminPreviousQueries = afterLoadingTestData {
val searchString = queryName1
val maxResults = 123
val sortOrder = ReadI2b2AdminPreviousQueriesRequest.SortOrder.Ascending
val categoryToSearchWithin = ReadI2b2AdminPreviousQueriesRequest.Category.All
val searchStrategy = ReadI2b2AdminPreviousQueriesRequest.Strategy.Exact
val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, searchString, maxResults, None, sortOrder, searchStrategy, categoryToSearchWithin)
doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster1))
}
@Test
def testReadI2b2AdminPreviousQueriesNoResultsExpected = afterLoadingTestData {
//A request that won't return anything
val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, "askjdhakfgkafgkasf", 123, None)
doTestReadI2b2AdminPreviousQueries(request, Nil)
}
@Test
def testReadI2b2AdminPreviousQueriesExcludeUser: Unit = afterLoadingTestData {
val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, Except(authn2.username), "", 10, None)
doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster2, queryMaster1))
}
@Test
def testReadI2b2AdminPreviousQueriesOnlyFlagged: Unit = afterLoadingTestData {
val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, "", 10, None, categoryToSearchWithin = Category.Flagged)
doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster4, queryMaster1))
}
@Test
def testReadPreviousQueriesOnlyFlaggedExcludingUser: Unit = afterLoadingTestData {
val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, Except(authn.username), "", 10, None, categoryToSearchWithin = Category.Flagged)
doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster4))
}
@Test
def testReadPreviousQueriesExcludingUserWithSearchString: Unit = afterLoadingTestData {
val request = ReadI2b2AdminPreviousQueriesRequest(projectId, waitTime, authn, All, queryName1, 10, None, categoryToSearchWithin = Category.Flagged)
doTestReadI2b2AdminPreviousQueries(request, Seq(queryMaster1))
}
@Test
def testReadI2b2QueryingUsers = afterLoadingTestData {
val request = ReadI2b2AdminQueryingUsersRequest(projectId, waitTime, authn, "foo")
val ReadI2b2AdminQueryingUsersResponse(users) = adminClient.readI2b2AdminQueryingUsers(request)
users.toSet should equal(Set(I2b2AdminUserWithRole(shrineProjectId, authn.username, "USER"), I2b2AdminUserWithRole(shrineProjectId, authn2.username, "USER")))
}
@Test
def testReadI2b2QueryingUsersNoResultsExpected = afterCreatingTables {
val request = ReadI2b2AdminQueryingUsersRequest(projectId, waitTime, authn, "foo")
val ReadI2b2AdminQueryingUsersResponse(users) = adminClient.readI2b2AdminQueryingUsers(request)
//DB is empty, so no users will be returned
users should equal(Nil)
}
@Test
def testRunHeldQueryUnknownQuery = afterCreatingTables {
val request = RunHeldQueryRequest(projectId, waitTime, authn, 12345L)
val resp = adminClient.runHeldQuery(request)
resp.isInstanceOf[ErrorResponse] should be(true)
}
@Test
def testRunHeldQueryKnownQuery = afterCreatingTables {
val networkQueryId = 12345L
val request = RunHeldQueryRequest(projectId, waitTime, authn, networkQueryId)
val queryName = "aslkdjasljkd"
val queryExpr = Term("n1")
val runQueryReq = RunQueryRequest(projectId, waitTime, authn, networkQueryId, None, None, Set(ResultOutputType.PATIENT_COUNT_XML), QueryDefinition(queryName, queryExpr))
runQueryAdapter.copy(runQueriesImmediately = false).processRequest(BroadcastMessage(networkAuthn, runQueryReq))
val resp = adminClient.runHeldQuery(request)
val runQueryResp = resp.asInstanceOf[RunQueryResponse]
runQueryResp.createDate should not be(null)
runQueryResp.groupId should be(networkAuthn.domain)
runQueryResp.userId should equal(networkAuthn.username)
runQueryResp.queryId should equal(dummyMasterId)
runQueryResp.singleNodeResult.setSize should equal(dummySetSize)
runQueryResp.singleNodeResult.resultType should equal(Some(ResultOutputType.PATIENT_COUNT_XML)) //TODO
runQueryResp.requestXml.name should equal(queryName)
runQueryResp.requestXml.expr.get should equal(Term("l1"))
}
}
diff --git a/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala b/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala
index a4374634d..7f7658625 100644
--- a/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala
+++ b/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala
@@ -1,339 +1,336 @@
package net.shrine.integration
import java.net.URL
import net.shrine.log.Loggable
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import org.junit.Test
import net.shrine.util.ShouldMatchersForJUnit
-import net.shrine.adapter.AdapterMap
-import net.shrine.adapter.DeleteQueryAdapter
+import net.shrine.adapter.{AdapterMap, DeleteQueryAdapter, FlagQueryAdapter, Obfuscator, ReadQueryResultAdapter, RunQueryAdapter, UnFlagQueryAdapter}
import net.shrine.adapter.client.AdapterClient
import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest
import net.shrine.adapter.service.AdapterRequestHandler
import net.shrine.adapter.service.AdapterService
import net.shrine.broadcaster.AdapterClientBroadcaster
import net.shrine.broadcaster.NodeHandle
import net.shrine.crypto.DefaultSignerVerifier
import net.shrine.crypto.TestKeystore
-import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, Credential, DeleteQueryRequest, DeleteQueryResponse, NodeId, Result, RunQueryRequest, CertId, RequestType, FlagQueryRequest, FlagQueryResponse, RawCrcRunQueryResponse, ResultOutputType, QueryResult, RunQueryResponse, AggregatedRunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse, DefaultBreakdownResultOutputTypes}
+import net.shrine.protocol.{AggregatedRunQueryResponse, AuthenticationInfo, BroadcastMessage, CertId, Credential, DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, HiveCredentials, NodeId, QueryResult, RawCrcRunQueryResponse, RequestType, Result, ResultOutputType, RunQueryRequest, RunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse}
import net.shrine.qep.QepService
import net.shrine.broadcaster.SigningBroadcastAndAggregationService
import net.shrine.broadcaster.InJvmBroadcasterClient
-import net.shrine.adapter.FlagQueryAdapter
import net.shrine.protocol.query.Term
-import net.shrine.adapter.RunQueryAdapter
import net.shrine.client.Poster
import net.shrine.client.HttpClient
import net.shrine.client.HttpResponse
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.adapter.translators.ExpressionTranslator
import net.shrine.util.XmlDateHelper
-import net.shrine.adapter.ReadQueryResultAdapter
import net.shrine.protocol.query.QueryDefinition
-import net.shrine.adapter.UnFlagQueryAdapter
import net.shrine.crypto.SigningCertStrategy
/**
* @author clint
* @since Nov 27, 2013
*
* An in-JVM simulation of a Shrine network with one hub and 4 downstream adapters.
*
* The hub and adapters are wired up with mock AdapterClients that do in-JVM communication via method calls
* instead of remotely.
*
* The adapters are configured to respond with valid results for DeleteQueryRequests
* only. Other requests could be handled, but that would not provide benefit to offset the effort of wiring
* up more and more-complex Adapters.
*
* The test network is queried, and the final result, as well as the state of each adapter, is inspected to
* ensure that the right messages were sent between elements of the system.
*
*/
final class NetworkSimulationTest extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit {
private val certCollection = TestKeystore.certCollection
private lazy val myCertId: CertId = certCollection.myCertId.get
private lazy val signerVerifier = new DefaultSignerVerifier(certCollection)
private val domain = "test-domain"
private val username = "test-username"
private val password = "test-password"
import NetworkSimulationTest._
import scala.concurrent.duration._
private def deleteQueryAdapter: DeleteQueryAdapter = new DeleteQueryAdapter(dao)
private def flagQueryAdapter: FlagQueryAdapter = new FlagQueryAdapter(dao)
private def unFlagQueryAdapter: UnFlagQueryAdapter = new UnFlagQueryAdapter(dao)
private def mockPoster = Poster("http://example.com", new HttpClient {
override def post(input: String, url: String): HttpResponse = ???
})
private val hiveCredentials = HiveCredentials("d", "u", "pwd", "pid")
private def queuesQueriesRunQueryAdapter: RunQueryAdapter = {
val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("n1" -> Set("l1"))))
RunQueryAdapter(
poster = mockPoster,
dao = dao,
hiveCredentials = hiveCredentials,
conceptTranslator = translator,
adapterLockoutAttemptsThreshold = 10000,
doObfuscation = false,
runQueriesImmediately = false,
breakdownTypes = DefaultBreakdownResultOutputTypes.toSet,
collectAdapterAudit = false,
- botCountTimeThresholds = Seq.empty
+ botCountTimeThresholds = Seq.empty,
+ obfuscator = Obfuscator(5,6.5,10)
)
}
//todo this looks unused
private def immediatelyRunsQueriesRunQueryAdapter(setSize: Long): RunQueryAdapter = {
val mockCrcPoster = Poster("http://example.com", new HttpClient {
override def post(input: String, url: String): HttpResponse = {
val req = RunQueryRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input).get
val now = XmlDateHelper.now
val queryResult = QueryResult(1L, 42L, Some(ResultOutputType.PATIENT_COUNT_XML), setSize, Some(now), Some(now), Some("desc"), QueryResult.StatusType.Finished, Some("status"))
val mockCrcXml = RawCrcRunQueryResponse(req.networkQueryId, XmlDateHelper.now, req.authn.username, req.projectId, req.queryDefinition, 42L, Map(ResultOutputType.PATIENT_COUNT_XML -> Seq(queryResult))).toI2b2String
HttpResponse.ok(mockCrcXml)
}
})
queuesQueriesRunQueryAdapter.copy(poster = mockCrcPoster, runQueriesImmediately = true)
}
private def readQueryResultAdapter(setSize: Long): ReadQueryResultAdapter = {
new ReadQueryResultAdapter(
mockPoster,
hiveCredentials,
dao,
doObfuscation = false,
DefaultBreakdownResultOutputTypes.toSet,
- collectAdapterAudit = false
+ collectAdapterAudit = false,
+ obfuscator = Obfuscator(5,6.5,10)
)
}
private lazy val adaptersByNodeId: Seq[(NodeId, MockAdapterRequestHandler)] = {
import NodeName._
import RequestType.{ MasterDeleteRequest => MasterDeleteRequestRT, FlagQueryRequest => FlagQueryRequestRT, QueryDefinitionRequest => RunQueryRT, GetQueryResult => ReadQueryResultRT, UnFlagQueryRequest => UnFlagQueryRequestRT }
(for {
(childName, setSize) <- Seq((A, 1L), (B, 2L), (C, 3L), (D, 4L))
} yield {
val nodeId = NodeId(childName.name)
val maxSignatureAge = 1.hour
val adapterMap = AdapterMap(Map(
MasterDeleteRequestRT -> deleteQueryAdapter,
FlagQueryRequestRT -> flagQueryAdapter,
UnFlagQueryRequestRT -> unFlagQueryAdapter,
RunQueryRT -> queuesQueriesRunQueryAdapter,
ReadQueryResultRT -> readQueryResultAdapter(setSize)))
nodeId -> MockAdapterRequestHandler(new AdapterService(nodeId, signerVerifier, maxSignatureAge, adapterMap))
})
}
private lazy val shrineService: QepService = {
val destinations: Set[NodeHandle] = {
(for {
(nodeId, adapterRequestHandler) <- adaptersByNodeId
} yield {
NodeHandle(nodeId, MockAdapterClient(nodeId, adapterRequestHandler))
}).toSet
}
QepService(
"example.com",
MockAuditDao,
MockAuthenticator,
MockQueryAuthorizationService,
true,
SigningBroadcastAndAggregationService(InJvmBroadcasterClient(AdapterClientBroadcaster(destinations, MockHubDao)), signerVerifier, SigningCertStrategy.Attach),
1.hour,
DefaultBreakdownResultOutputTypes.toSet,
false)
}
@Test
def testSimulatedNetwork = afterCreatingTables {
val authn = AuthenticationInfo(domain, username, Credential(password, false))
val masterId = 12345L
import scala.concurrent.duration._
val req = DeleteQueryRequest("some-project-id", 1.second, authn, masterId)
val resp = shrineService.deleteQuery(req, true)
for {
(nodeId, mockAdapter) <- adaptersByNodeId
} {
mockAdapter.lastMessage.networkAuthn.domain should equal(authn.domain)
mockAdapter.lastMessage.networkAuthn.username should equal(authn.username)
mockAdapter.lastMessage.request should equal(req)
mockAdapter.lastResult.response should equal(DeleteQueryResponse(masterId))
}
resp should equal(DeleteQueryResponse(masterId))
}
@Test
def testQueueQuery = afterCreatingTables {
val authn = AuthenticationInfo(domain, username, Credential(password, false))
val topicId = "askldjlkas"
val topicName = "Topic Name"
val queryName = "lsadj3028940"
import scala.concurrent.duration._
val runQueryReq = RunQueryRequest("some-project-id", 1.second, authn, 12345L, Some(topicId), Some(topicName), Set(ResultOutputType.PATIENT_COUNT_XML), QueryDefinition(queryName, Term("n1")))
val aggregatedRunQueryResp = shrineService.runQuery(runQueryReq, true).asInstanceOf[AggregatedRunQueryResponse]
var broadcastMessageId: Option[Long] = None
//Broadcast the original run query request; all nodes should queue the query
for {
(nodeId, mockAdapter) <- adaptersByNodeId
} {
broadcastMessageId = Option(mockAdapter.lastMessage.requestId)
mockAdapter.lastMessage.networkAuthn.domain should equal(authn.domain)
mockAdapter.lastMessage.networkAuthn.username should equal(authn.username)
val lastReq = mockAdapter.lastMessage.request.asInstanceOf[RunQueryRequest]
lastReq.authn should equal(runQueryReq.authn)
lastReq.requestType should equal(runQueryReq.requestType)
lastReq.waitTime should equal(runQueryReq.waitTime)
//todo what to do with this check? lastReq.networkQueryId should equal(mockAdapter.lastMessage.requestId)
lastReq.outputTypes should equal(runQueryReq.outputTypes)
lastReq.projectId should equal(runQueryReq.projectId)
lastReq.queryDefinition should equal(runQueryReq.queryDefinition)
lastReq.topicId should equal(runQueryReq.topicId)
val runQueryResp = mockAdapter.lastResult.response.asInstanceOf[RunQueryResponse]
runQueryResp.queryId should equal(-1L)
runQueryResp.singleNodeResult.statusType should equal(QueryResult.StatusType.Held)
runQueryResp.singleNodeResult.setSize should equal(-1L)
}
aggregatedRunQueryResp.queryId should equal(broadcastMessageId.get)
aggregatedRunQueryResp.results.map(_.setSize) should equal(Seq(-1L, -1L, -1L, -1L, -4L))
}
@Test
def testFlagQuery = afterCreatingTables {
val authn = AuthenticationInfo(domain, username, Credential(password, false))
val masterId = 12345L
import scala.concurrent.duration._
val networkQueryId = 9999L
val name = "some query"
val expr = Term("foo")
val fooQuery = QueryDefinition(name,expr)
dao.insertQuery(masterId.toString, networkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None)
dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(false)
dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(None)
val req = FlagQueryRequest("some-project-id", 1.second, authn, networkQueryId, Some("foo"))
val resp = shrineService.flagQuery(req, true)
resp should equal(FlagQueryResponse)
dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(true)
dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(Some("foo"))
}
@Test
def testUnFlagQuery = afterCreatingTables {
val authn = AuthenticationInfo(domain, username, Credential(password, false))
val masterId = 12345L
import scala.concurrent.duration._
val networkQueryId = 9999L
val flagMsg = Some("foo")
val name = "some query"
val expr = Term("foo")
val fooQuery = QueryDefinition(name,expr)
dao.insertQuery(masterId.toString, networkQueryId, authn, fooQuery, isFlagged = true, hasBeenRun = true, flagMessage = flagMsg)
dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(true)
dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(flagMsg)
val req = UnFlagQueryRequest("some-project-id", 1.second, authn, networkQueryId)
val resp = shrineService.unFlagQuery(req, true)
resp should equal(UnFlagQueryResponse)
dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(false)
dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(None)
}
}
object NetworkSimulationTest {
private final case class MockAdapterClient(nodeId: NodeId, adapter: AdapterRequestHandler) extends AdapterClient with Loggable {
import scala.concurrent.ExecutionContext.Implicits.global
override def query(message: BroadcastMessage): Future[Result] = Future.successful {
debug(s"Invoking Adapter $nodeId with $message")
val result = adapter.handleRequest(message)
debug(s"Got result from $nodeId: $result")
result
}
override def url: Option[URL] = ???
}
private final case class MockAdapterRequestHandler(delegate: AdapterRequestHandler) extends AdapterRequestHandler {
@volatile var lastMessage: BroadcastMessage = _
@volatile var lastResult: Result = _
override def handleRequest(request: BroadcastMessage): Result = {
lastMessage = request
val result = delegate.handleRequest(request)
lastResult = result
result
}
}
}