diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
index 531095823..f04f6f482 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
@@ -1,111 +1,110 @@
package net.shrine.adapter
import org.xml.sax.SAXParseException
import scala.xml.NodeSeq
import scala.xml.XML
import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, Credential, ErrorResponse, HiveCredentials, ShrineRequest, ShrineResponse, TranslatableRequest}
import net.shrine.util.XmlDateHelper
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.util.Try
import scala.util.control.NonFatal
/**
* @author Bill Simons
* @since 4/11/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class CrcAdapter[T <: ShrineRequest, V <: ShrineResponse](
poster: Poster,
override protected val hiveCredentials: HiveCredentials) extends WithHiveCredentialsAdapter(hiveCredentials) {
protected def parseShrineResponse(nodeSeq: NodeSeq): ShrineResponse
private[adapter] def parseShrineErrorResponseWithFallback(xmlResponseFromCrc: String): ShrineResponse = {
//NB: See https://open.med.harvard.edu/jira/browse/SHRINE-534
//NB: https://open.med.harvard.edu/jira/browse/SHRINE-745
val shrineResponseAttempt = for {
crcXml <- Try(XML.loadString(xmlResponseFromCrc))
shrineResponse <- Try(parseShrineResponse(crcXml)).recover { case NonFatal(e) =>
info(s"Exception while parsing $crcXml",e)
ErrorResponse.fromI2b2(crcXml)
} //todo pass the exception to build a proper error response, and log the exception
} yield shrineResponse
shrineResponseAttempt.recover {
case saxx:SAXParseException => ErrorResponse(CannotParseXmlFromCrc(saxx,xmlResponseFromCrc))
case NonFatal(e) =>
error(s"Error parsing response from CRC: ", e)
ErrorResponse(ExceptionWhileLoadingCrcResponse(e,xmlResponseFromCrc))
}.get
}
//NB: default is a noop; only RunQueryAdapter needs this for now
protected[adapter] def translateNetworkToLocal(request: T): T = request
protected[adapter] override def processRequest(message: BroadcastMessage): BaseShrineResponse = {
val i2b2Response = callCrc(translateRequest(message.request))
parseShrineErrorResponseWithFallback(i2b2Response)
}
protected def callCrc(request: ShrineRequest): String = {
debug(s"Sending Shrine-formatted request to the CRC at '${poster.url}': $request")
val crcRequest = request.toI2b2String
val crcResponse = XmlDateHelper.time(s"Calling the CRC at '${poster.url}'")(debug(_)) {
//Wrap exceptions in a more descriptive form, to enable sending better error messages back to the legacy web client
try { poster.post(crcRequest) }
catch {
case NonFatal(e) => throw CrcInvocationException(poster.url, request, e)
}
}
crcResponse.body
}
private[adapter] def translateRequest(request: BaseShrineRequest): ShrineRequest = request match {
case transReq: TranslatableRequest[T] => //noinspection RedundantBlock
{
val HiveCredentials(domain, username, password, project) = hiveCredentials
- //val authInfo = AuthenticationInfo(domain, username, Credential(password, isToken = false))
+ val authInfo = AuthenticationInfo(domain, username, Credential(password, isToken = false))
// Note: request.authn.domain, request.authn.username and request.authn.credential contain the web-client user, not the shrine user
- val authInfo = request.authn
+ //val authInfo = request.authn
- //translateNetworkToLocal(transReq.withAuthn(authInfo).withProject(project).asRequest)
translateNetworkToLocal(transReq.withAuthn(authInfo).withProject(project).asRequest)
}
case req: ShrineRequest => req
case _ => throw new IllegalArgumentException(s"Unexpected request: $request")
}
}
case class CannotParseXmlFromCrc(saxx:SAXParseException,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(saxx)
override val summary: String = "Could not parse response from CRC."
override val description:String = s"${saxx.getMessage} while parsing the response from the CRC."
override val detailsXml =
{throwableDetail.getOrElse("")}
Response is {xmlResponseFromCrc}
}
case class ExceptionWhileLoadingCrcResponse(t:Throwable,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(t)
override val summary: String = "Unanticipated exception with response from CRC."
override val description:String = s"${t.getMessage} while parsing the response from the CRC."
override val detailsXml =
{throwableDetail.getOrElse("")}
Response is {xmlResponseFromCrc}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
index 98c0c0a28..c1775fd82 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
@@ -1,284 +1,284 @@
package net.shrine.adapter
import net.shrine.adapter.audit.AdapterAuditDb
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, ErrorFromCrcException, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, MissingCrCXmlResultException, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineResponse}
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources}
import scala.util.control.NonFatal
import net.shrine.util.XmlDateHelper
import scala.concurrent.duration.Duration
import scala.xml.XML
/**
* @author Bill Simons
* @author clint
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
final case class RunQueryAdapter(
poster: Poster,
dao: AdapterDao,
override val hiveCredentials: HiveCredentials,
conceptTranslator: QueryDefinitionTranslator,
adapterLockoutAttemptsThreshold: Int, //Set to 0 to disable lockout. todo remove in SHRINE 1.24
doObfuscation: Boolean,
runQueriesImmediately: Boolean,
breakdownTypes: Set[ResultOutputType],
collectAdapterAudit:Boolean,
botCountTimeThresholds:Seq[(Long,Duration)],
obfuscator: Obfuscator
) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) {
logStartup()
import RunQueryAdapter._
override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call
override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = {
try { request.mapQueryDefinition(conceptTranslator.translate) }
catch {
case NonFatal(e) => throw new AdapterMappingException(request,s"Error mapping query terms from network to local forms.", e)
}
}
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message)
if (isLockedOut(message.networkAuthn)) {
throw new AdapterLockoutException(message.networkAuthn,poster.url)
}
dao.checkIfBot(message.networkAuthn,botCountTimeThresholds)
val runQueryReq = message.request.asInstanceOf[RunQueryRequest]
//We need to use the network identity from the BroadcastMessage, since that will have the network username
//(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching
//of previous queries on deployed systems where the credentials in the identity param to this method and the authn
//field of the incoming request are different, like the HMS Shrine deployment.
//NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013
- val authnToUse = message.networkAuthn//.copy(credential = Credential("", isToken = false))
+ val authnToUse = message.networkAuthn.copy(credential = Credential("", isToken = false))
if (!runQueriesImmediately) {
debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}")
storeQuery(authnToUse, message, runQueryReq)
} else {
debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}")
val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse))
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(runQueryReq.networkQueryId,result)
result
}
}
private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = {
//Use dummy ids for what we would have received from the CRC
val masterId: Long = -1L
val queryInstanceId: Long = -1L
val resultId: Long = -1L
//TODO: is this right?? Or maybe it's project id?
val groupId = authnToUse.domain
val invalidSetSize = -1L
val now = XmlDateHelper.now
val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing"))
dao.inTransaction {
val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None)
val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult))
//NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in
//AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete
//queries, will work.
val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head
dao.insertCountResult(countQueryResultId, -1L, -1L)
}
RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult)
}
private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = {
if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request)
//NB: Pass through ErrorResponses received from the CRC.
//See: https://open.med.harvard.edu/jira/browse/SHRINE-794
val result = super.processRequest(message) match {
case e: ErrorResponse => e
case rawRunQueryResponse: RawCrcRunQueryResponse => processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse)
}
if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(request,result)
result
}
private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = {
def isBreakdown(result: QueryResult) = result.resultType.exists(_.isBreakdown)
val originalResults: Seq[QueryResult] = rawRunQueryResponse.results
val (originalBreakdownResults, originalNonBreakDownResults): (Seq[QueryResult],Seq[QueryResult]) = originalResults.partition(isBreakdown)
val originalBreakdownCountAttempts: Seq[(QueryResult, Try[QueryResult])] = attemptToRetrieveBreakdowns(request, originalBreakdownResults)
val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess }
val failedBreakdownCountAttemptsWithProblems = failedBreakdownCountAttempts.map { attempt =>
val originalResult: QueryResult = attempt._1
val queryResult:QueryResult = if (originalResult.problemDigest.isDefined) originalResult
else {
attempt._2 match {
case Success(_) => originalResult
case Failure(x) => //noinspection RedundantBlock
{
val problem:Problem = x match {
case e: ErrorFromCrcException => ErrorFromCrcBreakdown(e)
case e: MissingCrCXmlResultException => CannotInterpretCrcBreakdownXml(e)
case NonFatal(e) => {
val summary = s"Unexpected exception while interpreting breakdown response"
ProblemNotYetEncoded(summary, e)
}
}
//TODO: is this needed? LoggingProblemHandler.handleProblem(problem)
originalResult.copy(problemDigest = Some(problem.toDigest))
}
}
}
(queryResult,attempt._2)
}
logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttemptsWithProblems)
val originalMergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = {
val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns }
withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _)
}
val obfuscatedQueryResults = originalResults.map(obfuscator.obfuscate)
val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown)
val obfuscatedMergedBreakdowns = originalMergedBreakdowns.mapValues(_.mapValues(obfuscator.obfuscate))
val failedBreakdownTypes = failedBreakdownCountAttemptsWithProblems.flatMap { case (qr, _) => qr.resultType }
dao.storeResults(
authn = authnToUse,
masterId = rawRunQueryResponse.queryId.toString,
networkQueryId = request.networkQueryId,
queryDefinition = request.queryDefinition,
rawQueryResults = originalResults,
obfuscatedQueryResults = obfuscatedQueryResults,
failedBreakdownTypes = failedBreakdownTypes,
mergedBreakdowns = originalMergedBreakdowns,
obfuscatedBreakdowns = obfuscatedMergedBreakdowns)
// at this point the queryResult could be a mix of successes and failures.
// SHRINE reports only the successes. See SHRINE-1567 for details
val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults
val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns
//TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here?
//can failedBreakdownCountAttempts be mixed back in here?
val resultWithBreakdowns: QueryResult = queryResults.head.withBreakdowns(breakdownsToReturn)
if(debugEnabled) {
def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data)
val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}"
debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)")
debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)")
debug(s"Full QueryResult: $resultWithBreakdowns")
}
//if any results had problems, this commented out code can turn it into an error QueryResult
//See SHRINE-1619
//val problem: Option[ProblemDigest] = failedBreakdownCountAttemptsWithProblems.headOption.flatMap(x => x._1.problemDigest)
//val queryResult = problem.fold(resultWithBreakdowns)(pd => QueryResult.errorResult(Some(pd.description),"Error with CRC",pd))
rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns)
}
private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = {
def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString)
Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes))
}
private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = {
originalCountQueryResult -> (for {
countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId)
} yield originalCountQueryResult.withSetSize(countData.metadata.setSize))
}
private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = {
breakdownResults.map { origBreakdownResult =>
origBreakdownResult -> (for {
breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data)
} yield origBreakdownResult.withBreakdown(breakdownData))
}
}
private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse,
failures: Seq[(QueryResult, Try[QueryResult])]) {
for {
(origQueryResult, Failure(e)) <- failures
} {
error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e)
}
}
private def isLockedOut(authn: AuthenticationInfo): Boolean = {
adapterLockoutAttemptsThreshold match {
case 0 => false
case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold)
}
}
private def logStartup(): Unit = {
val message = {
if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" }
else { s"${getClass.getSimpleName} will queue queries for later execution" }
}
info(message)
}
}
case class ErrorFromCrcBreakdown(x:ErrorFromCrcException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "The CRC reported an error."
override val description = "The CRC reported an internal error."
}
case class CannotInterpretCrcBreakdownXml(x:MissingCrCXmlResultException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "SHRINE cannot interpret the CRC response."
override val description = "The CRC responded, but SHRINE could not interpret that response."
}
\ No newline at end of file