diff --git a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala index fe825dee0..5d956e957 100644 --- a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala +++ b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala @@ -1,133 +1,131 @@ package net.shrine.adapter.client import java.net.{SocketTimeoutException, URL} import org.xml.sax.SAXParseException import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.blocking import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.util.control.NonFatal import scala.xml.{NodeSeq, XML} import com.sun.jersey.api.client.ClientHandlerException import net.shrine.client.{HttpResponse, Poster, TimeoutException} import net.shrine.problem.{AbstractProblem, ProblemNotYetEncoded, ProblemSources} import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.ErrorResponse import net.shrine.protocol.NodeId import net.shrine.protocol.Result import scala.util.{Failure, Success, Try} import net.shrine.protocol.ResultOutputType /** * @author clint * @since Nov 15, 2013 * * */ final class RemoteAdapterClient private (nodeId:NodeId,val poster: Poster, val breakdownTypes: Set[ResultOutputType]) extends AdapterClient { import RemoteAdapterClient._ //NB: Overriding apply in the companion object screws up case-class code generation for some reason, so //we add the would-have-been-generated methods here override def toString = s"RemoteAdapterClient($poster)" override def hashCode: Int = 31 * (if(poster == null) 1 else poster.hashCode) override def equals(other: Any): Boolean = other match { case that: RemoteAdapterClient if that != null => poster == that.poster case _ => false } def url:Option[URL] = Some(new URL(poster.url)) //TODO: Revisit this import scala.concurrent.ExecutionContext.Implicits.global override def query(request: BroadcastMessage): Future[Result] = { val requestXml = request.toXml Future { blocking { val response: HttpResponse = poster.post(requestXml.toString()) interpretResponse(response) } }.recover { case e if isTimeout(e) => throw new TimeoutException(s"Invoking adapter at ${poster.url} timed out", e) } } def interpretResponse(response:HttpResponse):Result = { if(response.statusCode <= 400){ val responseXml = response.body import scala.concurrent.duration._ //Should we know the NodeID here? It would let us make a better error response. Try(XML.loadString(responseXml)).flatMap(Result.fromXml(breakdownTypes)) match { case Success(result) => result case Failure(x) => { val errorResponse = x match { case sx: SAXParseException => ErrorResponse(CouldNotParseXmlFromAdapter(poster.url,response.statusCode,responseXml,sx)) case _ => ErrorResponse(ProblemNotYetEncoded(s"Couldn't understand response from adapter at '${poster.url}': $responseXml", x)) } Result(nodeId, 0.milliseconds, errorResponse) } } } else { Result(nodeId,0.milliseconds,ErrorResponse(HttpErrorCodeFromAdapter(poster.url,response.statusCode,response.body))) } } } object RemoteAdapterClient { def apply(nodeId:NodeId,poster: Poster, breakdownTypes: Set[ResultOutputType]): RemoteAdapterClient = { //NB: Replicate URL-munging that used to be performed by JerseyAdapterClient val posterToUse = { if(poster.url.endsWith("requests")) { poster } else { poster.mapUrl(_ + "/requests") } } new RemoteAdapterClient(nodeId,posterToUse, breakdownTypes) } def isTimeout(e: Throwable): Boolean = e match { case e: SocketTimeoutException => true case e: ClientHandlerException => { val cause = e.getCause cause != null && cause.isInstanceOf[SocketTimeoutException] } case _ => false } } case class HttpErrorCodeFromAdapter(url:String,statusCode:Int,responseBody:String) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = "Hub received a fatal error response" override def description: String = s"Hub received error code $statusCode from the adapter at $url" override def detailsXml:NodeSeq =
{s"Http response body was $responseBody"}
- createAndLog } case class CouldNotParseXmlFromAdapter(url:String,statusCode:Int,responseBody:String,saxx: SAXParseException) extends AbstractProblem(ProblemSources.Adapter) { override def throwable = Some(saxx) override def summary: String = s"Hub could not parse response from adapter" override def description: String = s"Hub could not parse xml from $url due to ${saxx.toString}" override def detailsXml:NodeSeq =
{s"Http response code was $statusCode and the body was $responseBody"} {throwableDetail}
- createAndLog } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala index 5432cda0a..6e948f3e2 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala @@ -1,301 +1,298 @@ package net.shrine.adapter import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import net.shrine.adapter.audit.AdapterAuditDb import scala.Option.option2Iterable import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.Obfuscator.obfuscateResults import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.Breakdown import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BroadcastMessage, ErrorResponse, HasQueryResults, HiveCredentials, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.StackTrace import net.shrine.util.Tries.sequence import scala.concurrent.duration.Duration import net.shrine.client.Poster import net.shrine.problem.{AbstractProblem, ProblemSources} /** * @author clint * @since Nov 2, 2012 * */ object AbstractReadQueryResultAdapter { private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]]) } //noinspection RedundantBlock abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults]( poster: Poster, override val hiveCredentials: HiveCredentials, dao: AdapterDao, doObfuscation: Boolean, getQueryId: Req => Long, getProjectId: Req => String, toResponse: (Long, QueryResult) => Rsp, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean ) extends WithHiveCredentialsAdapter(hiveCredentials) { //TODO: Make this configurable private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors) //TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead? private lazy val executorService = Executors.newFixedThreadPool(numThreads) private lazy val executionContext = ExecutionContext.fromExecutorService(executorService) override def shutdown() { try { executorService.shutdown() executorService.awaitTermination(5, TimeUnit.SECONDS) } finally { executorService.shutdownNow() super.shutdown() } } import AbstractReadQueryResultAdapter._ override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { val req = message.request.asInstanceOf[Req] val queryId = getQueryId(req) def findShrineQueryRow = dao.findQueryByNetworkId(queryId) def findShrineQueryResults = dao.findResultsFor(queryId) findShrineQueryRow match { case None => { debug(s"Query $queryId not found in the Shrine DB") ErrorResponse(QueryNotFound(queryId)) } case Some(shrineQueryRow) => { findShrineQueryResults match { case None => { debug(s"Query $queryId found but its results are not available") //TODO: When precisely can this happen? Should we go back to the CRC here? ErrorResponse(QueryResultNotAvailable(queryId)) } case Some(shrineQueryResult) => { if (shrineQueryResult.isDone) { debug(s"Query $queryId is done and already stored, returning stored results") makeResponseFrom(queryId, shrineQueryResult) } else { debug(s"Query $queryId is incomplete, asking CRC for results") val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result) result } } } } } } private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = { shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(ErrorResponse(QueryNotFound(queryId))) } private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = { //NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to //retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in //parallel is quite possibly too clever, but may be faster than asking for them serially. //TODO: Review this. //Make requests for results in parallel val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult) //Gather all the results (block until they're all returned) val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime) countResponseAttempt match { //If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along //with any retrieved breakdowns before returning it. case Success(countResponse) => { //NB: Only store the result if needed, that is, if all results are done //TODO: REVIEW THIS storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts)) countResponse } case Failure(e) => ErrorResponse(CouldNotRetrieveQueryFromCrc(queryId,e)) } } private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = { def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString) def process(localResultId: Long): ShrineResponse = { delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId)) } implicit val executionContext = this.executionContext import scala.concurrent.blocking def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f))) val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt { process(shrineQueryResult.count.localId) } val futureBreakdownAttempts = Future.sequence(for { Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns } yield futureBlockingAttempt { process(localResultId) }) //Log errors retrieving count futureCountAttempt.collect { case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'") case Failure(e) => error(s"Error requesting count result from the CRC: ", e) } //Log errors retrieving breakdown for { breakdownResponseAttempts <- futureBreakdownAttempts } { breakdownResponseAttempts.collect { case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'") case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e) } } //"Filter" for non-ErrorResponses val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect { case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'")) } //"Filter" for non-ErrorResponses val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for { breakdownResponseAttempts <- futureBreakdownAttempts } yield { breakdownResponseAttempts.collect { case Success(resp: ReadResultResponse) => Try(resp) } } for { countResponseAttempt <- futureNonErrorCountAttempt breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts } yield { RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) } } private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = { val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime) //Log any failures (countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _)) //NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned val responseAttempt = for { countResponse: ReadResultResponse <- countResponseAttempt countQueryResult = countResponse.metadata breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts) } yield { val breakdownsByType = (for { breakdownResponse <- breakdownResponses resultType <- breakdownResponse.metadata.resultType } yield resultType -> breakdownResponse.data).toMap val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType) val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns toResponse(queryId, queryResultToReturn) } SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts) } private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = { val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten breakdownTypes -- successfulBreakdownTypes } private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val responseIsDone = response.results.forall(_.statusType.isDone) if (responseIsDone) { storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes) } } private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) { val rawResults = response.results val obfuscatedResults = obfuscateResults(doObfuscation)(response.results) for { shrineQuery <- dao.findQueryByNetworkId(queryId) queryResult <- rawResults.headOption obfuscatedQueryResult <- obfuscatedResults.headOption } { val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr) dao.inTransaction { dao.deleteQuery(queryId) dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns) } } } private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R] private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) { def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns] override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call } private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _) } case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Query not found" override def description:String = s"No query with id $queryId found on ${stamp.host.getHostName}" - createAndLog } case class QueryResultNotAvailable(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Query $queryId found but its results are not available yet" override def description:String = s"Query $queryId found but its results are not available yet on ${stamp.host.getHostName}" - createAndLog } case class CouldNotRetrieveQueryFromCrc(queryId:Long,x: Throwable) extends AbstractProblem(ProblemSources.Adapter) { override def summary: String = s"Could not retrieve query $queryId from the CRC" override def description:String = s"Unhandled exception while retrieving query $queryId while retrieving it from the CRC on ${stamp.host.getHostName}" override def throwable = Some(x) - createAndLog } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala index 3914fedc6..d6592184c 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala @@ -1,104 +1,100 @@ package net.shrine.adapter import java.sql.SQLException import java.util.Date import net.shrine.adapter.dao.BotDetectedException import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources} import net.shrine.protocol.{AuthenticationInfo, BaseShrineResponse, BroadcastMessage, ErrorResponse, ShrineRequest} import scala.util.control.NonFatal /** * @author Bill Simons * @since 4/8/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class Adapter extends Loggable { //noinspection RedundantBlock final def perform(message: BroadcastMessage): BaseShrineResponse = { def problemToErrorResponse(problem:Problem):ErrorResponse = { LoggingProblemHandler.handleProblem(problem) ErrorResponse(problem) } val shrineResponse = try { processRequest(message) } catch { case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e)) case e: BotDetectedException => problemToErrorResponse(BotDetected(e)) case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e)) case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e)) case e: SQLException => problemToErrorResponse(AdapterDatabaseProblem(e)) case NonFatal(e) => { val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage" else s"Unexpected exception in Adapter" problemToErrorResponse(ProblemNotYetEncoded(summary,e)) } } shrineResponse } protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse //NOOP, may be overridden by subclasses def shutdown(): Unit = () } case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = s"User '${authn.domain}:${authn.username}' locked out." override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ." - createAndLog } case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = s"Error communicating with I2B2 CRC." override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}." override val detailsXml =

