diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala index 7ce5ce381..8b51f1bf4 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala @@ -1,297 +1,298 @@ package net.shrine.adapter import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import net.shrine.adapter.audit.AdapterAuditDb import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.Option.option2Iterable import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.Obfuscator.obfuscateResults import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.Breakdown import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, ErrorResponse, HasQueryResults, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse, BaseShrineRequest} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.StackTrace import net.shrine.util.Tries.sequence import scala.concurrent.duration.Duration import net.shrine.client.Poster /** * @author clint * @since Nov 2, 2012 * */ object AbstractReadQueryResultAdapter { private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) } abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults]( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, getQueryId: Req => Long, getProjectId: Req => String, toResponse: (Long, QueryResult) => Rsp, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean ) extends WithHiveCredentialsAdapter(hiveCredentials) { //TODO: Make this configurable private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors) //TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead? private lazy val executorService = Executors.newFixedThreadPool(numThreads) private lazy val executionContext = ExecutionContext.fromExecutorService(executorService) override def shutdown() { try { executorService.shutdown() executorService.awaitTermination(5, TimeUnit.SECONDS) } finally { executorService.shutdownNow() super.shutdown() } } import AbstractReadQueryResultAdapter._ override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { val req = message.request.asInstanceOf[Req] val queryId = getQueryId(req) def findShrineQueryRow = dao.findQueryByNetworkId(queryId) def findShrineQueryResults = dao.findResultsFor(queryId) findShrineQueryRow match { case None => { debug(s"Query $queryId not found in the Shrine DB") errorResponse(queryId) } case Some(shrineQueryRow) => { if (shrineQueryRow.hasNotBeenRun) { debug(s"Query $queryId found, but it wasn't run before") findShrineQueryResults.map(makeResponseFrom(queryId, _)).getOrElse { debug(s"Couldn't retrive all results for query $queryId; it's likely the query's status is incomplete (QUEUED, PROCESSING, etc)") errorResponse(queryId) } } else { findShrineQueryResults match { case None => { debug(s"Query $queryId found, and it has been run, but its results are not available yet") //TODO: When precisely can this happen? Should we go back to the CRC here? errorResponse(queryId) } case Some(shrineQueryResult) => { if (shrineQueryResult.isDone) { debug(s"Query $queryId is done and already stored, returning stored results") makeResponseFrom(queryId, shrineQueryResult) } else { debug(s"Query $queryId is incomplete, asking CRC for results") val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result) result } } } } } } } private def errorResponse(queryId: Long) = ErrorResponse(QueryNotFound(queryId)) private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = { shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(errorResponse(queryId)) } private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = { //NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to //retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in //parallel is quite possibly too clever, but may be faster than asking for them serially. //TODO: Review this. //Make requests for results in parallel val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult) //Gather all the results (block until they're all returned) val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime) countResponseAttempt match { //If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along //with any retrieved breakdowns before returning it. case Success(countResponse) => { //NB: Only store the result if needed, that is, if all results are done //TODO: REVIEW THIS storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts)) countResponse } case Failure(e) => ErrorResponse(s"Couldn't retrieve query with id '$queryId' from the CRC: exception message follows: ${e.getMessage} stack trace: ${StackTrace.stackTraceAsString(e)}") } } private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = { def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString) def process(localResultId: Long): ShrineResponse = { delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId)) } implicit val executionContext = this.executionContext import scala.concurrent.blocking def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f))) val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt { process(shrineQueryResult.count.localId) } val futureBreakdownAttempts = Future.sequence(for { Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns } yield futureBlockingAttempt { process(localResultId) }) //Log errors retrieving count futureCountAttempt.collect { case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'") case Failure(e) => error(s"Error requesting count result from the CRC: ", e) } //Log errors retrieving breakdown for { breakdownResponseAttempts <- futureBreakdownAttempts } { breakdownResponseAttempts.collect { case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'") case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e) } } //"Filter" for non-ErrorResponses val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect { case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'")) } //"Filter" for non-ErrorResponses val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for { breakdownResponseAttempts <- futureBreakdownAttempts } yield { breakdownResponseAttempts.collect { case Success(resp: ReadResultResponse) => Try(resp) } } for { countResponseAttempt <- futureNonErrorCountAttempt breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts } yield { RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) } } private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = { val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime) //Log any failures (countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _)) //NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned val responseAttempt = for { countResponse: ReadResultResponse <- countResponseAttempt countQueryResult = countResponse.metadata breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts) } yield { val localCountResultId = countResponse.metadata.resultId val breakdownsByType = (for { breakdownResponse <- breakdownResponses resultType <- breakdownResponse.metadata.resultType } yield resultType -> breakdownResponse.data).toMap val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType) val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns toResponse(queryId, queryResultToReturn) } SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts) } private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = { val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten breakdownTypes -- successfulBreakdownTypes } private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val responseIsDone = response.results.forall(_.statusType.isDone) if (responseIsDone) { storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes) } } private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val rawResults = response.results val obfuscatedResults = obfuscateResults(doObfuscation)(response.results) for { shrineQuery <- dao.findQueryByNetworkId(queryId) queryResult <- rawResults.headOption obfuscatedQueryResult <- obfuscatedResults.headOption } { val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr) dao.inTransaction { dao.deleteQuery(queryId) dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns) } } } private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R] private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) { def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns] override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call } private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _) } case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Query with id '$queryId' not found" + override def description:String = s"No query with id $queryId found on ${stamp.host}" } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala index 4992216cf..ed06fe2e2 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala @@ -1,75 +1,81 @@ package net.shrine.adapter import net.shrine.log.Loggable import net.shrine.problem.{Problem, ProblemNotYetEncoded, LoggingProblemHandler, ProblemSources, AbstractProblem} import net.shrine.protocol.{ShrineRequest, BroadcastMessage, ErrorResponse, BaseShrineResponse, AuthenticationInfo} /** * @author Bill Simons * @since 4/8/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class Adapter extends Loggable { final def perform(message: BroadcastMessage): BaseShrineResponse = { def problemToErrorResponse(problem:Problem):ErrorResponse = { LoggingProblemHandler.handleProblem(problem) ErrorResponse(problem) } val shrineResponse = try { processRequest(message) } catch { - case e: AdapterLockoutException => { - problemToErrorResponse(AdapterLockout(message.request.authn,e)) - } - case e @ CrcInvocationException(invokedCrcUrl, request, cause) => { - problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e)) - } - case e: AdapterMappingException => { - problemToErrorResponse(AdapterMappingProblem(e)) - } - case e: Exception => { + case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e)) + + case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e)) + case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e)) + + //noinspection RedundantBlock + case e: Exception => { val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage" else s"Unexpected exception in Adapter" problemToErrorResponse(ProblemNotYetEncoded(summary,e)) } } shrineResponse } protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse //NOOP, may be overridden by subclasses def shutdown(): Unit = () } case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = s"User '${authn.domain}:${authn.username}' is locked out" + override val throwable = Some(x) + override val summary: String = s"User '${authn.domain}:${authn.username}' is locked out" + override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries with the same result at ${x.url}" - override def throwable = Some(x) } case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = s"Error invoking the CRC at '$crcUrl' with request $request ." - - override def throwable = Some(x) + override val throwable = Some(x) + override val summary: String = s"Error invoking the CRC at '$crcUrl' due to ${throwable.get}} ." + override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} ." + override val details:String = + s""" + |${super.details} + """.stripMargin } case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = s"Error mapping query terms on ${stamp.host} for query ${x.queryDefinition}" - - override def description = s"${super.description} ${x.getMessage}" - - override def throwable = Some(x) + override val throwable = Some(x) + override val summary: String = s"Error mapping query terms on ${stamp.host}" + override val description = s"The Shrine Adapter on ${stamp.host} cannot map this query to its local terms. Running query ${x.runQueryRequest.queryDefinition} caused ${x.cause}. This error must be corrected at the queried site." + override val details = + s"""${stamp.pretty} + |Query Defitiontion is ${x.runQueryRequest.queryDefinition} + |${throwableDetail.getOrElse("")} + |RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo} + """.stripMargin } case class ExceptionInAdapter() \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterLockoutException.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterLockoutException.scala index c5885fa5c..1507d4015 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterLockoutException.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterLockoutException.scala @@ -1,13 +1,13 @@ package net.shrine.adapter import net.shrine.protocol.AuthenticationInfo /** * @author Andrew McMurry * @author clint - * @date Jan 6, 2010 - * @date Nov 21, 2012 (Scala Port) + * @since Jan 6, 2010 + * @since Nov 21, 2012 (Scala Port) */ -final class AdapterLockoutException(lockedOutAuthn: AuthenticationInfo) extends AdapterException { - override def getMessage = s"AdapterLockoutException(domain=${lockedOutAuthn.domain}, username=${lockedOutAuthn.username})" +final case class AdapterLockoutException(lockedOutAuthn: AuthenticationInfo,url:String) extends AdapterException { + override def getMessage = s"AdapterLockoutException(domain=${lockedOutAuthn.domain}, username=${lockedOutAuthn.username}) on $url" } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterMappingException.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterMappingException.scala index 0412927d3..79af225fe 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterMappingException.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AdapterMappingException.scala @@ -1,11 +1,13 @@ package net.shrine.adapter -import net.shrine.protocol.query.QueryDefinition + +import net.shrine.protocol.RunQueryRequest /** * @author Andrew McMurry * @author clint * @since ??? * @since Nov 21, 2012 (Scala port) */ -final case class AdapterMappingException(queryDefinition:QueryDefinition, message: String, cause: Throwable) extends AdapterException(message, cause) +final case class AdapterMappingException(runQueryRequest: RunQueryRequest, message: String, cause: Throwable) extends + AdapterException(s"$message for request ${runQueryRequest.elideAuthenticationInfo}", cause) diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala index ccf0d2551..1eb1e4cfa 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala @@ -1,238 +1,238 @@ package net.shrine.adapter import net.shrine.adapter.audit.AdapterAuditDb import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, Credential, I2b2ResultEnvelope, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ErrorResponse, ShrineResponse} import net.shrine.client.Poster import scala.util.control.NonFatal import net.shrine.util.XmlDateHelper import scala.xml.XML /** * @author Bill Simons * @author clint * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class RunQueryAdapter( poster: Poster, dao: AdapterDao, override val hiveCredentials: HiveCredentials, conceptTranslator: QueryDefinitionTranslator, adapterLockoutAttemptsThreshold: Int, doObfuscation: Boolean, runQueriesImmediately: Boolean, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean ) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) { logStartup() import RunQueryAdapter._ override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = { try { request.mapQueryDefinition(conceptTranslator.translate) } catch { - case NonFatal(e) => throw new AdapterMappingException(request.queryDefinition,s"Error mapping query terms from network to local forms. request: ${request.elideAuthenticationInfo}", e) + case NonFatal(e) => throw new AdapterMappingException(request,s"Error mapping query terms from network to local forms.", e) } } override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message) if (isLockedOut(message.networkAuthn)) { - throw new AdapterLockoutException(message.networkAuthn) + throw new AdapterLockoutException(message.networkAuthn,poster.url) } val runQueryReq = message.request.asInstanceOf[RunQueryRequest] //We need to use the network identity from the BroadcastMessage, since that will have the network username //(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching //of previous queries on deployed systems where the credentials in the identity param to this method and the authn //field of the incoming request are different, like the HMS Shrine deployment. //NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013 val authnToUse = message.networkAuthn.copy(credential = Credential("", false)) if (!runQueriesImmediately) { debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") storeQuery(authnToUse, message, runQueryReq) } else { debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse)) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(runQueryReq.networkQueryId,result) result } } private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = { //Use dummy ids for what we would have received from the CRC val masterId: Long = -1L val queryInstanceId: Long = -1L val resultId: Long = -1L //TODO: is this right?? Or maybe it's project id? val groupId = authnToUse.domain val invalidSetSize = -1L val now = XmlDateHelper.now val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing")) dao.inTransaction { val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None) val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult)) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult) } private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request) //NB: Pass through ErrorResponses received from the CRC. //See: https://open.med.harvard.edu/jira/browse/SHRINE-794 val result = super.processRequest(message) match { case e: ErrorResponse => e case rawRunQueryResponse: RawCrcRunQueryResponse => { processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse) } } if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(request,result) result } private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = { def isBreakdown(result: QueryResult) = result.resultType.map(_.isBreakdown).getOrElse(false) val originalResults = rawRunQueryResponse.results val (originalBreakdownResults, originalNonBreakDownResults) = originalResults.partition(isBreakdown) val originalBreakdownCountAttempts = attemptToRetrieveBreakdowns(request, originalBreakdownResults) val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess } logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttempts) val originalMergedBreakdowns = { val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns } withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _) } val obfuscatedQueryResults = originalResults.map(Obfuscator.obfuscate) val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown) val obfuscatedMergedBreakdowns = obfuscateBreakdowns(originalMergedBreakdowns) val failedBreakdownTypes = failedBreakdownCountAttempts.flatMap { case (queryResult, _) => queryResult.resultType } dao.storeResults( authn = authnToUse, masterId = rawRunQueryResponse.queryId.toString, networkQueryId = request.networkQueryId, queryDefinition = request.queryDefinition, rawQueryResults = originalResults, obfuscatedQueryResults = obfuscatedQueryResults, failedBreakdownTypes = failedBreakdownTypes, mergedBreakdowns = originalMergedBreakdowns, obfuscatedBreakdowns = obfuscatedMergedBreakdowns) val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns //TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here? val resultWithBreakdowns = queryResults.head.withBreakdowns(breakdownsToReturn) if(debugEnabled) { def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data) val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}" debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)") debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)") debug(s"Full QueryResult: $resultWithBreakdowns") } rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns) } private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = { def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString) Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes)) } private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = { originalCountQueryResult -> (for { countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId) } yield originalCountQueryResult.withSetSize(countData.metadata.setSize)) } private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = { breakdownResults.map { origBreakdownResult => origBreakdownResult -> (for { breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data) } yield origBreakdownResult.withBreakdown(breakdownData)) } } private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse, failures: Seq[(QueryResult, Try[QueryResult])]) { for { (origQueryResult, Failure(e)) <- failures } { error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e) } } private def isLockedOut(authn: AuthenticationInfo): Boolean = { adapterLockoutAttemptsThreshold match { case 0 => false case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold) } } private def logStartup(): Unit = { val message = { if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" } else { s"${getClass.getSimpleName} will queue queries for later execution" } } info(message) } } object RunQueryAdapter { private[adapter] def obfuscateBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Map[ResultOutputType, I2b2ResultEnvelope] = { breakdowns.mapValues(_.mapValues(Obfuscator.obfuscate)) } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala index 80784f2c8..ce0ff8e90 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala @@ -1,38 +1,40 @@ package net.shrine.adapter.components import net.shrine.adapter.dao.AdapterDao import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.protocol.ShrineResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.protocol.ReadQueryDefinitionResponse import net.shrine.protocol.ErrorResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.AbstractReadQueryDefinitionRequest /** * @author clint * @since Apr 4, 2013 * * NB: Tested by ReadQueryDefinitionAdapterTest */ final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) { def get(request: Req): ShrineResponse = { val resultOption = for { shrineQuery <- dao.findQueryByNetworkId(request.queryId) } yield { ReadQueryDefinitionResponse( shrineQuery.networkId, shrineQuery.name, shrineQuery.username, shrineQuery.dateCreated, //TODO: I2b2 or Shrine format? shrineQuery.queryDefinition.toI2b2String) } resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request))) } } case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = s"Couldn't find query with network id: ${request.queryId}" + //todo on which adapter? + override val summary: String = s"Couldn't find query with network id: ${request.queryId}" + override val description:String = "" } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala index c4ac7fe46..9b25ed779 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala @@ -1,458 +1,459 @@ package net.shrine.adapter.dao.squeryl import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemSources} import org.squeryl.Query import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.BreakdownResultRow import net.shrine.adapter.dao.model.CountRow import net.shrine.adapter.dao.model.ObfuscatedPair import net.shrine.adapter.dao.model.PrivilegedUser import net.shrine.adapter.dao.model.QueryResultRow import net.shrine.adapter.dao.model.ShrineError import net.shrine.adapter.dao.model.ShrineQuery import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.adapter.dao.model.squeryl.SquerylBreakdownResultRow import net.shrine.adapter.dao.model.squeryl.SquerylCountRow import net.shrine.adapter.dao.model.squeryl.SquerylQueryResultRow import net.shrine.adapter.dao.model.squeryl.SquerylShrineError import net.shrine.adapter.dao.model.squeryl.SquerylShrineQuery import net.shrine.adapter.dao.squeryl.tables.Tables import net.shrine.dao.DateHelpers import net.shrine.dao.squeryl.SquerylInitializer import net.shrine.dao.squeryl.SquerylEntryPoint import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.I2b2ResultEnvelope import net.shrine.protocol.QueryResult import net.shrine.protocol.ResultOutputType import net.shrine.protocol.query.QueryDefinition import net.shrine.util.XmlDateHelper import scala.util.Try import net.shrine.adapter.dao.model.squeryl.SquerylQueryResultRow import net.shrine.adapter.dao.model.squeryl.SquerylPrivilegedUser /** * @author clint * @since May 22, 2013 */ final class SquerylAdapterDao(initializer: SquerylInitializer, tables: Tables)(implicit breakdownTypes: Set[ResultOutputType]) extends AdapterDao with Loggable { initializer.init() override def inTransaction[T](f: => T): T = SquerylEntryPoint.inTransaction { f } import SquerylEntryPoint._ override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = mutateFlagField(networkQueryId, newIsFlagged = true, flagMessage) override def unFlagQuery(networkQueryId: Long): Unit = mutateFlagField(networkQueryId, newIsFlagged = false, None) private def mutateFlagField(networkQueryId: Long, newIsFlagged: Boolean, newFlagMessage: Option[String]): Unit = { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.isFlagged := newIsFlagged, queryRow.flagMessage := newFlagMessage) } } } override def storeResults( authn: AuthenticationInfo, masterId: String, networkQueryId: Long, queryDefinition: QueryDefinition, rawQueryResults: Seq[QueryResult], obfuscatedQueryResults: Seq[QueryResult], failedBreakdownTypes: Seq[ResultOutputType], mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = { inTransaction { val insertedQueryId = insertQuery(masterId, networkQueryId, authn, queryDefinition, isFlagged = false, hasBeenRun = true, flagMessage = None) val insertedQueryResultIds = insertQueryResults(insertedQueryId, rawQueryResults) storeCountResults(rawQueryResults, obfuscatedQueryResults, insertedQueryResultIds) storeErrorResults(rawQueryResults, insertedQueryResultIds) storeBreakdownFailures(failedBreakdownTypes.toSet, insertedQueryResultIds) insertBreakdownResults(insertedQueryResultIds, mergedBreakdowns, obfuscatedBreakdowns) } } private[adapter] def storeCountResults(raw: Seq[QueryResult], obfuscated: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val notErrors = raw.filter(!_.isError) val obfuscatedNotErrors = obfuscated.filter(!_.isError) if(notErrors.size > 1) { warn(s"Got ${notErrors.size} raw (hopefully-)count results; more than 1 is unusual.") } if(obfuscatedNotErrors.size > 1) { warn(s"Got ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; more than 1 is unusual.") } if(notErrors.size != obfuscatedNotErrors.size) { warn(s"Got ${notErrors.size} raw and ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; that these numbers are different is unusual.") } import ResultOutputType.PATIENT_COUNT_XML def isCount(qr: QueryResult): Boolean = qr.resultType == Some(PATIENT_COUNT_XML) inTransaction { //NB: Take the count/setSize from the FIRST PATIENT_COUNT_XML QueryResult, //though the same count should be there for all of them, if there are more than one for { Seq(insertedCountQueryResultId) <- insertedIds.get(PATIENT_COUNT_XML) notError <- notErrors.find(isCount) //NB: Find a count result, just to be sure obfuscatedNotError <- obfuscatedNotErrors.find(isCount) //NB: Find a count result, just to be sure } { insertCountResult(insertedCountQueryResultId, notError.setSize, obfuscatedNotError.setSize) } } } private[adapter] def storeErrorResults(results: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val errors = results.filter(_.isError) val insertedErrorResultIds = insertedIds.getOrElse(ResultOutputType.ERROR,Nil) val insertedIdsToErrors = insertedErrorResultIds zip errors inTransaction { for { (insertedErrorResultId, errorQueryResult) <- insertedIdsToErrors } { val pd = errorQueryResult.problemDigest.get //it's an error so it will have a problem digest insertErrorResult( insertedErrorResultId, errorQueryResult.statusMessage.getOrElse("Unknown failure"), pd.codec, pd.summary, pd.description, pd.details ) } } } private[adapter] def storeBreakdownFailures(failedBreakdownTypes: Set[ResultOutputType], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val insertedIdsForFailedBreakdownTypes = insertedIds.filterKeys(failedBreakdownTypes.contains) inTransaction { for { (failedBreakdownType, Seq(resultId)) <- insertedIdsForFailedBreakdownTypes } { //todo propagate backwards to teh breakdown failure to create the corect problem object BreakdownFailure extends AbstractProblem(ProblemSources.Adapter) { - override def summary: String = s"Couldn't retrieve breakdown of type '$failedBreakdownType'" + override val summary: String = s"Couldn't retrieve breakdown of type '$failedBreakdownType'" + override val description:String = "" } val pd = BreakdownFailure.toDigest insertErrorResult( resultId, s"Couldn't retrieve breakdown of type '$failedBreakdownType'", pd.codec, pd.summary, pd.description, pd.details ) } } } override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForAllUsers.take(howMany).map(_.toShrineQuery).toSeq } } def findAllCounts():Seq[SquerylCountRow] = { inTransaction{ Queries.allCountResults.toSeq } } override def renameQuery(networkQueryId: Long, newName: String) { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.name := newName) } } } override def deleteQuery(networkQueryId: Long): Unit = { inTransaction { tables.shrineQueries.deleteWhere(_.networkId === networkQueryId) } } override def deleteQueryResultsFor(networkQueryId: Long): Unit = { inTransaction { val resultIdsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow.id). on(queryRow.id === resultRow.queryId) }.toSet tables.queryResults.deleteWhere(_.id in resultIdsForNetworkQueryId) } } override def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean = Try { inTransaction { val privilegedUserOption = Queries.privilegedUsers(authn.domain, authn.username).singleOption val threshold = privilegedUserOption.map(_.threshold).getOrElse(defaultThreshold.intValue) val thirtyDaysInThePast: XMLGregorianCalendar = DateHelpers.daysFromNow(-30) val overrideDate: XMLGregorianCalendar = privilegedUserOption.map(_.toPrivilegedUser).flatMap(_.overrideDate).getOrElse(thirtyDaysInThePast) val counts: Seq[Long] = Queries.repeatedResults(authn.domain, authn.username, overrideDate).toSeq.sorted val repeatedResultCount: Long = counts.lastOption.getOrElse(0L) val result = repeatedResultCount > threshold debug(s"User ${authn.domain}:${authn.username} locked out? $result") result } }.getOrElse(false) override def insertQuery(localMasterId: String, networkId: Long, authn: AuthenticationInfo, queryDefinition: QueryDefinition, isFlagged: Boolean, hasBeenRun: Boolean, flagMessage: Option[String]): Int = { inTransaction { val inserted = tables.shrineQueries.insert(new SquerylShrineQuery( 0, localMasterId, networkId, authn.username, authn.domain, XmlDateHelper.now, isFlagged, flagMessage, hasBeenRun, queryDefinition)) inserted.id } } /** * Insert rows into QueryResults, one for each QueryResult in the passed RunQueryResponse * Inserted rows are 'children' of the passed ShrineQuery (ie, they are the results of the query) */ override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = { def execTime(result: QueryResult): Option[Long] = { //TODO: How are locales handled here? Do we care? def toMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- result.startDate end <- result.endDate } yield toMillis(end) - toMillis(start) } val typeToIdTuples = inTransaction { for { result <- results resultType = result.resultType.getOrElse(ResultOutputType.ERROR) //TODO: under what circumstances can QueryResults NOT have start and end dates set? elapsed = execTime(result) } yield { val lastInsertedQueryResultRow = tables.queryResults.insert(new SquerylQueryResultRow(0, result.resultId, parentQueryId, resultType, result.statusType, elapsed, XmlDateHelper.now)) (resultType, lastInsertedQueryResultRow.id) } } typeToIdTuples.groupBy { case (resultType, _) => resultType }.mapValues(_.map { case (_, count) => count }) } override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.countResults.insert(new SquerylCountRow(0, resultId, originalCount, obfuscatedCount, XmlDateHelper.now)) } } override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) { def merge(original: I2b2ResultEnvelope, obfuscated: I2b2ResultEnvelope): Map[String, ObfuscatedPair] = { Map.empty ++ (for { (key, originalValue) <- original.data obfuscatedValue <- obfuscated.data.get(key) } yield (key, ObfuscatedPair(originalValue, obfuscatedValue))) } inTransaction { for { (resultType, Seq(resultId)) <- parentResultIds if resultType.isBreakdown originalBreakdown <- originalBreakdowns.get(resultType) obfuscatedBreakdown <- obfuscatedBreakdowns.get(resultType) (key, ObfuscatedPair(original, obfuscated)) <- merge(originalBreakdown, obfuscatedBreakdown) } { tables.breakdownResults.insert(SquerylBreakdownResultRow(0, resultId, key, original, obfuscated)) } } } override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, summary:String, digestDescription:String,details:String) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.errorResults.insert(SquerylShrineError(0, parentResultId, errorMessage, codec, summary, digestDescription, details)) } } override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = { inTransaction { Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) } } override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForUser(username, domain).take(howMany).toSeq.map(_.toShrineQuery) } } override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = { inTransaction { val breakdownRowsByType = Queries.breakdownResults(networkQueryId).toSeq.groupBy { case (outputType, _) => outputType.toQueryResultRow.resultType }.mapValues(_.map { case (_, row) => row.toBreakdownResultRow }) val queryRowOption = Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) val countRowOption = Queries.countResults(networkQueryId).headOption.map(_.toCountRow) val queryResultRows = Queries.resultsForQuery(networkQueryId).toSeq.map(_.toQueryResultRow) val errorResultRows = Queries.errorResults(networkQueryId).toSeq.map(_.toShrineError) for { queryRow <- queryRowOption countRow <- countRowOption shrineQueryResult <- ShrineQueryResult.fromRows(queryRow, queryResultRows, countRow, breakdownRowsByType, errorResultRows) } yield { shrineQueryResult } } } /** * @author clint * @since Nov 19, 2012 */ object Queries { def privilegedUsers(domain: String, username: String): Query[SquerylPrivilegedUser] = { from(tables.privilegedUsers) { user => where(user.username === username and user.domain === domain).select(user) } } def repeatedResults(domain: String, username: String, overrideDate: XMLGregorianCalendar): Query[Long] = { val counts = join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.username === username and queryRow.domain === domain and (countRow.originalValue <> 0L) and queryRow.dateCreated > DateHelpers.toTimestamp(overrideDate)). groupBy(countRow.originalValue). compute(count(countRow.originalValue)). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } //Filter for result counts > 1 from(counts) { cnt => where(cnt.measures gt 1).select(cnt.measures) } } val queriesForAllUsers: Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => select(queryRow).orderBy(queryRow.dateCreated.desc) } } //TODO: Find a way to parameterize on limit, to avoid building the query every time //TODO: limit def queriesForUser(username: String, domain: String): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain and queryRow.username === username). select(queryRow). orderBy(queryRow.dateCreated.desc) } } val allCountResults: Query[SquerylCountRow] = { from(tables.countResults) { queryRow => select(queryRow) } } def queriesByNetworkId(networkQueryId: Long): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow) } } //TODO: Find out how to compose queries, to re-use queriesByNetworkId def queryNamesByNetworkId(networkQueryId: Long): Query[String] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow.name) } } def resultsForQuery(networkQueryId: Long): Query[SquerylQueryResultRow] = { val resultsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow). on(queryRow.id === resultRow.queryId) } from(resultsForNetworkQueryId)(select(_)) } def countResults(networkQueryId: Long): Query[SquerylCountRow] = { join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.networkId === networkQueryId). select(countRow). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } } def errorResults(networkQueryId: Long): Query[SquerylShrineError] = { join(tables.shrineQueries, tables.queryResults, tables.errorResults) { (queryRow, resultRow, errorRow) => where(queryRow.networkId === networkQueryId). select(errorRow). on(queryRow.id === resultRow.queryId, resultRow.id === errorRow.resultId) } } //NB: using groupBy here is too much of a pain; do it 'manually' later def breakdownResults(networkQueryId: Long): Query[(SquerylQueryResultRow, SquerylBreakdownResultRow)] = { join(tables.shrineQueries, tables.queryResults, tables.breakdownResults) { (queryRow, resultRow, breakdownRow) => where(queryRow.networkId === networkQueryId). select((resultRow, breakdownRow)). on(queryRow.id === resultRow.queryId, resultRow.id === breakdownRow.resultId) } } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala index e76c6d895..3e0b995d2 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala @@ -1,90 +1,91 @@ package net.shrine.adapter.service import net.shrine.log.Loggable import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.ErrorResponse import net.shrine.adapter.AdapterMap import net.shrine.crypto.Verifier import scala.concurrent.duration.Duration import scala.concurrent.duration._ import net.shrine.protocol.BaseShrineResponse /** * Heart of the adapter. * * @author clint * @since Nov 14, 2013 */ final class AdapterService( nodeId: NodeId, signatureVerifier: Verifier, maxSignatureAge: Duration, adapterMap: AdapterMap) extends AdapterRequestHandler with Loggable { import AdapterService._ logStartup(adapterMap) override def handleRequest(message: BroadcastMessage): Result = { handleInvalidSignature(message).orElse { for { adapter <- adapterMap.adapterFor(message.request.requestType) } yield time(nodeId) { adapter.perform(message) } }.getOrElse { Result(nodeId, 0.milliseconds, ErrorResponse(s"Unknown request type '${message.request.requestType}'")) } } /** * @return None if the signature is fine, Some(result with an ErrorResponse) if not */ private def handleInvalidSignature(message: BroadcastMessage): Option[Result] = { val (sigIsValid, elapsed) = time(signatureVerifier.verifySig(message, maxSignatureAge)) if(sigIsValid) { None } else { info(s"Incoming message had invalid signature: $message") Some(Result(nodeId, elapsed.milliseconds, ErrorResponse(CouldNotVerifySignature(message)))) } } } object AdapterService extends Loggable { private def logStartup(adapterMap: AdapterMap) { info("Adapter service initialized, will respond to the following queries: ") val sortedByReqType = adapterMap.requestsToAdapters.toSeq.sortBy { case (k, _) => k } sortedByReqType.foreach { case (requestType, adapter) => info(s" $requestType:\t(${adapter.getClass.getSimpleName})") } } private[service] def time[T](f: => T): (T, Long) = { val start = System.currentTimeMillis val result = f val elapsed = System.currentTimeMillis - start (result, elapsed) } private[service] def time(nodeId: NodeId)(f: => BaseShrineResponse): Result = { val (response, elapsed) = time(f) Result(nodeId, elapsed.milliseconds, response) } } case class CouldNotVerifySignature(message: BroadcastMessage) extends AbstractProblem(ProblemSources.Adapter){ - override def summary: String = s"Incoming message had invalid signature." - override def details: String = s"Signature:\n${message.signature}" + override val summary: String = s"Incoming message had invalid signature." + override val description: String = s"An incoming message from the hub had an invalid signature." + override val details: String = s"${super.details}\nSignature:\n${message.signature}" } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala index 561c873fb..410c82705 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/AdapterTest.scala @@ -1,78 +1,78 @@ package net.shrine.adapter import net.shrine.problem.ProblemNotYetEncoded import net.shrine.protocol.query.QueryDefinition import net.shrine.util.ShouldMatchersForJUnit -import net.shrine.protocol.BaseShrineResponse -import net.shrine.protocol.BroadcastMessage +import net.shrine.protocol.{RunQueryRequest, BaseShrineResponse, BroadcastMessage, DeleteQueryResponse, AuthenticationInfo, Credential, ErrorResponse, DeleteQueryRequest} import org.junit.Test -import net.shrine.protocol.DeleteQueryResponse -import net.shrine.protocol.AuthenticationInfo -import net.shrine.protocol.Credential -import net.shrine.protocol.ErrorResponse -import net.shrine.protocol.DeleteQueryRequest /** * @author clint * @since Mar 31, 2014 */ //noinspection UnitMethodIsParameterless final class AdapterTest extends ShouldMatchersForJUnit { private final class MockAdapter(toReturn: => BaseShrineResponse) extends Adapter { override protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse = toReturn } import scala.concurrent.duration._ private val lockedOutAuthn = AuthenticationInfo("d", "u", Credential("p", isToken = false)) private val networkAuthn = AuthenticationInfo("nd", "nu", Credential("np", isToken = false)) private val req = DeleteQueryRequest("pid", 1.second, lockedOutAuthn, 12345L) private val resp = DeleteQueryResponse(12345) @Test def testHandlesNonFailureCase: Unit = { val adapter = new MockAdapter(resp) adapter.perform(null) should equal(resp) } @Test def testHandlesLockoutCase: Unit = { - doErrorResponseTest(new AdapterLockoutException(lockedOutAuthn),classOf[AdapterLockout]) + doErrorResponseTest(new AdapterLockoutException(lockedOutAuthn,"test.com"),classOf[AdapterLockout]) } @Test def testHandlesCrcFailureCase: Unit = { val url = "http://example.com" doErrorResponseTest(CrcInvocationException(url, req, new Exception),classOf[CrcCouldNotBeInvoked]) } @Test def testHandlesMappingFailureCase: Unit = { - doErrorResponseTest(new AdapterMappingException(QueryDefinition("test query",None),"blarg", new Exception),classOf[AdapterMappingProblem]) + + val authn = AuthenticationInfo("some-domain", "some-user", Credential("some-password", isToken = false)) + val projectId = "projectId" + val queryDef = QueryDefinition("test query",None) + val runQueryRequest = RunQueryRequest(projectId, 1.millisecond, authn, Some("topicId"), Some("Topic Name"), Set.empty, queryDef) + + doErrorResponseTest(new AdapterMappingException(runQueryRequest,"blarg", new Exception),classOf[AdapterMappingProblem]) } @Test def testHandlesGeneralFailureCase: Unit = { doErrorResponseTest(new Exception("blerg"),classOf[ProblemNotYetEncoded]) } //noinspection ScalaUnreachableCode,RedundantBlock private def doErrorResponseTest(exception: Throwable,problemClass:Class[_]) = { val adapter = new MockAdapter(throw exception) val response = adapter.perform(BroadcastMessage(networkAuthn, req)) response match { case errorResponse:ErrorResponse => { val pd = errorResponse.problemDigest pd.codec should be (problemClass.getName) } case x => fail(s"$x is not an ErrorResponse") } } } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala index 14b9a33c6..91dfa92ae 100644 --- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala +++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala @@ -1,100 +1,112 @@ package net.shrine.authorization import net.shrine.log.Loggable import net.shrine.problem.{LoggingProblemHandler, Problem, ProblemSources, AbstractProblem, ProblemDigest} -import scala.util.Try +import scala.util.{Failure, Success, Try} import net.shrine.client.HttpResponse import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest import net.shrine.i2b2.protocol.pm.User import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.ErrorResponse import scala.util.control.NonFatal /** * @author clint * @since Apr 5, 2013 */ trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable => import PmAuthorizerComponent._ //noinspection RedundantBlock object Pm { def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = { User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith { case NonFatal(e) => { debug(s"Couldn't extract a User from '$httpResponse'") Try(Left(ErrorResponse.fromI2b2(httpResponse.body))) } }.recover { case NonFatal(e) => { val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e) LoggingProblemHandler.handleProblem(problem) Left(ErrorResponse(problem.summary,Some(problem))) } } } def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = { val request = GetUserConfigurationRequest(authn) - val responseAttempt = Try { + val responseAttempt: Try[HttpResponse] = Try { debug(s"Authorizing with PM cell at ${pmPoster.url}") pmPoster.post(request.toI2b2String) } - val authStatusAttempt = responseAttempt.flatMap(parsePmResult(authn)).map { + val authStatusAttempt: Try[AuthorizationStatus with Product with Serializable] = responseAttempt.flatMap(parsePmResult(authn)).map { case Right(user) => { val managerUserOption = for { roles <- user.rolesByProject.get(projectId) if neededRoles.forall(roles.contains) } yield user managerUserOption.map(Authorized).getOrElse { NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn)) } } case Left(errorResponse) => { //todo remove when ErrorResponse gets its message info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object") NotAuthorized(errorResponse.problemDigest) } } - authStatusAttempt.getOrElse { - NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn)) + authStatusAttempt match { + case Success(s) => s + case Failure(x) => NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn,x)) } } } } object PmAuthorizerComponent { sealed trait AuthorizationStatus case class Authorized(user: User) extends AuthorizationStatus case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus { def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest) } object NotAuthorized { def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest) } } case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) { - override val summary: String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'" + override val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'" + + override val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'" } -case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) { - override def summary: String = s"Could not reach PM cell at $pmUrl for ${authn.domain}:${authn.username}" +case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { + override val throwable = Option(x) + override val summary: String = s"Could not reach PM cell at $pmUrl for ${authn.domain}:${authn.username}" + override val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}." } case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { - override def summary: String = s"Exception while interpreting response from PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: $httpResponse" + override val throwable = Some(x) + override def summary: String = s"Could not interpret response from PM cell at ${pmUrl} for ${authn.domain}:${authn.username}" + + override def description: String = s"Shrine could not interpret the response from the PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: due to ${throwable.get}" - override def throwable = Some(x) + override val details:String = + s"""${stamp.pretty} + |Response is $httpResponse + |${throwableDetail.getOrElse("")} + """.stripMargin } \ No newline at end of file diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/QueryResultTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/QueryResultTest.scala index ef9c4099d..5b719b07e 100644 --- a/commons/protocol/src/test/scala/net/shrine/protocol/QueryResultTest.scala +++ b/commons/protocol/src/test/scala/net/shrine/protocol/QueryResultTest.scala @@ -1,497 +1,497 @@ package net.shrine.protocol import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.util.ShouldMatchersForJUnit import org.junit.Test import net.shrine.util.XmlUtil import net.shrine.util.XmlDateHelper import net.shrine.util.XmlGcEnrichments import scala.xml.NodeSeq /** * @author Bill Simons * @author clint * @since 8/19/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ //noinspection EmptyParenMethodAccessedAsParameterless,NameBooleanParameters final class QueryResultTest extends ShouldMatchersForJUnit with XmlRoundTripper[QueryResult] with I2b2SerializableValidator { private val date = XmlDateHelper.now private val resultId = 1L private val instanceId = 2L private val resultType = ResultOutputType.PATIENTSET private val setSize = 12L private val statusType = QueryResult.StatusType.Finished private val description = "description" private val statusMessage = "lakjdalsjd" private val queryResult = QueryResult(resultId, instanceId, Some(resultType), setSize, Option(date), Option(date), Option(description), statusType, Option(statusType.name)) import DefaultBreakdownResultOutputTypes.{ values => breakdownTypes, _ } private val resultWithBreakDowns = queryResult.copy( statusMessage = Some(statusMessage), breakdowns = Map(PATIENT_AGE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_AGE_COUNT_XML, Map("foo" -> 1L, "bar" -> 2L)), PATIENT_RACE_COUNT_XML -> I2b2ResultEnvelope(PATIENT_RACE_COUNT_XML, Map("nuh" -> 3L, "zuh" -> 4L)), PATIENT_VITALSTATUS_COUNT_XML -> I2b2ResultEnvelope(PATIENT_VITALSTATUS_COUNT_XML, Map("blarg" -> 5L, "glarg" -> 6L)), PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(PATIENT_GENDER_COUNT_XML, Map("huh" -> 7L, "yeah" -> 8)))) private val expectedWhenBreakdownsArePresent = XmlUtil.stripWhitespace { { resultId } { instanceId } { resultType.toXml } { setSize } { date } { date } { description } { statusType } { statusMessage } { PATIENT_AGE_COUNT_XML } bar 2 foo 1 { PATIENT_GENDER_COUNT_XML } huh 7 yeah 8 { PATIENT_RACE_COUNT_XML } nuh 3 zuh 4 { PATIENT_VITALSTATUS_COUNT_XML } blarg 5 glarg 6 }.toString private val expectedI2b2Xml = XmlUtil.stripWhitespace { { resultId } { instanceId } { description } 1 { resultType } LISTLAPatient set { setSize } { date } { date } { statusType } 3FINISHED }.toString private val expectedI2b2XmlWithBreakdowns = XmlUtil.stripWhitespace { { resultId } { instanceId } { description } { resultType.toI2b2 } { setSize } { date } { date } { statusType } 3FINISHED { PATIENT_AGE_COUNT_XML } bar 2 foo 1 { PATIENT_GENDER_COUNT_XML } huh 7 yeah 8 { PATIENT_RACE_COUNT_XML } nuh 3 zuh 4 { PATIENT_VITALSTATUS_COUNT_XML } blarg 5 glarg 6 }.toString private val expectedI2b2ErrorXml = XmlUtil.stripWhitespace { 0 0 { description } 0 ERROR { statusMessage } }.toString //NB: See https://open.med.harvard.edu/jira/browse/SHRINE-745 private val expectedI2b2IncompleteXml = XmlUtil.stripWhitespace { 0 0 { description } 0 INCOMPLETE { statusMessage } }.toString import scala.xml.XML.loadString //NB: See https://open.med.harvard.edu/jira/browse/SHRINE-745 @Test def testParseIncomplete() { val qr = QueryResult.fromI2b2(breakdownTypes.toSet)(loadString(expectedI2b2IncompleteXml)) qr.statusType should be(QueryResult.StatusType.Incomplete) } @Test def testElapsed() { queryResult.copy(startDate = None).elapsed should be(None) queryResult.copy(endDate = None).elapsed should be(None) queryResult.copy(startDate = None, endDate = None).elapsed should be(None) { val now = XmlDateHelper.now queryResult.copy(startDate = Some(now), endDate = Some(now)).elapsed should equal(Some(0L)) } { val start = XmlDateHelper.now val delta = 123L import XmlGcEnrichments._ import scala.concurrent.duration._ val end = start + delta.milliseconds queryResult.copy(startDate = Some(start), endDate = Some(end)).elapsed should equal(Some(delta)) } } @Test def testIsError() { queryResult.isError should be(false) queryResult.copy(statusType = QueryResult.StatusType.Processing).isError should be(false) queryResult.copy(statusType = QueryResult.StatusType.Finished).isError should be(false) queryResult.copy(statusType = QueryResult.StatusType.Queued).isError should be(false) queryResult.copy(statusType = QueryResult.StatusType.Incomplete).isError should be(false) queryResult.copy(statusType = QueryResult.StatusType.Error).isError should be(true) } @Test def testToXml() { val queryResultForShrine = queryResult.copy(statusMessage = Some(statusMessage)) val expectedWhenNoBreakdowns = XmlUtil.stripWhitespace { { resultId } { instanceId } { resultType.toXml } { setSize } { date } { date } { description } { statusType } { statusMessage } }.toString queryResultForShrine.copy(statusMessage = Some(statusMessage)).toXmlString should equal(expectedWhenNoBreakdowns) val expectedWhenNoStartDate = XmlUtil.stripWhitespace { { resultId } { instanceId } { resultType.toXml } { setSize } { date } { description } { statusType } { statusMessage } }.toString queryResultForShrine.copy(startDate = None).toXmlString should equal(expectedWhenNoStartDate) val expectedWhenNoEndDate = XmlUtil.stripWhitespace { { resultId } { instanceId } { resultType.toXml } { setSize } { date } { description } { statusType } { statusMessage } }.toString queryResultForShrine.copy(endDate = None).toXmlString should equal(expectedWhenNoEndDate) val expectedWhenNoDescription = XmlUtil.stripWhitespace { { resultId } { instanceId } { resultType.toXml } { setSize } { date } { date } { statusType } { statusMessage } }.toString queryResultForShrine.copy(description = None).toXmlString should equal(expectedWhenNoDescription) val expectedWhenNoStatusMessage = XmlUtil.stripWhitespace { { resultId } { instanceId } { resultType.toXml } { setSize } { date } { date } { description } { statusType } }.toString queryResult.copy(statusMessage = None).toXmlString should equal(expectedWhenNoStatusMessage) resultWithBreakDowns.toXmlString should equal(expectedWhenBreakdownsArePresent) } @Test def testFromXml() { QueryResult.fromXml(breakdownTypes.toSet)(loadString(expectedWhenBreakdownsArePresent)) should equal(resultWithBreakDowns) } @Test def testShrineRoundTrip() = { QueryResult.fromXml(breakdownTypes.toSet)(resultWithBreakDowns.toXml) should equal(resultWithBreakDowns) } private def compareIgnoringBreakdowns(actual: QueryResult, expected: QueryResult) { //Ignore breakdowns field, since this can't be serialized to i2b2 format as part of a actual.breakdowns should equal(Map.empty) actual.description should equal(expected.description) actual.endDate should equal(expected.endDate) actual.instanceId should equal(expected.instanceId) actual.resultId should equal(expected.resultId) actual.resultType should equal(expected.resultType) actual.setSize should equal(expected.setSize) actual.startDate should equal(expected.startDate) actual.statusMessage should equal(expected.statusMessage) actual.statusType should equal(expected.statusType) } @Test def testI2b2RoundTrip() = { //NB: Needed because i2b2 handles status messages differently. In the error case, statusMessage is //descriptive; otherwise, it's the all-caps name of the status type. This is different from how //Shrine creates and parses statusMessage XML, so we need a new QueryResult here. (Previously, we //could use the same one, since we were ignoring statusMessage and description when unmarshalling //from i2b2 format.) val newStatusMessage = Some(resultWithBreakDowns.statusType.name) val resultWithBreakDownsForI2b2 = resultWithBreakDowns.copy(statusMessage = newStatusMessage) val unmarshalled = QueryResult.fromI2b2(breakdownTypes.toSet)(resultWithBreakDownsForI2b2.toI2b2) compareIgnoringBreakdowns(unmarshalled, resultWithBreakDownsForI2b2) } @Test def testFromI2b2() { compareIgnoringBreakdowns(QueryResult.fromI2b2(breakdownTypes.toSet)(loadString(expectedI2b2Xml)), queryResult) } @Test def testFromI2b2WithErrors() { val errorResult = QueryResult.errorResult(Some(description), statusMessage) val actual = QueryResult.fromI2b2(breakdownTypes.toSet)(loadString(expectedI2b2ErrorXml)) compareIgnoringBreakdowns(actual, errorResult) } @Test def testToI2b2() { queryResult.toI2b2String should equal(expectedI2b2Xml) } @Test def testToI2b2WithBreakdowns() { resultWithBreakDowns.toI2b2String should equal(expectedI2b2XmlWithBreakdowns) } @Test def testToI2b2AllStatusTypes(): Unit = { def doTest(statusType: QueryResult.StatusType) { val expectedI2b2Xml = XmlUtil.stripWhitespace { { resultId } { instanceId } { description } { resultType.toI2b2 } { setSize } { date } { date } { statusType } { statusType.i2b2Id.get }{ statusType } }.toString val result = queryResult.copy(statusType = statusType) result.toI2b2String should equal(expectedI2b2Xml) } import QueryResult.StatusType //NB: Error is tested by testToI2b2WithErrors() val nonErrorStatuses = StatusType.values.toSet - StatusType.Error for (statusType <- nonErrorStatuses) { doTest(statusType) } } @Test def testToI2b2WithErrors(): Unit = { QueryResult.errorResult(Some(description), statusMessage).toI2b2String } @Test def testWithErrorsAndProblemDigest():Unit = { - case class TestProblem(override val summary: String = "test summary") extends AbstractProblem(ProblemSources.Unknown) + case class TestProblem(override val summary: String = "test summary",override val description:String = "test description") extends AbstractProblem(ProblemSources.Unknown) val testProblem:TestProblem = new TestProblem() val expected: NodeSeq = 0 0 description 0 ERROR lakjdalsjd {testProblem.problemName}

{testProblem.summary} {testProblem.description}
{testProblem.details}
val actual = QueryResult.errorResult( Some(description), statusMessage, Option(testProblem)) val i2b2Xml: NodeSeq = actual.toI2b2 val pretty = new scala.xml.PrettyPrinter(80, 2) pretty.formatNodes(i2b2Xml) should equal(pretty.formatNodes(expected)) val fromI2b2 = QueryResult.fromI2b2(Set.empty)(i2b2Xml) fromI2b2 should equal(actual) val xml = actual.toXml val fromXml = QueryResult.fromXml(Set.empty)(xml) fromXml should equal(actual) } } \ No newline at end of file diff --git a/commons/util/src/main/scala/net/shrine/problem/Problem.scala b/commons/util/src/main/scala/net/shrine/problem/Problem.scala index ba95112a7..84e6630e5 100644 --- a/commons/util/src/main/scala/net/shrine/problem/Problem.scala +++ b/commons/util/src/main/scala/net/shrine/problem/Problem.scala @@ -1,129 +1,128 @@ package net.shrine.problem import java.net.InetAddress import java.util.Date import net.shrine.log.Loggable import net.shrine.serialization.{XmlUnmarshaller, XmlMarshaller} import scala.xml.{Node, NodeSeq} /** * Describes what information we have about a problem at the site in code where we discover it. * * @author david * @since 8/6/15 */ trait Problem { def summary:String def problemName = getClass.getName def throwable:Option[Throwable] = None def stamp:Stamp - def description = s"${stamp.pretty}" + def description:String //todo stack trace as xml elements? would be easy def throwableDetail = throwable.map(x => s"${x.getClass.getName} ${x.getMessage}\n${x.getStackTrace.mkString(sys.props("line.separator"))}") - def details:String = s"${throwableDetail.getOrElse("")}" + def details:String = s"${stamp.pretty}\n${throwableDetail.getOrElse("")}" def toDigest:ProblemDigest = ProblemDigest(problemName,summary,description,details) } case class ProblemDigest(codec:String,summary:String,description:String,details:String) extends XmlMarshaller { override def toXml: Node = { {codec} {summary} {description}
{details}
} } object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable { def apply(oldMessage:String):ProblemDigest = { val ex = new IllegalStateException(s"'$oldMessage' detected, not in codec. Please report this problem and stack trace to Shrine dev.") ex.fillInStackTrace() warn(ex) ProblemDigest("ProblemNotInCodec",oldMessage,"","") } override def fromXml(xml: NodeSeq): ProblemDigest = { val problemNode = xml \ "problem" require(problemNode.nonEmpty,s"No problem tag in $xml") def extractText(tagName:String) = (problemNode \ tagName).text val codec = extractText("codec") val summary = extractText("summary") val description = extractText("description") val details = extractText("details") ProblemDigest(codec,summary,description,details) } } case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) { def pretty = s"at ${new Date(time)} on $host ${source.pretty}" } object Stamp { def apply(source:ProblemSources.ProblemSource): Stamp = Stamp(InetAddress.getLocalHost,System.currentTimeMillis(),source) } abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem { val stamp = Stamp(source) } trait ProblemHandler { def handleProblem(problem:Problem) } /** * An example problem handler */ object LoggingProblemHandler extends ProblemHandler with Loggable { override def handleProblem(problem: Problem): Unit = { problem.throwable.fold(error(problem.toString))(throwable => error(problem.toString,throwable) ) } } object ProblemSources{ sealed trait ProblemSource { -//todo name without $ - def pretty = getClass.getSimpleName.drop(1) + def pretty = getClass.getSimpleName.dropRight(1) } case object Adapter extends ProblemSource case object Hub extends ProblemSource case object Qep extends ProblemSource case object Dsa extends ProblemSource case object Unknown extends ProblemSource def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown) } case class ProblemNotYetEncoded(summary:String,t:Throwable) extends AbstractProblem(ProblemSources.Unknown){ override val throwable = Some(t) - override val description = s"${super.description} . This error is not yet in the codec. Please report the stack trace to the Shrine development team at TODO" + override val description = s"This problem has not yet been codified in Shrine. Please report all information including the stack trace to the Shrine development team at TODO" } object ProblemNotYetEncoded { def apply(summary:String):ProblemNotYetEncoded = { - val x = new IllegalStateException(s"$summary , is not yet in the codec.") + val x = new IllegalStateException(s"$summary , not yet codified in Shrine.") x.fillInStackTrace() new ProblemNotYetEncoded(summary,x) } } diff --git a/commons/util/src/test/scala/net/shrine/problem/ProblemDigestTest.scala b/commons/util/src/test/scala/net/shrine/problem/ProblemDigestTest.scala index 7e1feee37..b48430369 100644 --- a/commons/util/src/test/scala/net/shrine/problem/ProblemDigestTest.scala +++ b/commons/util/src/test/scala/net/shrine/problem/ProblemDigestTest.scala @@ -1,24 +1,26 @@ package net.shrine.problem import net.shrine.util.ShouldMatchersForJUnit /** * * @author dwalend * @since 1.20 */ final class ProblemDigestTest extends ShouldMatchersForJUnit { def testRoundTrip() = { val problemDigest = ProblemDigest(getClass.getName,"Test problem","A problem for testing","We use this problem for testing. Don't worry about it") val xml = problemDigest.toXml val fromXml = ProblemDigest.fromXml(xml) fromXml should be(problemDigest) } } object TestProblem extends AbstractProblem(ProblemSources.Unknown) { - override def summary: String = "Test Problem" + override def summary: String = "Test problem summary" + + override def description: String = "Test problem description" } diff --git a/commons/util/src/test/scala/net/shrine/problem/ProblemHandlerTest.scala b/commons/util/src/test/scala/net/shrine/problem/ProblemHandlerTest.scala index 0d693e5f4..533bfbf19 100644 --- a/commons/util/src/test/scala/net/shrine/problem/ProblemHandlerTest.scala +++ b/commons/util/src/test/scala/net/shrine/problem/ProblemHandlerTest.scala @@ -1,67 +1,69 @@ package net.shrine.problem import net.shrine.log.Loggable import net.shrine.util.ShouldMatchersForJUnit import org.apache.log4j.Level import scala.collection.mutable.{Map => MMap} /** * * @author dwalend * @since 1.20 */ final class ProblemHandlerTest extends ShouldMatchersForJUnit { def testProblemHandler() = { val loggable = new MockLoggable case class TestProblem(nodeName:String,summary:String,exception:Exception) extends AbstractProblem(ProblemSources.Hub) { val message = s"TestProblem involving $nodeName" + val description = summary + message + override def throwable = Some(exception) } val fakeException = new RuntimeException("test exception") fakeException.fillInStackTrace() val problem = TestProblem("testProblem","Problem created for testing",fakeException) val problemHandler = LoggingProblemHandler problemHandler.handleProblem(problem) loggable.loggedAt(Level.ERROR) should be (right = true) } private final class MockLoggable extends Loggable { val loggedAt: MMap[Level, Boolean] = MMap.empty override def debug(s: => Any) { loggedAt(Level.DEBUG) = true super.debug(s) } override def info(s: => Any) { loggedAt(Level.INFO) = true super.info(s) } override def warn(s: => Any) { loggedAt(Level.WARN) = true super.warn(s) } override def error(s: => Any) { loggedAt(Level.ERROR) = true super.error(s) } } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala index 3a797c3a5..e5766e97a 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala @@ -1,115 +1,116 @@ package net.shrine.aggregation import com.sun.mail.iap.ConnectionException import net.shrine.broadcaster.CouldNotParseResultsException import net.shrine.log.Loggable import net.shrine.problem.{ProblemNotYetEncoded, ProblemSources, AbstractProblem} import scala.concurrent.duration.Duration import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Represents the basic aggregation strategy shared by several aggregators: * - Parses a sequence of SpinResultEntries into a sequence of some * combination of valid responses, ErrorResponses, and invalid * responses (cases where ShrineResponse.fromXml returns None) * - Filters the valid responses, weeding out responses that aren't of * the expected type * Invokes an abstract method with the valid responses, errors, and * invalid responses. * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable { private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = { manifest[T].runtimeClass.isAssignableFrom(response.getClass) } import BasicAggregator._ override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = { val resultsOrErrors: Iterable[ParsedResult[T]] = { for { result <- results } yield { val parsedResponse: ParsedResult[T] = result match { case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse) case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response) case Timeout(origin) => Error(Option(origin), ErrorResponse(s"Timed out querying node '${origin.name}'")) //todo failure becomes an ErrorResponse and Error status type here. And the stack trace gets eaten. case Failure(origin, cause) => { cause match { case cx: ConnectionException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx))) case cnprx:CouldNotParseResultsException => { if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx))) else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx))) } case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x))) } } case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result") } parsedResponse } } val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid } val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid } val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error } //Log all parsing errors invalidResponses.map(_.errorMessage).foreach(this.error(_)) val previouslyDetectedErrors = errors.map(Error(None, _)) makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses) } private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse } object BasicAggregator { private[aggregation] sealed abstract class ParsedResult[+T] private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T] private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing] private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing] } case class CouldNotConnectToAdapter(origin:NodeId,cx:ConnectionException) extends AbstractProblem(ProblemSources.Hub) { - override val summary: String = s"Could not connect to adapter at ${origin.name}." override val throwable = Some(cx) + override val summary: String = s"Shrine could not connect to the adapter at ${origin.name}." + override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}" } case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { - override val summary: String = s"Caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" override val throwable = Some(cnrpx) - override val description = s"${super.description} While parsing a response from ${cnrpx.url} with http code ${cnrpx.url} caught '${cnrpx.cause.getMessage}'" + override val summary: String = s"Caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" + override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.url} caught '${cnrpx.cause.getMessage}'" override val details = s"${super.details}\n\nMessage body is: \n ${cnrpx.body}" } case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { - override val summary: String = s"Observed ${cnrpx.statusCode} and caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" override val throwable = Some(cnrpx) - override val description = s"${super.description} While parsing a response from ${cnrpx.url} with http code ${cnrpx.url} caught '${cnrpx.cause.getMessage}'" + override val summary: String = s"Observed ${cnrpx.statusCode} and caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" + override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught '${cnrpx.cause}'" override val details = s"${super.details}\n\nMessage body is: \n ${cnrpx.body}" } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala index 02d60db5f..53d723a0c 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala @@ -1,41 +1,43 @@ package net.shrine.aggregation import net.shrine.aggregation.BasicAggregator.{Invalid, Error, Valid} import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.protocol.ErrorResponse import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Extends BasicAggregator to ignore Errors and Invalid responses * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] { private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = { //Filter out errors and invalid responses makeResponseFrom(validResponses) } //Default implementation, just returns first valid response, or if there are none, an ErrorResponse private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = { validResponses.map(_.response).toSet.headOption.getOrElse{ val problem = NoValidResponsesToAggregate() ErrorResponse(problem.summary,Some(problem)) } } } case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = "No valid responses to aggregate" + override val summary: String = "No valid responses to aggregate" + + override val description:String = "The hub received no valid responses to aggregate" } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala index 10c0d530f..60cc945c3 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala @@ -1,54 +1,55 @@ package net.shrine.aggregation import net.shrine.problem.{ProblemSources, AbstractProblem} import net.shrine.protocol.ShrineResponse import net.shrine.aggregation.BasicAggregator.{Invalid, Error, Valid} import net.shrine.protocol.QueryResult /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Extends BasicAggregator to package Errors and Invalid responses into QueryResults * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class PackagesErrorsAggregator[T <: ShrineResponse : Manifest]( errorMessage: Option[String] = None, invalidMessage: Option[String] = None) extends BasicAggregator[T] { private[aggregation] def makeErrorResult(error: Error): QueryResult = { val Error(originOption, errorResponse) = error //Use node name as the description, to avoid giving the web UI more data than it can display val desc = originOption.map(_.name) QueryResult.errorResult(desc, errorMessage.getOrElse(errorResponse.errorMessage),errorResponse.problemDigest) } private[aggregation] def makeInvalidResult(invalid: Invalid): QueryResult = { val Invalid(originOption, errorMessage) = invalid //Use node name as the description, to avoid giving the web UI more data than it can display val desc = originOption.map(_.name) QueryResult.errorResult(desc, invalidMessage.getOrElse(errorMessage),Option(InvalidResultProblem(invalid))) } private[aggregation] final override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): ShrineResponse = { makeResponse(validResponses, errorResponses.map(makeErrorResult), invalidResponses.map(makeInvalidResult)) } private[aggregation] def makeResponse(validResponses: Iterable[Valid[T]], errorResponses: Iterable[QueryResult], invalidResponses: Iterable[QueryResult]): ShrineResponse } //todo Problem these two should really propagate problems from the Error or the Invalid result case class InvalidResultProblem(invalid:Invalid) extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = s"${invalid.errorMessage} on ${invalid.origin.getOrElse("unknown node")}" + override def summary: String = s"The hub received an invalid response from ${invalid.origin.getOrElse("an unknown node")}" + override def description: String = s"${invalid.errorMessage} from ${invalid.origin.getOrElse("an unknown node")}" } \ No newline at end of file