Request is {request} {throwableDetail.getOrElse("")}
- createAndLog } case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "Could not map query term(s)." override val description = s"The Shrine Adapter on ${stamp.host.getHostName} cannot map this query to its local terms." override val detailsXml =
Query Defitiontion is {x.runQueryRequest.queryDefinition} RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo} {throwableDetail.getOrElse("")}
- createAndLog } case class AdapterDatabaseProblem(x:SQLException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "Problem using the Adapter database." override val description = "The Shrine Adapter encountered a problem using a database." - createAndLog } case class BotDetected(bdx:BotDetectedException) extends AbstractProblem(ProblemSources.Adapter) { override val summary: String = s"A user has run enough queries in a short period of time the adapter suspects a bot." override val description: String = s"${bdx.domain}:${bdx.username} has run ${bdx.detectedCount} queries since ${new Date(bdx.sinceMs)}, more than the limit of ${bdx.limit} allowed in this time frame." } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala index 1d6175726..df6ef54f2 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala @@ -1,110 +1,108 @@ package net.shrine.adapter import org.xml.sax.SAXParseException import scala.xml.NodeSeq import scala.xml.XML import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, Credential, ErrorResponse, HiveCredentials, ShrineRequest, ShrineResponse, TranslatableRequest} import net.shrine.util.XmlDateHelper import net.shrine.client.Poster import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.util.Try import scala.util.control.NonFatal /** * @author Bill Simons * @since 4/11/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class CrcAdapter[T <: ShrineRequest, V <: ShrineResponse]( poster: Poster, override protected val hiveCredentials: HiveCredentials) extends WithHiveCredentialsAdapter(hiveCredentials) { protected def parseShrineResponse(nodeSeq: NodeSeq): ShrineResponse private[adapter] def parseShrineErrorResponseWithFallback(xmlResponseFromCrc: String): ShrineResponse = { //NB: See https://open.med.harvard.edu/jira/browse/SHRINE-534 //NB: https://open.med.harvard.edu/jira/browse/SHRINE-745 val shrineResponseAttempt = for { crcXml <- Try(XML.loadString(xmlResponseFromCrc)) shrineResponse <- Try(parseShrineResponse(crcXml)).recover { case NonFatal(e) => info(s"Exception while parsing $crcXml",e) ErrorResponse.fromI2b2(crcXml) } //todo pass the exception to build a proper error response, and log the exception } yield shrineResponse shrineResponseAttempt.recover { case saxx:SAXParseException => ErrorResponse(CannotParseXmlFromCrc(saxx,xmlResponseFromCrc)) case NonFatal(e) => error(s"Error parsing response from CRC: ", e) ErrorResponse(ExceptionWhileLoadingCrcResponse(e,xmlResponseFromCrc)) }.get } //NB: default is a noop; only RunQueryAdapter needs this for now protected[adapter] def translateNetworkToLocal(request: T): T = request protected[adapter] override def processRequest(message: BroadcastMessage): BaseShrineResponse = { val i2b2Response = callCrc(translateRequest(message.request)) parseShrineErrorResponseWithFallback(i2b2Response) } protected def callCrc(request: ShrineRequest): String = { debug(s"Sending Shrine-formatted request to the CRC at '${poster.url}': $request") val crcRequest = request.toI2b2String val crcResponse = XmlDateHelper.time(s"Calling the CRC at '${poster.url}'")(debug(_)) { //Wrap exceptions in a more descriptive form, to enable sending better error messages back to the legacy web client try { poster.post(crcRequest) } catch { case NonFatal(e) => throw CrcInvocationException(poster.url, request, e) } } crcResponse.body } private[adapter] def translateRequest(request: BaseShrineRequest): ShrineRequest = request match { case transReq: TranslatableRequest[T] => //noinspection RedundantBlock { val HiveCredentials(domain, username, password, project) = hiveCredentials val authInfo = AuthenticationInfo(domain, username, Credential(password, isToken = false)) translateNetworkToLocal(transReq.withAuthn(authInfo).withProject(project).asRequest) } case req: ShrineRequest => req case _ => throw new IllegalArgumentException(s"Unexpected request: $request") } } case class CannotParseXmlFromCrc(saxx:SAXParseException,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(saxx) override val summary: String = "Could not parse response from CRC." override val description:String = s"${saxx.getMessage} while parsing the response from the CRC." override val detailsXml =

{throwableDetail.getOrElse("")} Response is {xmlResponseFromCrc}
- createAndLog } case class ExceptionWhileLoadingCrcResponse(t:Throwable,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(t) override val summary: String = "Unanticipated exception with response from CRC." override val description:String = s"${t.getMessage} while parsing the response from the CRC." override val detailsXml =
{throwableDetail.getOrElse("")} Response is {xmlResponseFromCrc}
- createAndLog } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala index 9dd269990..21ad6753e 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala @@ -1,291 +1,289 @@ package net.shrine.adapter import net.shrine.adapter.audit.AdapterAuditDb import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, ErrorFromCrcException, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, MissingCrCXmlResultException, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineResponse} import net.shrine.client.Poster import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources} import scala.util.control.NonFatal import net.shrine.util.XmlDateHelper import scala.concurrent.duration.Duration import scala.xml.XML /** * @author Bill Simons * @author clint * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class RunQueryAdapter( poster: Poster, dao: AdapterDao, override val hiveCredentials: HiveCredentials, conceptTranslator: QueryDefinitionTranslator, adapterLockoutAttemptsThreshold: Int, doObfuscation: Boolean, runQueriesImmediately: Boolean, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean, botCountTimeThresholds:Map[Long,Duration] ) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) { logStartup() import RunQueryAdapter._ override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = { try { request.mapQueryDefinition(conceptTranslator.translate) } catch { case NonFatal(e) => throw new AdapterMappingException(request,s"Error mapping query terms from network to local forms.", e) } } override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message) if (isLockedOut(message.networkAuthn)) { throw new AdapterLockoutException(message.networkAuthn,poster.url) } dao.checkIfBot(message.networkAuthn,botCountTimeThresholds) val runQueryReq = message.request.asInstanceOf[RunQueryRequest] //We need to use the network identity from the BroadcastMessage, since that will have the network username //(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching //of previous queries on deployed systems where the credentials in the identity param to this method and the authn //field of the incoming request are different, like the HMS Shrine deployment. //NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013 val authnToUse = message.networkAuthn.copy(credential = Credential("", isToken = false)) if (!runQueriesImmediately) { debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") storeQuery(authnToUse, message, runQueryReq) } else { debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse)) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(runQueryReq.networkQueryId,result) result } } private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = { //Use dummy ids for what we would have received from the CRC val masterId: Long = -1L val queryInstanceId: Long = -1L val resultId: Long = -1L //TODO: is this right?? Or maybe it's project id? val groupId = authnToUse.domain val invalidSetSize = -1L val now = XmlDateHelper.now val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing")) dao.inTransaction { val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None) val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult)) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult) } private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request) //NB: Pass through ErrorResponses received from the CRC. //See: https://open.med.harvard.edu/jira/browse/SHRINE-794 val result = super.processRequest(message) match { case e: ErrorResponse => e case rawRunQueryResponse: RawCrcRunQueryResponse => processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse) } if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(request,result) result } private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = { def isBreakdown(result: QueryResult) = result.resultType.exists(_.isBreakdown) val originalResults: Seq[QueryResult] = rawRunQueryResponse.results val (originalBreakdownResults, originalNonBreakDownResults): (Seq[QueryResult],Seq[QueryResult]) = originalResults.partition(isBreakdown) val originalBreakdownCountAttempts: Seq[(QueryResult, Try[QueryResult])] = attemptToRetrieveBreakdowns(request, originalBreakdownResults) val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess } val failedBreakdownCountAttemptsWithProblems = failedBreakdownCountAttempts.map { attempt => val originalResult: QueryResult = attempt._1 val queryResult:QueryResult = if (originalResult.problemDigest.isDefined) originalResult else { attempt._2 match { case Success(_) => originalResult case Failure(x) => //noinspection RedundantBlock { val problem:Problem = x match { case e: ErrorFromCrcException => ErrorFromCrcBreakdown(e) case e: MissingCrCXmlResultException => CannotInterpretCrcBreakdownXml(e) case NonFatal(e) => { val summary = s"Unexpected exception while interpreting breakdown response" ProblemNotYetEncoded(summary, e) } } LoggingProblemHandler.handleProblem(problem) originalResult.copy(problemDigest = Some(problem.toDigest)) } } } (queryResult,attempt._2) } logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttemptsWithProblems) val originalMergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = { val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns } withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _) } val obfuscatedQueryResults = originalResults.map(Obfuscator.obfuscate) val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown) val obfuscatedMergedBreakdowns = obfuscateBreakdowns(originalMergedBreakdowns) val failedBreakdownTypes = failedBreakdownCountAttemptsWithProblems.flatMap { case (qr, _) => qr.resultType } dao.storeResults( authn = authnToUse, masterId = rawRunQueryResponse.queryId.toString, networkQueryId = request.networkQueryId, queryDefinition = request.queryDefinition, rawQueryResults = originalResults, obfuscatedQueryResults = obfuscatedQueryResults, failedBreakdownTypes = failedBreakdownTypes, mergedBreakdowns = originalMergedBreakdowns, obfuscatedBreakdowns = obfuscatedMergedBreakdowns) // at this point the queryResult could be a mix of successes and failures. // SHRINE reports only the successes. See SHRINE-1567 for details val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns //TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here? //can failedBreakdownCountAttempts be mixed back in here? val resultWithBreakdowns: QueryResult = queryResults.head.withBreakdowns(breakdownsToReturn) if(debugEnabled) { def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data) val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}" debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)") debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)") debug(s"Full QueryResult: $resultWithBreakdowns") } //if any results had problems, this commented out code can turn it into an error QueryResult //See SHRINE-1619 //val problem: Option[ProblemDigest] = failedBreakdownCountAttemptsWithProblems.headOption.flatMap(x => x._1.problemDigest) //val queryResult = problem.fold(resultWithBreakdowns)(pd => QueryResult.errorResult(Some(pd.description),"Error with CRC",pd)) rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns) } private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = { def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString) Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes)) } private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = { originalCountQueryResult -> (for { countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId) } yield originalCountQueryResult.withSetSize(countData.metadata.setSize)) } private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = { breakdownResults.map { origBreakdownResult => origBreakdownResult -> (for { breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data) } yield origBreakdownResult.withBreakdown(breakdownData)) } } private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse, failures: Seq[(QueryResult, Try[QueryResult])]) { for { (origQueryResult, Failure(e)) <- failures } { error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e) } } private def isLockedOut(authn: AuthenticationInfo): Boolean = { adapterLockoutAttemptsThreshold match { case 0 => false case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold) } } private def logStartup(): Unit = { val message = { if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" } else { s"${getClass.getSimpleName} will queue queries for later execution" } } info(message) } } object RunQueryAdapter { private[adapter] def obfuscateBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Map[ResultOutputType, I2b2ResultEnvelope] = { breakdowns.mapValues(_.mapValues(Obfuscator.obfuscate)) } } case class ErrorFromCrcBreakdown(x:ErrorFromCrcException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "The CRC reported an error." override val description = "The CRC reported an internal error." - createAndLog } case class CannotInterpretCrcBreakdownXml(x:MissingCrCXmlResultException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = "SHRINE cannot interpret the CRC response." override val description = "The CRC responded, but SHRINE could not interpret that response." - createAndLog } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala index b678794d7..9e3069947 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala @@ -1,40 +1,39 @@ package net.shrine.adapter.components import net.shrine.adapter.dao.AdapterDao import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.ShrineResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.protocol.ReadQueryDefinitionResponse import net.shrine.protocol.ErrorResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.AbstractReadQueryDefinitionRequest /** * @author clint * @since Apr 4, 2013 * * NB: Tested by ReadQueryDefinitionAdapterTest */ final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) { def get(request: Req): ShrineResponse = { val resultOption = for { shrineQuery <- dao.findQueryByNetworkId(request.queryId) } yield { ReadQueryDefinitionResponse( shrineQuery.networkId, shrineQuery.name, shrineQuery.username, shrineQuery.dateCreated, //TODO: I2b2 or Shrine format? shrineQuery.queryDefinition.toI2b2String) } resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request))) } } case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) { override val summary: String = s"Couldn't find query definition." override val description:String = s"The query definition with network id: ${request.queryId} does not exist at this site." - createAndLog } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala index 9c36a5bf8..f4cbb80b9 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala @@ -1,490 +1,489 @@ package net.shrine.adapter.dao.squeryl import java.sql.Timestamp import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.{AdapterDao, BotDetectedException} import net.shrine.adapter.dao.model.{ObfuscatedPair, ShrineQuery, ShrineQueryResult} import net.shrine.adapter.dao.model.squeryl.{SquerylBreakdownResultRow, SquerylCountRow, SquerylPrivilegedUser, SquerylQueryResultRow, SquerylShrineError, SquerylShrineQuery} import net.shrine.adapter.dao.squeryl.tables.Tables import net.shrine.dao.DateHelpers import net.shrine.dao.squeryl.{SquerylEntryPoint, SquerylInitializer} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.{AuthenticationInfo, I2b2ResultEnvelope, QueryResult, ResultOutputType} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.XmlDateHelper import org.squeryl.Query import org.squeryl.dsl.{GroupWithMeasures, Measures} import scala.concurrent.duration.Duration import scala.util.Try import scala.xml.NodeSeq /** * @author clint * @since May 22, 2013 */ final class SquerylAdapterDao(initializer: SquerylInitializer, tables: Tables)(implicit breakdownTypes: Set[ResultOutputType]) extends AdapterDao with Loggable { initializer.init() override def inTransaction[T](f: => T): T = SquerylEntryPoint.inTransaction { f } import SquerylEntryPoint._ override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = mutateFlagField(networkQueryId, newIsFlagged = true, flagMessage) override def unFlagQuery(networkQueryId: Long): Unit = mutateFlagField(networkQueryId, newIsFlagged = false, None) private def mutateFlagField(networkQueryId: Long, newIsFlagged: Boolean, newFlagMessage: Option[String]): Unit = { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.isFlagged := newIsFlagged, queryRow.flagMessage := newFlagMessage) } } } override def storeResults( authn: AuthenticationInfo, masterId: String, networkQueryId: Long, queryDefinition: QueryDefinition, rawQueryResults: Seq[QueryResult], obfuscatedQueryResults: Seq[QueryResult], failedBreakdownTypes: Seq[ResultOutputType], mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = { inTransaction { val insertedQueryId = insertQuery(masterId, networkQueryId, authn, queryDefinition, isFlagged = false, hasBeenRun = true, flagMessage = None) val insertedQueryResultIds = insertQueryResults(insertedQueryId, rawQueryResults) storeCountResults(rawQueryResults, obfuscatedQueryResults, insertedQueryResultIds) storeErrorResults(rawQueryResults, insertedQueryResultIds) storeBreakdownFailures(failedBreakdownTypes.toSet, insertedQueryResultIds) insertBreakdownResults(insertedQueryResultIds, mergedBreakdowns, obfuscatedBreakdowns) } } private[adapter] def storeCountResults(raw: Seq[QueryResult], obfuscated: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val notErrors = raw.filter(!_.isError) val obfuscatedNotErrors = obfuscated.filter(!_.isError) if(notErrors.size > 1) { warn(s"Got ${notErrors.size} raw (hopefully-)count results; more than 1 is unusual.") } if(obfuscatedNotErrors.size > 1) { warn(s"Got ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; more than 1 is unusual.") } if(notErrors.size != obfuscatedNotErrors.size) { warn(s"Got ${notErrors.size} raw and ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; that these numbers are different is unusual.") } import ResultOutputType.PATIENT_COUNT_XML def isCount(qr: QueryResult): Boolean = qr.resultType.contains(PATIENT_COUNT_XML) inTransaction { //NB: Take the count/setSize from the FIRST PATIENT_COUNT_XML QueryResult, //though the same count should be there for all of them, if there are more than one for { Seq(insertedCountQueryResultId) <- insertedIds.get(PATIENT_COUNT_XML) notError <- notErrors.find(isCount) //NB: Find a count result, just to be sure obfuscatedNotError <- obfuscatedNotErrors.find(isCount) //NB: Find a count result, just to be sure } { insertCountResult(insertedCountQueryResultId, notError.setSize, obfuscatedNotError.setSize) } } } private[adapter] def storeErrorResults(results: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val errors = results.filter(_.isError) val insertedErrorResultIds = insertedIds.getOrElse(ResultOutputType.ERROR,Nil) val insertedIdsToErrors = insertedErrorResultIds zip errors inTransaction { for { (insertedErrorResultId, errorQueryResult) <- insertedIdsToErrors } { val pd = errorQueryResult.problemDigest.get //it's an error so it will have a problem digest insertErrorResult( insertedErrorResultId, errorQueryResult.statusMessage.getOrElse("Unknown failure"), pd.codec, pd.stampText, pd.summary, pd.description, pd.detailsXml ) } } } private[adapter] def storeBreakdownFailures(failedBreakdownTypes: Set[ResultOutputType], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val insertedIdsForFailedBreakdownTypes = insertedIds.filterKeys(failedBreakdownTypes.contains) inTransaction { for { (failedBreakdownType, Seq(resultId)) <- insertedIdsForFailedBreakdownTypes } { //todo propagate backwards to the breakdown failure to create the corect problem object BreakdownFailure extends AbstractProblem(ProblemSources.Adapter) { override val summary: String = "Couldn't retrieve result breakdown" override val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'" - createAndLog } val pd = BreakdownFailure.toDigest insertErrorResult( resultId, s"Couldn't retrieve breakdown of type '$failedBreakdownType'", pd.codec, pd.stampText, pd.summary, pd.description, pd.detailsXml ) } } } override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForAllUsers.take(howMany).map(_.toShrineQuery).toSeq } } def findAllCounts():Seq[SquerylCountRow] = { inTransaction{ Queries.allCountResults.toSeq } } override def renameQuery(networkQueryId: Long, newName: String) { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.name := newName) } } } override def deleteQuery(networkQueryId: Long): Unit = { inTransaction { tables.shrineQueries.deleteWhere(_.networkId === networkQueryId) } } override def deleteQueryResultsFor(networkQueryId: Long): Unit = { inTransaction { val resultIdsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow.id). on(queryRow.id === resultRow.queryId) }.toSet tables.queryResults.deleteWhere(_.id in resultIdsForNetworkQueryId) } } override def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean = Try { inTransaction { val privilegedUserOption = Queries.privilegedUsers(authn.domain, authn.username).singleOption val threshold:Int = privilegedUserOption.flatMap(_.threshold).getOrElse(defaultThreshold.intValue) val thirtyDaysInThePast: XMLGregorianCalendar = DateHelpers.daysFromNow(-30) val overrideDate: XMLGregorianCalendar = privilegedUserOption.map(_.toPrivilegedUser).flatMap(_.overrideDate).getOrElse(thirtyDaysInThePast) //sorted instead of just finding max val counts: Seq[Long] = Queries.repeatedResults(authn.domain, authn.username, overrideDate).toSeq.sorted //and then grabbing the last, highest value in the sorted sequence val repeatedResultCount: Long = counts.lastOption.getOrElse(0L) val result = repeatedResultCount > threshold debug(s"User ${authn.domain}:${authn.username} locked out? $result") result } }.getOrElse(false) override def checkIfBot(authn:AuthenticationInfo, botTimeThresholds:Map[Long,Duration]): Unit = { val now = System.currentTimeMillis() botTimeThresholds.foreach{countDuration => inTransaction { val sinceMs: Long = now - countDuration._2.toMillis val query: Query[Measures[Long]] = Queries.countQueriesForUserSince(authn.domain, authn.username, sinceMs) val queriesSince = query.headOption.map(_.measures).getOrElse(0L) if (queriesSince > countDuration._1) throw new BotDetectedException(domain = authn.domain, username = authn.username, detectedCount = queriesSince, sinceMs = sinceMs, limit = countDuration._1) }} } override def insertQuery(localMasterId: String, networkId: Long, authn: AuthenticationInfo, queryDefinition: QueryDefinition, isFlagged: Boolean, hasBeenRun: Boolean, flagMessage: Option[String]): Int = { inTransaction { val inserted = tables.shrineQueries.insert(new SquerylShrineQuery( 0, localMasterId, networkId, authn.username, authn.domain, XmlDateHelper.now, isFlagged, flagMessage, hasBeenRun, queryDefinition)) inserted.id } } /** * Insert rows into QueryResults, one for each QueryResult in the passed RunQueryResponse * Inserted rows are 'children' of the passed ShrineQuery (ie, they are the results of the query) */ override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = { def execTime(result: QueryResult): Option[Long] = { //TODO: How are locales handled here? Do we care? def toMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- result.startDate end <- result.endDate } yield toMillis(end) - toMillis(start) } val typeToIdTuples = inTransaction { for { result <- results resultType = result.resultType.getOrElse(ResultOutputType.ERROR) //TODO: under what circumstances can QueryResults NOT have start and end dates set? elapsed = execTime(result) } yield { val lastInsertedQueryResultRow = tables.queryResults.insert(new SquerylQueryResultRow(0, result.resultId, parentQueryId, resultType, result.statusType, elapsed, XmlDateHelper.now)) (resultType, lastInsertedQueryResultRow.id) } } typeToIdTuples.groupBy { case (resultType, _) => resultType }.mapValues(_.map { case (_, count) => count }) } override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.countResults.insert(new SquerylCountRow(0, resultId, originalCount, obfuscatedCount, XmlDateHelper.now)) } } override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) { def merge(original: I2b2ResultEnvelope, obfuscated: I2b2ResultEnvelope): Map[String, ObfuscatedPair] = { Map.empty ++ (for { (key, originalValue) <- original.data obfuscatedValue <- obfuscated.data.get(key) } yield (key, ObfuscatedPair(originalValue, obfuscatedValue))) } inTransaction { for { (resultType, Seq(resultId)) <- parentResultIds if resultType.isBreakdown originalBreakdown <- originalBreakdowns.get(resultType) obfuscatedBreakdown <- obfuscatedBreakdowns.get(resultType) (key, ObfuscatedPair(original, obfuscated)) <- merge(originalBreakdown, obfuscatedBreakdown) } { tables.breakdownResults.insert(SquerylBreakdownResultRow(0, resultId, key, original, obfuscated)) } } } override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.errorResults.insert(SquerylShrineError(0, parentResultId, errorMessage, codec, stampText, summary, digestDescription, detailsXml.toString())) } } override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = { inTransaction { Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) } } override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForUser(username, domain).take(howMany).toSeq.map(_.toShrineQuery) } } override def findQueriesByDomain(domain: String): Seq[ShrineQuery] = { inTransaction { Queries.queriesForDomain(domain).toList.map(_.toShrineQuery) } } override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = { inTransaction { val breakdownRowsByType = Queries.breakdownResults(networkQueryId).toSeq.groupBy { case (outputType, _) => outputType.toQueryResultRow.resultType }.mapValues(_.map { case (_, row) => row.toBreakdownResultRow }) val queryRowOption = Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) val countRowOption = Queries.countResults(networkQueryId).headOption.map(_.toCountRow) val queryResultRows = Queries.resultsForQuery(networkQueryId).toSeq.map(_.toQueryResultRow) val errorResultRows = Queries.errorResults(networkQueryId).toSeq.map(_.toShrineError) for { queryRow <- queryRowOption countRow <- countRowOption shrineQueryResult <- ShrineQueryResult.fromRows(queryRow, queryResultRows, countRow, breakdownRowsByType, errorResultRows) } yield { shrineQueryResult } } } /** * @author clint * @since Nov 19, 2012 */ object Queries { def privilegedUsers(domain: String, username: String): Query[SquerylPrivilegedUser] = { from(tables.privilegedUsers) { user => where(user.username === username and user.domain === domain).select(user) } } def countQueriesForUserSince(domain:String, username:String, sinceMs:Long): Query[Measures[Long]] = { val since = new Timestamp(sinceMs) from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain and queryRow.username === username and queryRow.dateCreated >= since). compute(count) } } def repeatedResults(domain: String, username: String, overrideDate: XMLGregorianCalendar): Query[Long] = { val counts: Query[GroupWithMeasures[Long, Long]] = join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.username === username and queryRow.domain === domain and (countRow.originalValue <> 0L) and queryRow.dateCreated > DateHelpers.toTimestamp(overrideDate)). groupBy(countRow.originalValue). compute(count(countRow.originalValue)). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } //Filter for result counts > 0 from(counts) { cnt => where(cnt.measures gt 0).select(cnt.measures) } } val queriesForAllUsers: Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => select(queryRow).orderBy(queryRow.dateCreated.desc) } } //TODO: Find a way to parameterize on limit, to avoid building the query every time //TODO: limit def queriesForUser(username: String, domain: String): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain and queryRow.username === username). select(queryRow). orderBy(queryRow.dateCreated.desc) } } def queriesForDomain(domain: String): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain). select(queryRow). orderBy(queryRow.dateCreated.desc) } } val allCountResults: Query[SquerylCountRow] = { from(tables.countResults) { queryRow => select(queryRow) } } def queriesByNetworkId(networkQueryId: Long): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow) } } //TODO: Find out how to compose queries, to re-use queriesByNetworkId def queryNamesByNetworkId(networkQueryId: Long): Query[String] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow.name) } } def resultsForQuery(networkQueryId: Long): Query[SquerylQueryResultRow] = { val resultsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow). on(queryRow.id === resultRow.queryId) } from(resultsForNetworkQueryId)(select(_)) } def countResults(networkQueryId: Long): Query[SquerylCountRow] = { join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.networkId === networkQueryId). select(countRow). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } } def errorResults(networkQueryId: Long): Query[SquerylShrineError] = { join(tables.shrineQueries, tables.queryResults, tables.errorResults) { (queryRow, resultRow, errorRow) => where(queryRow.networkId === networkQueryId). select(errorRow). on(queryRow.id === resultRow.queryId, resultRow.id === errorRow.resultId) } } //NB: using groupBy here is too much of a pain; do it 'manually' later def breakdownResults(networkQueryId: Long): Query[(SquerylQueryResultRow, SquerylBreakdownResultRow)] = { join(tables.shrineQueries, tables.queryResults, tables.breakdownResults) { (queryRow, resultRow, breakdownRow) => where(queryRow.networkId === networkQueryId). select((resultRow, breakdownRow)). on(queryRow.id === resultRow.queryId, resultRow.id === breakdownRow.resultId) } } } } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala index cea546ce7..4156d09c9 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala @@ -1,105 +1,103 @@ package net.shrine.adapter.service import net.shrine.log.Loggable import net.shrine.protocol.{BaseShrineResponse, BroadcastMessage, ErrorResponse, NodeId, RequestType, Result, Signature} import net.shrine.adapter.AdapterMap import net.shrine.crypto.Verifier import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.concurrent.duration.Duration import scala.concurrent.duration._ /** * Heart of the adapter. * * @author clint * @since Nov 14, 2013 */ final class AdapterService( nodeId: NodeId, signatureVerifier: Verifier, maxSignatureAge: Duration, adapterMap: AdapterMap) extends AdapterRequestHandler with Loggable { import AdapterService._ logStartup(adapterMap) override def handleRequest(message: BroadcastMessage): Result = { handleInvalidSignature(message).orElse { for { adapter <- adapterMap.adapterFor(message.request.requestType) } yield time(nodeId) { adapter.perform(message) } }.getOrElse { Result(nodeId, 0.milliseconds, ErrorResponse(UnknownRequestType(message.request.requestType))) } } /** * @return None if the signature is fine, Some(result with an ErrorResponse) if not */ private def handleInvalidSignature(message: BroadcastMessage): Option[Result] = { val (sigIsValid, elapsed) = time(signatureVerifier.verifySig(message, maxSignatureAge)) if(sigIsValid) { None } else { info(s"Incoming message had invalid signature: $message") Some(Result(nodeId, elapsed.milliseconds, ErrorResponse(CouldNotVerifySignature(message)))) } } } object AdapterService extends Loggable { private def logStartup(adapterMap: AdapterMap) { info("Adapter service initialized, will respond to the following queries: ") val sortedByReqType = adapterMap.requestsToAdapters.toSeq.sortBy { case (k, _) => k } sortedByReqType.foreach { case (requestType, adapter) => info(s" $requestType:\t(${adapter.getClass.getSimpleName})") } } private[service] def time[T](f: => T): (T, Long) = { val start = System.currentTimeMillis val result = f val elapsed = System.currentTimeMillis - start (result, elapsed) } private[service] def time(nodeId: NodeId)(f: => BaseShrineResponse): Result = { val (response, elapsed) = time(f) Result(nodeId, elapsed.milliseconds, response) } } case class CouldNotVerifySignature(message: BroadcastMessage) extends AbstractProblem(ProblemSources.Adapter){ val signature: Option[Signature] = message.signature override val summary: String = signature.fold("A message was not signed")(sig => s"The trust relationship with ${sig.signedBy} is not properly configured.") override val description: String = signature.fold(s"The Adapter at ${stamp.host.getHostName} could not properly validate a request because it had no signature.")(sig => s"The Adapter at ${stamp.host.getHostName} could not properly validate the request from ${sig.signedBy}. An incoming message from the hub had an invalid signature.") override val detailsXml = signature.fold(

)( sig =>
Signature is {sig}
) - createAndLog } case class UnknownRequestType(requestType: RequestType) extends AbstractProblem(ProblemSources.Adapter){ override val summary: String = s"Unknown request type $requestType" override val description: String = s"The Adapter at ${stamp.host.getHostName} received a request of type $requestType that it cannot process." - createAndLog } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala index 53746ce1b..d981a9125 100644 --- a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala +++ b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala @@ -1,37 +1,36 @@ package net.shrine.authentication import net.shrine.authentication.AuthenticationResult.NotAuthenticated import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.xml.NodeSeq /** * @author clint * @since Dec 13, 2013 */ final case class NotAuthenticatedException(domain: String, username: String,message: String, cause: Throwable) extends RuntimeException(message, cause) { def problem = NotAuthenticatedProblem(this) } object NotAuthenticatedException { def apply(na:NotAuthenticated):NotAuthenticatedException = NotAuthenticatedException(na.domain,na.username,na.message,na.cause.getOrElse(null)) } case class NotAuthenticatedProblem(nax:NotAuthenticatedException) extends AbstractProblem(ProblemSources.Qep){ override val summary = s"Can not authenticate ${nax.domain}:${nax.username}." override val throwable = Some(nax) override val description = s"Can not authenticate ${nax.domain}:${nax.username}. ${nax.getLocalizedMessage}" override val detailsXml: NodeSeq = NodeSeq.fromSeq(
{throwableDetail.getOrElse("")}
) - createAndLog } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala index a4f5ee329..f1df2acca 100644 --- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala +++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala @@ -1,115 +1,112 @@ package net.shrine.authorization import net.shrine.log.Loggable import scala.util.{Failure, Success, Try} import net.shrine.client.HttpResponse import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest import net.shrine.i2b2.protocol.pm.User import net.shrine.problem._ import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.ErrorResponse import scala.util.control.NonFatal /** * @author clint * @since Apr 5, 2013 */ trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable => import PmAuthorizerComponent._ //noinspection RedundantBlock object Pm { def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = { User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith { case NonFatal(e) => { debug(s"Couldn't extract a User from '$httpResponse'") Try(Left(ErrorResponse.fromI2b2(httpResponse.body))) } }.recover { case NonFatal(e) => { val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e) LoggingProblemHandler.handleProblem(problem) Left(ErrorResponse(problem)) } } } def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = { val request = GetUserConfigurationRequest(authn) val responseAttempt: Try[HttpResponse] = Try { debug(s"Authorizing with PM cell at ${pmPoster.url}") pmPoster.post(request.toI2b2String) } val authStatusAttempt: Try[AuthorizationStatus with Product with Serializable] = responseAttempt.flatMap(parsePmResult(authn)).map { case Right(user) => { val managerUserOption = for { roles <- user.rolesByProject.get(projectId) if neededRoles.forall(roles.contains) } yield user managerUserOption.map(Authorized).getOrElse { NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn)) } } case Left(errorResponse) => { //todo remove when ErrorResponse gets its message info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object") NotAuthorized(errorResponse.problemDigest) } } authStatusAttempt match { case Success(s) => s case Failure(x) => NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn,x)) } } } } object PmAuthorizerComponent { sealed trait AuthorizationStatus case class Authorized(user: User) extends AuthorizationStatus case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus { def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest) } object NotAuthorized { def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest) } } case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) { override val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'" override val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'" - createAndLog } case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override val summary: String = s"Could not reach PM cell." override val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}." - createAndLog } case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override def summary: String = s"Could not interpret response from PM cell." override def description: String = s"Shrine could not interpret the response from the PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: due to ${throwable.get}" override val detailsXml =
Response is {httpResponse} {throwableDetail.getOrElse("")}
- createAndLog } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala index cf5560a09..790ba61ba 100644 --- a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala +++ b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala @@ -1,236 +1,235 @@ package net.shrine.authorization import java.net.URL import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} import java.security.cert.X509Certificate import akka.io.IO import com.typesafe.config.{Config, ConfigFactory} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.steward.{InboundShrineQuery, ResearchersTopics, TopicIdAndName} import net.shrine.log.Loggable import net.shrine.protocol.{ApprovedTopic, AuthenticationInfo, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest} import net.shrine.config.ConfigExtensions import org.json4s.native.JsonMethods.parse import org.json4s.{DefaultFormats, Formats} import akka.actor.ActorSystem import akka.util.Timeout import akka.pattern.ask import net.shrine.problem.{AbstractProblem, ProblemSources} import spray.can.Http import spray.can.Http.{HostConnectorInfo, HostConnectorSetup} import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse} import spray.http.StatusCodes.{OK, Unauthorized, UnavailableForLegalReasons} import spray.httpx.TransformerPipelineSupport.WithTransformation import spray.httpx.Json4sSupport import spray.client.pipelining.{Get, Post, addCredentials, sendReceive} import spray.io.{ClientSSLEngineProvider, PipelineContext, SSLContextProvider} import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.concurrent.{Await, Future} import scala.language.postfixOps /** * A QueryAuthorizationService that talks to the standard data steward application to learn about topics (intents) and check that a * shrine query can be run * * @author david * @since 4/2/15 */ final case class StewardQueryAuthorizationService(qepUserName:String, qepPassword:String, stewardBaseUrl:URL, defaultTimeout:FiniteDuration = 10 seconds) extends QueryAuthorizationService with Loggable with Json4sSupport { import system.dispatcher // execution context for futures implicit val system = ActorSystem("AuthorizationServiceActors",ConfigFactory.load("shrine")) //todo use shrine's config implicit val timeout:Timeout = Timeout.durationToTimeout(defaultTimeout)//10 seconds implicit def json4sFormats: Formats = DefaultFormats val qepCredentials = BasicHttpCredentials(qepUserName,qepPassword) def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = { // Place a special SSLContext in scope here to be used by HttpClient. // It trusts all server certificates. // Most important - it will encrypt all of the traffic on the wire. implicit def trustfulSslContext: SSLContext = { object BlindFaithX509TrustManager extends X509TrustManager { def checkClientTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Client asked BlindFaithX509TrustManager to check $chain for $authType")) def checkServerTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Server asked BlindFaithX509TrustManager to check $chain for $authType")) def getAcceptedIssuers = Array[X509Certificate]() } val context = SSLContext.getInstance("TLS") context.init(Array[KeyManager](), Array(BlindFaithX509TrustManager), null) context } implicit def trustfulSslContextProvider: SSLContextProvider = { SSLContextProvider.forContext(trustfulSslContext) } class CustomClientSSLEngineProvider extends ClientSSLEngineProvider { def apply(pc: PipelineContext) = ClientSSLEngineProvider.default(trustfulSslContextProvider).apply(pc) } implicit def sslEngineProvider: ClientSSLEngineProvider = new CustomClientSSLEngineProvider val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials) val responseFuture: Future[HttpResponse] = for { HostConnectorInfo(hostConnector, _) <- { val hostConnectorSetup = new HostConnectorSetup(httpRequest.uri.authority.host.address, httpRequest.uri.authority.port, sslEncryption = httpRequest.uri.scheme=="https")( sslEngineProvider = sslEngineProvider) IO(Http) ask hostConnectorSetup } response <- sendReceive(hostConnector).apply(requestWithCredentials) _ <- hostConnector ask Http.CloseAll } yield response responseFuture } /* todo to recycle connections with http://spray.io/documentation/1.2.3/spray-client/ if needed def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = { import akka.io.IO import akka.pattern.ask import spray.can.Http val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials) //todo failures via onFailure callbacks for{ sendR:SendReceive <- connectorSource response:HttpResponse <- sendR(requestWithCredentials) } yield response } val connectorSource: Future[SendReceive] = //Future[HttpRequest => Future[HttpResponse]] for ( //keep asking for a connector until you get one //todo correct URL // Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("www.spray.io", port = 8080) Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("localhost", port = 6060) ) yield sendReceive(connector) */ def sendAndReceive(httpRequest: HttpRequest,timeout:Duration = defaultTimeout):HttpResponse = { info("StewardQueryAuthorizationService will request "+httpRequest.uri) //todo someday log request and response val responseFuture = sendHttpRequest(httpRequest) val response:HttpResponse = Await.result(responseFuture,timeout) info("StewardQueryAuthorizationService received response with status "+response.status) response } //Contact a data steward and either return an Authorized or a NotAuthorized or throw an exception override def authorizeRunQueryRequest(runQueryRequest: RunQueryRequest): AuthorizationResult = { debug(s"authorizeRunQueryRequest started for ${runQueryRequest.queryDefinition.name}") val interpreted = runQueryRequest.topicId.fold( authorizeRunQueryRequestNoTopic(runQueryRequest) )( authorizeRunQueryRequestForTopic(runQueryRequest,_) ) debug(s"authorizeRunQueryRequest completed with $interpreted) for ${runQueryRequest.queryDefinition.name}") interpreted } def authorizeRunQueryRequestNoTopic(runQueryRequest: RunQueryRequest): AuthorizationResult = { val userName = runQueryRequest.authn.username val queryId = runQueryRequest.queryDefinition.name //xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml. val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString()) val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName", queryForJson) val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime) interpretAuthorizeRunQueryResponse(response) } def authorizeRunQueryRequestForTopic(runQueryRequest: RunQueryRequest,topicIdString:String): AuthorizationResult = { val userName = runQueryRequest.authn.username val queryId = runQueryRequest.queryDefinition.name //xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml. val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString()) val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName/topic/$topicIdString", queryForJson) val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime) debug(s"authorizeRunQueryRequestForTopic response is $response") interpretAuthorizeRunQueryResponse(response) } /** Interpret the response from the steward app. Primarily here for testing. */ def interpretAuthorizeRunQueryResponse(response:HttpResponse):AuthorizationResult = { response.status match { case OK => { val topicJson = new String(response.entity.data.toByteArray) debug(s"topicJson is $topicJson") val topic:Option[TopicIdAndName] = parse(topicJson).extractOpt[TopicIdAndName] debug(s"topic is $topic") Authorized(topic.map(x => (x.id,x.name))) } case UnavailableForLegalReasons => NotAuthorized(response.entity.asString) case Unauthorized => throw new AuthorizationException(s"steward rejected qep's login credentials. $response") case _ => throw new AuthorizationException(s"QueryAuthorizationService detected a problem: $response") } } //Either read the approved topics from a data steward or have an error response. override def readApprovedEntries(readTopicsRequest: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = { val userName = readTopicsRequest.authn.username val request = Get(s"$stewardBaseUrl/steward/qep/approvedTopics/user/$userName") val response:HttpResponse = sendAndReceive(request,readTopicsRequest.waitTime) if(response.status == OK) { val topicsJson = new String(response.entity.data.toByteArray) val topicsFromSteward: ResearchersTopics = parse(topicsJson).extract[ResearchersTopics] val topics: Seq[ApprovedTopic] = topicsFromSteward.topics.map(topic => ApprovedTopic(topic.id, topic.name)) Right(ReadApprovedQueryTopicsResponse(topics)) } else Left(ErrorResponse(ErrorStatusFromDataStewardApp(response,stewardBaseUrl))) } override def toString() = { super.toString().replaceAll(qepPassword,"REDACTED") } } object StewardQueryAuthorizationService { def apply(config:Config):StewardQueryAuthorizationService = StewardQueryAuthorizationService ( qepUserName = config.getString("qepUserName"), qepPassword = config.getString("qepPassword"), stewardBaseUrl = config.get("stewardBaseUrl", new URL(_)) ) } case class ErrorStatusFromDataStewardApp(response:HttpResponse,stewardBaseUrl:URL) extends AbstractProblem(ProblemSources.Qep) { override val summary: String = s"Data Steward App responded with status ${response.status}" override val description:String = s"The Data Steward App at ${stewardBaseUrl} responded with status ${response.status}, not OK." override val detailsXml =
Response is {response} {throwableDetail.getOrElse("")}
- createAndLog } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala index d99b9a238..7fc8a38ff 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala @@ -1,23 +1,22 @@ package net.shrine.protocol import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.xml.NodeSeq /** * @author clint * @since Apr 30, 2013 */ trait NonI2b2ableResponse { self: ShrineResponse => //Fail loudly here protected override def i2b2MessageBody: NodeSeq = ??? override def toI2b2: NodeSeq = ErrorResponse(NoI2b2AnalogExists(this.getClass)).toI2b2 } case class NoI2b2AnalogExists(claz:Class[_ <: NonI2b2ableResponse]) extends AbstractProblem(ProblemSources.Unknown) { override def summary: String = s"${ claz.getSimpleName } can't be marshalled to i2b2 XML, as it has no i2b2 analog" override def description: String = s"${ claz.getSimpleName } can't be marshalled to i2b2 XML, as it has no i2b2 analog" - createAndLog } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala index 7c229768f..069ef0197 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala @@ -1,401 +1,400 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources} import net.shrine.protocol.QueryResult.StatusType import scala.xml.NodeSeq import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil} import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller} import scala.util.Try /** * @author Bill Simons * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class QueryResult ( resultId: Long, instanceId: Long, resultType: Option[ResultOutputType], setSize: Long, startDate: Option[XMLGregorianCalendar], endDate: Option[XMLGregorianCalendar], description: Option[String], statusType: StatusType, statusMessage: Option[String], problemDigest: Option[ProblemDigest] = None, breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty ) extends XmlMarshaller with I2b2Marshaller with Loggable { //only used in tests def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, //description statusType, None) //statusMessage } def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None) //statusMessage } def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match { case Some(rt) => rt == testedResultType case _ => false } import QueryResult._ //NB: Fragile, non-type-safe == def isError = statusType == StatusType.Error def elapsed: Option[Long] = { def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- startDate end <- endDate } yield inMillis(end) - inMillis(start) } //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = { breakdowns.values.toSeq.sortBy(_.resultType.name) } override def toI2b2: NodeSeq = { import OptionEnrichments._ XmlUtil.stripWhitespace { { resultId } { instanceId } { description.toXml() } { resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt => if(rt.isBreakdown) rt.toI2b2NameOnly() else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error else rt.toI2b2 } } { setSize } { startDate.toXml() } { endDate.toXml() } { statusType } { statusType.toI2b2(this) } { //NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case, //so if we're going to do that, let's produce saner XML. sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data")) } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { resultId } { instanceId } { resultType.toXml(_.toXml) } { setSize } { startDate.toXml() } { endDate.toXml() } { description.toXml() } { statusType } { statusMessage.toXml() } { //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. sortedBreakdowns.map(_.toXml) } { problemDigest.map(_.toXml).getOrElse("") } } def withId(id: Long): QueryResult = copy(resultId = id) def withInstanceId(id: Long): QueryResult = copy(instanceId = id) def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize)) def withSetSize(size: Long): QueryResult = copy(setSize = size) def withDescription(desc: String): QueryResult = copy(description = Option(desc)) def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType)) def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData)) def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns) } object QueryResult { final case class StatusType( name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value { def isError = this == StatusType.Error def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult) } object StatusType extends SEnum[StatusType] { private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{ throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id") } { i2b2Id }{ queryResult.statusType.name } } val noMessage:NodeSeq = null val Error = StatusType("ERROR", isDone = true, None, { queryResult => (queryResult.statusMessage, queryResult.problemDigest) match { case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml case (Some(msg),None) => { msg } case (None,Some(pd)) => { pd.summary } ++ pd.toXml case (None, None) => noMessage } }) val Finished = StatusType("FINISHED", isDone = true, Some(3)) //TODO: Can we use the same for Queued, Processing, and Incomplete? val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests val Queued = StatusType("QUEUED", isDone = false, Some(2)) val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2)) //TODO: What s should these have? Does anyone care? val Held = StatusType("HELD", isDone = false) val SmallQueue = StatusType("SMALL_QUEUE", isDone = false) val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false) val LargeQueue = StatusType("LARGE_QUEUE", isDone = false) val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false) } def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _) def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = { import ResultOutputType.valueOf val typeName = asText(elemNames: _*)(xml) valueOf(typeName) orElse valueOf(breakdownTypes)(typeName) } def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = { val attempt = parse(xml) attempt.toOption } def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = { val subXml = xml \ "problem" if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml)) else None } def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def extract(elemName: String): Option[String] = { Option((xml \ elemName).text.trim).filter(!_.isEmpty) } def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate) val asLong = extractLong(xml) _ import NodeSeqEnrichments.Strictness._ import Tries.sequence def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = { //noinspection ScalaUnnecessaryParentheses val mapAttempt = for { subXml <- xml.withChild(elemName) envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes))) mappings = envelopes.map(envelope => (envelope.resultType -> envelope)) } yield Map.empty ++ mappings mapAttempt.getOrElse(Map.empty) } QueryResult( resultId = asLong("resultId"), instanceId = asLong("instanceId"), resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml), setSize = asLong("setSize"), startDate = extractDate("startDate"), endDate = extractDate("endDate"), description = extract("description"), statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call statusMessage = extract("statusMessage"), problemDigest = extractProblemDigest(xml), breakdowns = extractBreakdowns("resultEnvelope") ) } def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def asLong = extractLong(xml) _ def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim) def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate) val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call val statusMessage: Option[String] = asTextOption("query_status_type", "description") val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type") val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest) else None case class Filling( resultType:Option[ResultOutputType], setSize:Long, startDate:Option[XMLGregorianCalendar], endDate:Option[XMLGregorianCalendar] ) val filling = if(!statusType.isError) { val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2) val setSize = asLong("set_size") val startDate = asXmlGcOption("start_date") val endDate = asXmlGcOption("end_date") Filling(resultType,setSize,startDate,endDate) } else { val resultType = None val setSize = 0L val startDate = None val endDate = None Filling(resultType,setSize,startDate,endDate) } QueryResult( resultId = asLong("result_instance_id"), instanceId = asLong("query_instance_id"), resultType = filling.resultType, setSize = filling.setSize, startDate = filling.startDate, endDate = filling.endDate, description = asTextOption("description"), statusType = statusType, statusMessage = statusMessage, problemDigest = problemDigest ) } def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = { QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = { val problemDigest = problem.toDigest QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } /** * For reconstituting errorResults from a database */ def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = { // This would require parsing the stamp text to change, and without a standard locale that's nigh impossible. // If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet. val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0) QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } } case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { override val summary: String = "The I2B2 CRC reported an internal error." override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}" override val detailsXml =

CRC's Response is {xmlResponseFromCrc}
- createAndLog } diff --git a/commons/util/src/main/scala/net/shrine/problem/DashboardProblemsDatabase.scala b/commons/util/src/main/scala/net/shrine/problem/DashboardProblemDatabase.scala similarity index 100% rename from commons/util/src/main/scala/net/shrine/problem/DashboardProblemsDatabase.scala rename to commons/util/src/main/scala/net/shrine/problem/DashboardProblemDatabase.scala diff --git a/commons/util/src/main/scala/net/shrine/problem/Problem.scala b/commons/util/src/main/scala/net/shrine/problem/Problem.scala index af59b9c5e..49af1b49d 100644 --- a/commons/util/src/main/scala/net/shrine/problem/Problem.scala +++ b/commons/util/src/main/scala/net/shrine/problem/Problem.scala @@ -1,215 +1,241 @@ package net.shrine.problem import java.net.InetAddress import java.text.SimpleDateFormat import java.util.Date import net.shrine.log.Loggable import net.shrine.serialization.{XmlMarshaller, XmlUnmarshaller} -import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.{Future, Promise} import scala.xml.{Elem, Node, NodeSeq} /** * Describes what information we have about a problem at the site in code where we discover it. * * @author david * @since 8/6/15 */ trait Problem { def summary:String def problemName = getClass.getName def throwable:Option[Throwable] = None def stamp:Stamp def description:String def exceptionXml(exception:Option[Throwable]): Option[Elem] = { exception.map{x => {x.getClass.getName} {x.getMessage} {x.getStackTrace.map(line => {line})}{exceptionXml(Option(x.getCause)).getOrElse("")} }} def throwableDetail: Option[Elem] = exceptionXml(throwable) def detailsXml: NodeSeq = NodeSeq.fromSeq(
{throwableDetail.getOrElse("")}
) def toDigest:ProblemDigest = ProblemDigest(problemName,stamp.pretty,summary,description,detailsXml, stamp.time) def logDigest:Problem = { if (!ProblemConfigSource.turnOffConnector) { val problems = Problems problems.DatabaseConnector.insertProblem(toDigest) } this } def createAndLog:Problem = { if (!ProblemConfigSource.turnOffConnector) Problems.DatabaseConnector.insertProblem(toDigest) this } + /** + * The hack that will get us through until onCreate in 2.13 + * The problem is that we want to insert the createAndLog call after a problem is constructed. + * The only way to currently do that is with DelayedInit... which is just no. + * Thus, the hack (that's still better than DelayedInit) is to watch the summary, description, + * and throwable field, and call createAndLog once we know they've been initialized. The one + * caveat is that creating throwable is optional, so in the worst case we wait 25 ms then decide + * it's not gettting initialized. + * @return + */ + def logAfterInitialization:Future[Problem] = { + Future { + while (synchronized(summary) == null || synchronized(description) == null) { + Thread.sleep(10) + } + + var count = 0 + while(count < 5 && synchronized(throwable).isEmpty) { + Thread.sleep(5) + count += 1 + } + + createAndLog + } + } } case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq, epoch: Long) extends XmlMarshaller { override def toXml: Node = { {codec} {stampText} {summary} {description} {epoch} {detailsXml} } /** * Ignores detailXml. equals with scala.xml is impossible. See http://www.scala-lang.org/api/2.10.3/index.html#scala.xml.Equality$ */ override def equals(other: Any): Boolean = other match { case that: ProblemDigest => (that canEqual this) && codec == that.codec && stampText == that.stampText && summary == that.summary && description == that.description && epoch == that.epoch case _ => false } /** * Ignores detailXml */ override def hashCode: Int = { val prime = 67 codec.hashCode + prime * (stampText.hashCode + prime *(summary.hashCode + prime * (description.hashCode + prime * epoch.hashCode()))) } } object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable { override def fromXml(xml: NodeSeq): ProblemDigest = { val problemNode = xml \ "problem" require(problemNode.nonEmpty,s"No problem tag in $xml") def extractText(tagName:String) = (problemNode \ tagName).text val codec = extractText("codec") val stampText = extractText("stamp") val summary = extractText("summary") val description = extractText("description") val detailsXml: NodeSeq = problemNode \ "details" val epoch = try { extractText("epoch").toLong } catch { case nx:NumberFormatException => error(s"While parsing xml representing a ProblemDigest, the epoch could not be parsed into a long", nx) 0 } ProblemDigest(codec,stampText,summary,description,detailsXml,epoch) } } case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) { def pretty = s"${new Date(time)} on ${host.getHostName} ${source.pretty}" } object Stamp { //TODO: val dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")? //TODO: Currently the stamp text is locale specific, which can change depending on the jre/computer running it... def apply(source:ProblemSources.ProblemSource, timer: => Long): Stamp = Stamp(InetAddress.getLocalHost, timer,source) } /** * An abstract problem to enable easy creation of Problems. Note that when overriding fields, * you should only use def or lazy val, and not val. * See: http://stackoverflow.com/questions/15346600/field-inside-object-which-extends-app-trait-is-set-to-null-why-is-that-so * @param source */ abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem { println(s"Problem $getClass created") def timer = System.currentTimeMillis - lazy val stamp = Stamp(source, timer) + override val stamp = Stamp(source, System.currentTimeMillis) + logAfterInitialization } trait ProblemHandler { def handleProblem(problem:Problem) } /** * An example problem handler */ object LoggingProblemHandler extends ProblemHandler with Loggable { override def handleProblem(problem: Problem): Unit = { problem.throwable.fold(error(problem.toString))(throwable => error(problem.toString,throwable) ) } } //object DatabaseProblemhandler extends ProblemHandler { // override def handleProblem(problem: Problem): Unit = { // Problems.DatabaseConnector.insertProblem(problem.toDigest) // } // //} object ProblemSources{ sealed trait ProblemSource { def pretty = getClass.getSimpleName.dropRight(1) } case object Adapter extends ProblemSource case object Hub extends ProblemSource case object Qep extends ProblemSource case object Dsa extends ProblemSource case object Unknown extends ProblemSource def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown) } case class ProblemNotYetEncoded(internalSummary:String,t:Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown){ override val summary = "An unanticipated problem encountered." override val throwable = { val rx = t.fold(new IllegalStateException(s"$summary"))( new IllegalStateException(s"$summary",_) ) rx.fillInStackTrace() Some(rx) } val reportedAtStackTrace = new IllegalStateException("Capture reporting stack trace.") override val description = "This problem is not yet classified in Shrine source code. Please report the details to the Shrine dev team." override val detailsXml: NodeSeq = NodeSeq.fromSeq(
{internalSummary} {throwableDetail.getOrElse("")}
) - createAndLog } object ProblemNotYetEncoded { def apply(summary:String,x:Throwable):ProblemNotYetEncoded = ProblemNotYetEncoded(summary,Some(x)) } diff --git a/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala b/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala index 5a021356d..55a3419c9 100644 --- a/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala +++ b/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala @@ -1,12 +1,11 @@ package net.shrine.problem /** * @author david * @since 1.22 */ case class TestProblem(override val summary: String = "test summary", override val description:String = "test description", override val throwable: Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown) { override def timer = 0 - createAndLog } diff --git a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala index 6963ae766..df03a47f1 100644 --- a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala +++ b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala @@ -1,92 +1,92 @@ package net.shrine.problem import org.scalatest.concurrent.ScalaFutures import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} import slick.driver.H2Driver.api._ import scala.concurrent.duration._ /** * Test creation, insertion, querying, and deletion of ProblemDigest values into an * in-memory H2 database. Demonstrates proof of concept mapping of ProblemDigest * case class into a database. */ class DashboardProblemDatabaseTest extends FlatSpec with BeforeAndAfter with ScalaFutures with Matchers { implicit val timeout = 10.seconds val connector = Problems.DatabaseConnector val IO = connector.IO val problemDigests = Seq( ProblemDigest("MJPG", "01:01:01", "summary here", "description here" ,
uh not sure
, 2), ProblemDigest("wewu", "01:02:01", "coffee spill", "coffee everywhere" ,
He chose decaf
, 1), ProblemDigest("wuwu", "02:01:01", "squirrel" , "chewed all the cables",
Like ALL of them
, 0), ProblemDigest("code", "10:01:02", "such summary", "such description" ,
Wow
, 3)) before { connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false connector.runBlocking(IO.createIfNotExists >> IO.tableExists) shouldBe true connector.runBlocking(IO.createIfNotExists) shouldBe NoOperation connector.runBlocking(IO.selectAll) shouldBe empty } after { connector.runBlocking(IO.tableExists) shouldBe true connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false connector.runBlocking(IO.dropIfExists) shouldBe NoOperation } "The Database" should "Connect without any problems" in { // Insert the suppliers and ProblemDigests connector.executeTransactionBlocking(IO.problems ++= problemDigests) // Test that they are all in the table var * = connector.runBlocking(IO.selectAll) * should contain theSameElementsAs problemDigests * should have length problemDigests.length // Reset the table connector.runBlocking(IO.resetTable >> IO.selectAll) shouldBe empty // Run the test again connector.executeTransactionBlocking(IO.problems += problemDigests.head, IO.problems += problemDigests(1), IO.problems += problemDigests(2), IO.problems += problemDigests(3)) // Test that they are all in the table * = connector.runBlocking(IO.selectAll) * should contain theSameElementsAs problemDigests * should have length problemDigests.length // Test that the simple select and filter queries work val filtered = connector.runBlocking(IO.problems.filter(_.codec === "code").map(_.description).result) filtered should have length 1 filtered.head shouldBe problemDigests(3).description // This also tests that our conversion from xml to strings works val xml = connector.runBlocking(IO.problems.map(_.xml).result) xml should have length problemDigests.length xml should contain theSameElementsAs problemDigests.map(_.detailsXml.toString()) val result = connector.runBlocking(IO.sizeAndProblemDigest(2)) result._1 should have length 2 result._2 shouldBe problemDigests.length result._1.head shouldBe problemDigests(3) result._1(1) shouldBe problemDigests.head val resultOverLength = connector.runBlocking(IO.sizeAndProblemDigest(10)) resultOverLength._1 should have length 4 resultOverLength._1 should contain theSameElementsAs problemDigests connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size val testProblem = ProblemDatabaseTestProblem(ProblemSources.Unknown) + Thread.sleep(50) connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + 1 } } case class ProblemDatabaseTestProblem(source: ProblemSources.ProblemSource) extends AbstractProblem(source: ProblemSources.ProblemSource) { override def summary: String = "This is a test problem! No user should ever see this." override def description: String = "Wow, this is a nice looking problem. I mean really, just look at it." - createAndLog } \ No newline at end of file diff --git a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala index 7dbdab662..0b444d164 100644 --- a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala +++ b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala @@ -1,90 +1,89 @@ package net.shrine.hms.authorization import java.net.URL import com.typesafe.config.Config import net.shrine.authentication.{AuthenticationResult, Authenticator} import net.shrine.authorization.{AuthorizationResult, QueryAuthorizationService} import net.shrine.client.EndpointConfig import net.shrine.log.Loggable import net.shrine.protocol.{AuthenticationInfo, CredentialConfig, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest} import net.shrine.config.ConfigExtensions import net.shrine.problem.{AbstractProblem, ProblemSources} /** * @author Bill Simons * @since 1/30/12 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class HmsDataStewardAuthorizationService( sheriffClient: SheriffClient, authenticator: Authenticator ) extends QueryAuthorizationService with Loggable { import net.shrine.hms.authorization.HmsDataStewardAuthorizationService._ override def readApprovedEntries(request: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = { val authn = request.authn authenticate(authn) match { case None => Left(ErrorResponse(HMSNotAuthenticatedProblem(authn))) case Some(ecommonsUsername) => val topics = sheriffClient.getApprovedEntries(ecommonsUsername) Right(ReadApprovedQueryTopicsResponse(topics)) } } override def authorizeRunQueryRequest(request: RunQueryRequest): AuthorizationResult = { val authn = request.authn if (request.topicId.isEmpty) { AuthorizationResult.NotAuthorized(s"HMS queries require a topic id; couldn't authenticate user ${toDomainAndUser(authn)}") } else { authenticate(authn) match { case None => AuthorizationResult.NotAuthorized(s"Requested topic is not approved; couldn't authenticate user ${toDomainAndUser(authn)}") case Some(ecommonsUsername) => sheriffClient.isAuthorized(ecommonsUsername, request.topicId.get, request.queryDefinition.toI2b2String) } } } private def authenticate(authn: AuthenticationInfo): Option[String] = { val authenticationResult = authenticator.authenticate(authn) identifyEcommonsUsername(authenticationResult) } } object HmsDataStewardAuthorizationService { def apply(config:Config,authenticator: Authenticator):HmsDataStewardAuthorizationService = { val endpointUrl = config.getString("sheriffEndpoint"+EndpointConfig.Keys.url) val credentials = config.getConfigured("sheriffCredentials", CredentialConfig(_)) val sheriffClient = JerseySheriffClient(endpointUrl, credentials.username, credentials.password) HmsDataStewardAuthorizationService(sheriffClient, authenticator) } private def toDomainAndUser(authn: AuthenticationInfo): String = s"${authn.domain}:${authn.username}" def identifyEcommonsUsername(authenticationResult: AuthenticationResult): Option[String] = authenticationResult match { case AuthenticationResult.Authenticated(_, ecommonsUsername) => Option(ecommonsUsername) case _ => None } } case class HMSNotAuthenticatedProblem(authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep){ override val summary = s"Can not authenticate ${authn.domain}:${authn.username}." override val description = s"Can not authenticate ${authn.domain}:${authn.username}." - createAndLog } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala index 37338324f..0a1de8fe6 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala @@ -1,125 +1,121 @@ package net.shrine.aggregation import java.net.{ConnectException, UnknownHostException} import com.sun.jersey.api.client.ClientHandlerException import net.shrine.broadcaster.CouldNotParseResultsException import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemNotYetEncoded, ProblemSources} import scala.concurrent.duration.Duration import net.shrine.protocol.{BaseShrineResponse, ErrorResponse, FailureResult, FailureResult$, NodeId, Result, SingleNodeResult, Timeout} /** * * @author Clint Gilbert * @since Sep 16, 2011 * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Represents the basic aggregation strategy shared by several aggregators: * - Parses a sequence of SpinResultEntries into a sequence of some * combination of valid responses, ErrorResponses, and invalid * responses (cases where ShrineResponse.fromXml returns None) * - Filters the valid responses, weeding out responses that aren't of * the expected type * Invokes an abstract method with the valid responses, errors, and * invalid responses. * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable { private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = { manifest[T].runtimeClass.isAssignableFrom(response.getClass) } import BasicAggregator._ override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = { val resultsOrErrors: Iterable[ParsedResult[T]] = { for { result <- results } yield { val parsedResponse: ParsedResult[T] = result match { case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse) case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response) case Timeout(origin) => Error(Option(origin), ErrorResponse(TimedOutWithAdapter(origin))) case FailureResult(origin, cause) => cause match { case cx: ConnectException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx))) case uhx: UnknownHostException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, uhx))) case chx: ClientHandlerException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, chx))) case cnprx:CouldNotParseResultsException => if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx))) else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx))) case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x))) } case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result") } parsedResponse } } val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid } val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid } val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error } //Log all parsing errors invalidResponses.map(_.errorMessage).foreach(this.error(_)) val previouslyDetectedErrors = errors.map(Error(None, _)) makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses) } private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse } object BasicAggregator { private[aggregation] sealed abstract class ParsedResult[+T] private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T] private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing] private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing] } case class CouldNotConnectToAdapter(origin:NodeId,cx: Exception) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(cx) override val summary: String = "Shrine could not connect to the adapter." override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}." - createAndLog } case class TimedOutWithAdapter(origin:NodeId) extends AbstractProblem(ProblemSources.Hub) { override val throwable = None override val summary: String = "Timed out with adapter." override val description: String = s"Shrine observed a timeout with the adapter at ${origin.name}." - createAndLog } case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(cnrpx) override val summary: String = "Could not parse response." override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'" override val detailsXml =

Message body is {cnrpx.body} {throwableDetail.getOrElse("")}
- createAndLog } case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(cnrpx) override val summary: String = "Adapter error." override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}." override val detailsXml =
Message body is {cnrpx.body} {throwableDetail.getOrElse("")}
- createAndLog } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala index ab6099403..97789b287 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala @@ -1,44 +1,43 @@ package net.shrine.aggregation import net.shrine.aggregation.BasicAggregator.{Error, Invalid, Valid} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.ErrorResponse import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Extends BasicAggregator to ignore Errors and Invalid responses * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] { private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = { //Filter out errors and invalid responses makeResponseFrom(validResponses) } //Default implementation, just returns first valid response, or if there are none, an ErrorResponse private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = { validResponses.map(_.response).toSet.headOption.getOrElse{ val problem = NoValidResponsesToAggregate() ErrorResponse(problem) } } } case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) { override val summary: String = "No valid responses to aggregate." override val description:String = "The hub received no valid responses to aggregate." - createAndLog } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala index d87b0381b..12d42f4e9 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala @@ -1,55 +1,54 @@ package net.shrine.aggregation import net.shrine.protocol.ShrineResponse import net.shrine.aggregation.BasicAggregator.{Error, Invalid, Valid} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.QueryResult /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Extends BasicAggregator to package Errors and Invalid responses into QueryResults * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class PackagesErrorsAggregator[T <: ShrineResponse : Manifest]( errorMessage: Option[String] = None, invalidMessage: Option[String] = None) extends BasicAggregator[T] { private[aggregation] def makeErrorResult(error: Error): QueryResult = { val Error(originOption, errorResponse) = error //Use node name as the description, to avoid giving the web UI more data than it can display val desc = originOption.map(_.name) QueryResult.errorResult(desc, errorMessage.getOrElse(errorResponse.errorMessage),errorResponse.problemDigest) } private[aggregation] def makeInvalidResult(invalid: Invalid): QueryResult = { val Invalid(originOption, errorMessage) = invalid //Use node name as the description, to avoid giving the web UI more data than it can display val desc = originOption.map(_.name) QueryResult.errorResult(desc, invalidMessage.getOrElse(errorMessage),InvalidResultProblem(invalid)) } private[aggregation] final override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): ShrineResponse = { makeResponse(validResponses, errorResponses.map(makeErrorResult), invalidResponses.map(makeInvalidResult)) } private[aggregation] def makeResponse(validResponses: Iterable[Valid[T]], errorResponses: Iterable[QueryResult], invalidResponses: Iterable[QueryResult]): ShrineResponse } case class InvalidResultProblem(invalid:Invalid) extends AbstractProblem(ProblemSources.Hub) { override def summary: String = s"Invalid response." override def description: String = s"${invalid.errorMessage} from ${invalid.origin.getOrElse("an unknown node")}" - createAndLog } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index 79bf27266..b61c2a5b0 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,534 +1,533 @@ package net.shrine.qep.queries import java.sql.SQLException import java.util.concurrent.TimeoutException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources} import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator} import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.util.control.NonFatal import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) try { blocking { Await.result(future, timeout) } } catch { case tx:TimeoutException => throw CouldNotRunDbIoActionException(dataSource,tx) case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.result) } def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain,request.fetchSize) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, limit:Int):Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).sortBy(x => x.changeDate.desc).take(limit).result) } def renamePreviousQuery(request:RenameQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis())) } yield queryResults ) } def markDeleted(request:DeleteQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis())) } yield queryResults ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2)) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) queryResults.map(r => r.toQueryResult( resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty), adapterNodesToProblemDigests.get(r.adapterNode) )) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val timeout = QepQuerySchema.config.getInt("timeout") seconds val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[Option[String]]("expression") def dateCreated = column[Time]("dateCreated") def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") def changeDate = column[Long]("changeDate") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists ) yield queries val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[Option[ResultOutputType]]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) import net.shrine.config.{ConfigExtensions, Keys} val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty) val schema = QepQuerySchema(slickProfile,moreBreakdowns) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: Option[String], dateCreated: Time, deleted: Boolean, queryXml: String, changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.map(_.toString), dateCreated = System.currentTimeMillis(), deleted = false, queryXml = runQueryRequest.toXmlString, changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult( resultId = resultId, instanceId = instanceId, resultType = resultType, setSize = size, startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), description = Some(adapterNode), statusType = status, statusMessage = statusMessage, breakdowns = breakdowns, problemDigest = problemDigest ) } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."), resultType = result.resultType, size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = { def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { val data = breakdowns.map(b => b.dataKey -> b.value).toMap I2b2ResultEnvelope(resultType,data) } breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else
, //TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE 0 ) } } case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){ override val summary = "A problem encountered while using a database." override val throwable = Some(x) override val description = x.getMessage - createAndLog } \ No newline at end of file