diff --git a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala
index fe825dee0..5d956e957 100644
--- a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala
+++ b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala
@@ -1,133 +1,131 @@
package net.shrine.adapter.client
import java.net.{SocketTimeoutException, URL}
import org.xml.sax.SAXParseException
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.blocking
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal
import scala.xml.{NodeSeq, XML}
import com.sun.jersey.api.client.ClientHandlerException
import net.shrine.client.{HttpResponse, Poster, TimeoutException}
import net.shrine.problem.{AbstractProblem, ProblemNotYetEncoded, ProblemSources}
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import scala.util.{Failure, Success, Try}
import net.shrine.protocol.ResultOutputType
/**
* @author clint
* @since Nov 15, 2013
*
*
*/
final class RemoteAdapterClient private (nodeId:NodeId,val poster: Poster, val breakdownTypes: Set[ResultOutputType]) extends AdapterClient {
import RemoteAdapterClient._
//NB: Overriding apply in the companion object screws up case-class code generation for some reason, so
//we add the would-have-been-generated methods here
override def toString = s"RemoteAdapterClient($poster)"
override def hashCode: Int = 31 * (if(poster == null) 1 else poster.hashCode)
override def equals(other: Any): Boolean = other match {
case that: RemoteAdapterClient if that != null => poster == that.poster
case _ => false
}
def url:Option[URL] = Some(new URL(poster.url))
//TODO: Revisit this
import scala.concurrent.ExecutionContext.Implicits.global
override def query(request: BroadcastMessage): Future[Result] = {
val requestXml = request.toXml
Future {
blocking {
val response: HttpResponse = poster.post(requestXml.toString())
interpretResponse(response)
}
}.recover {
case e if isTimeout(e) => throw new TimeoutException(s"Invoking adapter at ${poster.url} timed out", e)
}
}
def interpretResponse(response:HttpResponse):Result = {
if(response.statusCode <= 400){
val responseXml = response.body
import scala.concurrent.duration._
//Should we know the NodeID here? It would let us make a better error response.
Try(XML.loadString(responseXml)).flatMap(Result.fromXml(breakdownTypes)) match {
case Success(result) => result
case Failure(x) => {
val errorResponse = x match {
case sx: SAXParseException => ErrorResponse(CouldNotParseXmlFromAdapter(poster.url,response.statusCode,responseXml,sx))
case _ => ErrorResponse(ProblemNotYetEncoded(s"Couldn't understand response from adapter at '${poster.url}': $responseXml", x))
}
Result(nodeId, 0.milliseconds, errorResponse)
}
}
}
else {
Result(nodeId,0.milliseconds,ErrorResponse(HttpErrorCodeFromAdapter(poster.url,response.statusCode,response.body)))
}
}
}
object RemoteAdapterClient {
def apply(nodeId:NodeId,poster: Poster, breakdownTypes: Set[ResultOutputType]): RemoteAdapterClient = {
//NB: Replicate URL-munging that used to be performed by JerseyAdapterClient
val posterToUse = {
if(poster.url.endsWith("requests")) { poster }
else { poster.mapUrl(_ + "/requests") }
}
new RemoteAdapterClient(nodeId,posterToUse, breakdownTypes)
}
def isTimeout(e: Throwable): Boolean = e match {
case e: SocketTimeoutException => true
case e: ClientHandlerException => {
val cause = e.getCause
cause != null && cause.isInstanceOf[SocketTimeoutException]
}
case _ => false
}
}
case class HttpErrorCodeFromAdapter(url:String,statusCode:Int,responseBody:String) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = "Hub received a fatal error response"
override def description: String = s"Hub received error code $statusCode from the adapter at $url"
override def detailsXml:NodeSeq = {s"Http response body was $responseBody"}
- createAndLog
}
case class CouldNotParseXmlFromAdapter(url:String,statusCode:Int,responseBody:String,saxx: SAXParseException) extends AbstractProblem(ProblemSources.Adapter) {
override def throwable = Some(saxx)
override def summary: String = s"Hub could not parse response from adapter"
override def description: String = s"Hub could not parse xml from $url due to ${saxx.toString}"
override def detailsXml:NodeSeq =
{s"Http response code was $statusCode and the body was $responseBody"}
{throwableDetail}
- createAndLog
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
index 5432cda0a..6e948f3e2 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/AbstractReadQueryResultAdapter.scala
@@ -1,301 +1,298 @@
package net.shrine.adapter
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import net.shrine.adapter.audit.AdapterAuditDb
import scala.Option.option2Iterable
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
import net.shrine.adapter.Obfuscator.obfuscateResults
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.model.Breakdown
import net.shrine.adapter.dao.model.ShrineQueryResult
import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BroadcastMessage, ErrorResponse, HasQueryResults, HiveCredentials, QueryResult, ReadResultRequest, ReadResultResponse, ResultOutputType, ShrineRequest, ShrineResponse}
import net.shrine.protocol.query.QueryDefinition
import net.shrine.util.StackTrace
import net.shrine.util.Tries.sequence
import scala.concurrent.duration.Duration
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, ProblemSources}
/**
* @author clint
* @since Nov 2, 2012
*
*/
object AbstractReadQueryResultAdapter {
private final case class RawResponseAttempts(countResponseAttempt: Try[ReadResultResponse], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
private final case class SpecificResponseAttempts[R](responseAttempt: Try[R], breakdownResponseAttempts: Seq[Try[ReadResultResponse]])
}
//noinspection RedundantBlock
abstract class AbstractReadQueryResultAdapter[Req <: BaseShrineRequest, Rsp <: ShrineResponse with HasQueryResults](
poster: Poster,
override val hiveCredentials: HiveCredentials,
dao: AdapterDao,
doObfuscation: Boolean,
getQueryId: Req => Long,
getProjectId: Req => String,
toResponse: (Long, QueryResult) => Rsp,
breakdownTypes: Set[ResultOutputType],
collectAdapterAudit:Boolean
) extends WithHiveCredentialsAdapter(hiveCredentials) {
//TODO: Make this configurable
private val numThreads = math.max(5, Runtime.getRuntime.availableProcessors)
//TODO: Use scala.concurrent.ExecutionContext.Implicits.global instead?
private lazy val executorService = Executors.newFixedThreadPool(numThreads)
private lazy val executionContext = ExecutionContext.fromExecutorService(executorService)
override def shutdown() {
try {
executorService.shutdown()
executorService.awaitTermination(5, TimeUnit.SECONDS)
} finally {
executorService.shutdownNow()
super.shutdown()
}
}
import AbstractReadQueryResultAdapter._
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
val req = message.request.asInstanceOf[Req]
val queryId = getQueryId(req)
def findShrineQueryRow = dao.findQueryByNetworkId(queryId)
def findShrineQueryResults = dao.findResultsFor(queryId)
findShrineQueryRow match {
case None => {
debug(s"Query $queryId not found in the Shrine DB")
ErrorResponse(QueryNotFound(queryId))
}
case Some(shrineQueryRow) => {
findShrineQueryResults match {
case None => {
debug(s"Query $queryId found but its results are not available")
//TODO: When precisely can this happen? Should we go back to the CRC here?
ErrorResponse(QueryResultNotAvailable(queryId))
}
case Some(shrineQueryResult) => {
if (shrineQueryResult.isDone) {
debug(s"Query $queryId is done and already stored, returning stored results")
makeResponseFrom(queryId, shrineQueryResult)
} else {
debug(s"Query $queryId is incomplete, asking CRC for results")
val result: ShrineResponse = retrieveQueryResults(queryId, req, shrineQueryResult, message)
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(queryId,result)
result
}
}
}
}
}
}
private def makeResponseFrom(queryId: Long, shrineQueryResult: ShrineQueryResult): ShrineResponse = {
shrineQueryResult.toQueryResults(doObfuscation).map(toResponse(queryId, _)).getOrElse(ErrorResponse(QueryNotFound(queryId)))
}
private def retrieveQueryResults(queryId: Long, req: Req, shrineQueryResult: ShrineQueryResult, message: BroadcastMessage): ShrineResponse = {
//NB: If the requested query was not finished executing on the i2b2 side when Shrine recorded it, attempt to
//retrieve it and all its sub-components (breakdown results, if any) in parallel. Asking for the results in
//parallel is quite possibly too clever, but may be faster than asking for them serially.
//TODO: Review this.
//Make requests for results in parallel
val futureResponses = scatter(message.networkAuthn, req, shrineQueryResult)
//Gather all the results (block until they're all returned)
val SpecificResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = gather(queryId, futureResponses, req.waitTime)
countResponseAttempt match {
//If we successfully received the parent response (the one with query type PATIENT_COUNT_XML), re-store it along
//with any retrieved breakdowns before returning it.
case Success(countResponse) => {
//NB: Only store the result if needed, that is, if all results are done
//TODO: REVIEW THIS
storeResultIfNecessary(shrineQueryResult, countResponse, req.authn, queryId, getFailedBreakdownTypes(breakdownResponseAttempts))
countResponse
}
case Failure(e) => ErrorResponse(CouldNotRetrieveQueryFromCrc(queryId,e))
}
}
private def scatter(authn: AuthenticationInfo, req: Req, shrineQueryResult: ShrineQueryResult): Future[RawResponseAttempts] = {
def makeRequest(localResultId: Long) = ReadResultRequest(hiveCredentials.projectId, req.waitTime, hiveCredentials.toAuthenticationInfo, localResultId.toString)
def process(localResultId: Long): ShrineResponse = {
delegateResultRetrievingAdapter.process(authn, makeRequest(localResultId))
}
implicit val executionContext = this.executionContext
import scala.concurrent.blocking
def futureBlockingAttempt[T](f: => T): Future[Try[T]] = Future(blocking(Try(f)))
val futureCountAttempt: Future[Try[ShrineResponse]] = futureBlockingAttempt {
process(shrineQueryResult.count.localId)
}
val futureBreakdownAttempts = Future.sequence(for {
Breakdown(_, localResultId, resultType, data) <- shrineQueryResult.breakdowns
} yield futureBlockingAttempt {
process(localResultId)
})
//Log errors retrieving count
futureCountAttempt.collect {
case Success(e: ErrorResponse) => error(s"Error requesting count result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting count result from the CRC: ", e)
}
//Log errors retrieving breakdown
for {
breakdownResponseAttempts <- futureBreakdownAttempts
} {
breakdownResponseAttempts.collect {
case Success(e: ErrorResponse) => error(s"Error requesting breakdown result from the CRC: '$e'")
case Failure(e) => error(s"Error requesting breakdown result from the CRC: ", e)
}
}
//"Filter" for non-ErrorResponses
val futureNonErrorCountAttempt: Future[Try[ReadResultResponse]] = futureCountAttempt.collect {
case Success(resp: ReadResultResponse) => Success(resp) //NB: Need to repackage response here to avoid ugly, obscure, superfluous cast
case unexpected => Failure(new Exception(s"Getting count result failed. Response is: '$unexpected'"))
}
//"Filter" for non-ErrorResponses
val futureNonErrorBreakdownResponseAttempts: Future[Seq[Try[ReadResultResponse]]] = for {
breakdownResponseAttempts <- futureBreakdownAttempts
} yield {
breakdownResponseAttempts.collect {
case Success(resp: ReadResultResponse) => Try(resp)
}
}
for {
countResponseAttempt <- futureNonErrorCountAttempt
breakdownResponseAttempts <- futureNonErrorBreakdownResponseAttempts
} yield {
RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts)
}
}
private def gather(queryId: Long, futureResponses: Future[RawResponseAttempts], waitTime: Duration): SpecificResponseAttempts[Rsp] = {
val RawResponseAttempts(countResponseAttempt, breakdownResponseAttempts) = Await.result(futureResponses, waitTime)
//Log any failures
(countResponseAttempt +: breakdownResponseAttempts).collect { case Failure(e) => e }.foreach(error("Error retrieving result from the CRC: ", _))
//NB: Count response and ALL breakdown responses must be available (not Failures) or else a Failure will be returned
val responseAttempt = for {
countResponse: ReadResultResponse <- countResponseAttempt
countQueryResult = countResponse.metadata
breakdownResponses: Seq[ReadResultResponse] <- sequence(breakdownResponseAttempts)
} yield {
val breakdownsByType = (for {
breakdownResponse <- breakdownResponses
resultType <- breakdownResponse.metadata.resultType
} yield resultType -> breakdownResponse.data).toMap
val queryResultWithBreakdowns = countQueryResult.withBreakdowns(breakdownsByType)
val queryResultToReturn = if(doObfuscation) Obfuscator.obfuscate(queryResultWithBreakdowns) else queryResultWithBreakdowns
toResponse(queryId, queryResultToReturn)
}
SpecificResponseAttempts(responseAttempt, breakdownResponseAttempts)
}
private def getFailedBreakdownTypes(attempts: Seq[Try[ReadResultResponse]]): Set[ResultOutputType] = {
val successfulBreakdownTypes = attempts.collect { case Success(ReadResultResponse(_, metadata, _)) => metadata.resultType }.flatten
breakdownTypes -- successfulBreakdownTypes
}
private def storeResultIfNecessary(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val responseIsDone = response.results.forall(_.statusType.isDone)
if (responseIsDone) {
storeResult(shrineQueryResult, response, authn, queryId, failedBreakdownTypes)
}
}
private def storeResult(shrineQueryResult: ShrineQueryResult, response: Rsp, authn: AuthenticationInfo, queryId: Long, failedBreakdownTypes: Set[ResultOutputType]) {
val rawResults = response.results
val obfuscatedResults = obfuscateResults(doObfuscation)(response.results)
for {
shrineQuery <- dao.findQueryByNetworkId(queryId)
queryResult <- rawResults.headOption
obfuscatedQueryResult <- obfuscatedResults.headOption
} {
val queryDefinition = QueryDefinition(shrineQuery.name, shrineQuery.queryDefinition.expr)
dao.inTransaction {
dao.deleteQuery(queryId)
dao.storeResults(authn, shrineQueryResult.localId, queryId, queryDefinition, rawResults, obfuscatedResults, failedBreakdownTypes.toSeq, queryResult.breakdowns, obfuscatedQueryResult.breakdowns)
}
}
}
private type Unmarshaller[R] = Set[ResultOutputType] => NodeSeq => Try[R]
private final class DelegateAdapter[Rqst <: ShrineRequest, Rspns <: ShrineResponse](unmarshaller: Unmarshaller[Rspns]) extends CrcAdapter[Rqst, Rspns](poster, hiveCredentials) {
def process(authn: AuthenticationInfo, req: Rqst): Rspns = processRequest(BroadcastMessage(authn, req)).asInstanceOf[Rspns]
override protected def parseShrineResponse(xml: NodeSeq): ShrineResponse = unmarshaller(breakdownTypes)(xml).get //TODO: Avoid .get call
}
private lazy val delegateResultRetrievingAdapter = new DelegateAdapter[ReadResultRequest, ReadResultResponse](ReadResultResponse.fromI2b2 _)
}
case class QueryNotFound(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Query not found"
override def description:String = s"No query with id $queryId found on ${stamp.host.getHostName}"
- createAndLog
}
case class QueryResultNotAvailable(queryId:Long) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Query $queryId found but its results are not available yet"
override def description:String = s"Query $queryId found but its results are not available yet on ${stamp.host.getHostName}"
- createAndLog
}
case class CouldNotRetrieveQueryFromCrc(queryId:Long,x: Throwable) extends AbstractProblem(ProblemSources.Adapter) {
override def summary: String = s"Could not retrieve query $queryId from the CRC"
override def description:String = s"Unhandled exception while retrieving query $queryId while retrieving it from the CRC on ${stamp.host.getHostName}"
override def throwable = Some(x)
- createAndLog
}
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
index 3914fedc6..d6592184c 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
@@ -1,104 +1,100 @@
package net.shrine.adapter
import java.sql.SQLException
import java.util.Date
import net.shrine.adapter.dao.BotDetectedException
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources}
import net.shrine.protocol.{AuthenticationInfo, BaseShrineResponse, BroadcastMessage, ErrorResponse, ShrineRequest}
import scala.util.control.NonFatal
/**
* @author Bill Simons
* @since 4/8/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class Adapter extends Loggable {
//noinspection RedundantBlock
final def perform(message: BroadcastMessage): BaseShrineResponse = {
def problemToErrorResponse(problem:Problem):ErrorResponse = {
LoggingProblemHandler.handleProblem(problem)
ErrorResponse(problem)
}
val shrineResponse = try {
processRequest(message)
} catch {
case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e))
case e: BotDetectedException => problemToErrorResponse(BotDetected(e))
case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e))
case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e))
case e: SQLException => problemToErrorResponse(AdapterDatabaseProblem(e))
case NonFatal(e) => {
val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage"
else s"Unexpected exception in Adapter"
problemToErrorResponse(ProblemNotYetEncoded(summary,e))
}
}
shrineResponse
}
protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse
//NOOP, may be overridden by subclasses
def shutdown(): Unit = ()
}
case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = s"User '${authn.domain}:${authn.username}' locked out."
override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ."
- createAndLog
}
case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = s"Error communicating with I2B2 CRC."
override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}."
override val detailsXml =
Request is {request}
{throwableDetail.getOrElse("")}
- createAndLog
}
case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "Could not map query term(s)."
override val description = s"The Shrine Adapter on ${stamp.host.getHostName} cannot map this query to its local terms."
override val detailsXml =
Query Defitiontion is {x.runQueryRequest.queryDefinition}
RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo}
{throwableDetail.getOrElse("")}
- createAndLog
}
case class AdapterDatabaseProblem(x:SQLException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "Problem using the Adapter database."
override val description = "The Shrine Adapter encountered a problem using a database."
- createAndLog
}
case class BotDetected(bdx:BotDetectedException) extends AbstractProblem(ProblemSources.Adapter) {
override val summary: String = s"A user has run enough queries in a short period of time the adapter suspects a bot."
override val description: String = s"${bdx.domain}:${bdx.username} has run ${bdx.detectedCount} queries since ${new Date(bdx.sinceMs)}, more than the limit of ${bdx.limit} allowed in this time frame."
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
index 1d6175726..df6ef54f2 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
@@ -1,110 +1,108 @@
package net.shrine.adapter
import org.xml.sax.SAXParseException
import scala.xml.NodeSeq
import scala.xml.XML
import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, Credential, ErrorResponse, HiveCredentials, ShrineRequest, ShrineResponse, TranslatableRequest}
import net.shrine.util.XmlDateHelper
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.util.Try
import scala.util.control.NonFatal
/**
* @author Bill Simons
* @since 4/11/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class CrcAdapter[T <: ShrineRequest, V <: ShrineResponse](
poster: Poster,
override protected val hiveCredentials: HiveCredentials) extends WithHiveCredentialsAdapter(hiveCredentials) {
protected def parseShrineResponse(nodeSeq: NodeSeq): ShrineResponse
private[adapter] def parseShrineErrorResponseWithFallback(xmlResponseFromCrc: String): ShrineResponse = {
//NB: See https://open.med.harvard.edu/jira/browse/SHRINE-534
//NB: https://open.med.harvard.edu/jira/browse/SHRINE-745
val shrineResponseAttempt = for {
crcXml <- Try(XML.loadString(xmlResponseFromCrc))
shrineResponse <- Try(parseShrineResponse(crcXml)).recover { case NonFatal(e) =>
info(s"Exception while parsing $crcXml",e)
ErrorResponse.fromI2b2(crcXml)
} //todo pass the exception to build a proper error response, and log the exception
} yield shrineResponse
shrineResponseAttempt.recover {
case saxx:SAXParseException => ErrorResponse(CannotParseXmlFromCrc(saxx,xmlResponseFromCrc))
case NonFatal(e) =>
error(s"Error parsing response from CRC: ", e)
ErrorResponse(ExceptionWhileLoadingCrcResponse(e,xmlResponseFromCrc))
}.get
}
//NB: default is a noop; only RunQueryAdapter needs this for now
protected[adapter] def translateNetworkToLocal(request: T): T = request
protected[adapter] override def processRequest(message: BroadcastMessage): BaseShrineResponse = {
val i2b2Response = callCrc(translateRequest(message.request))
parseShrineErrorResponseWithFallback(i2b2Response)
}
protected def callCrc(request: ShrineRequest): String = {
debug(s"Sending Shrine-formatted request to the CRC at '${poster.url}': $request")
val crcRequest = request.toI2b2String
val crcResponse = XmlDateHelper.time(s"Calling the CRC at '${poster.url}'")(debug(_)) {
//Wrap exceptions in a more descriptive form, to enable sending better error messages back to the legacy web client
try { poster.post(crcRequest) }
catch {
case NonFatal(e) => throw CrcInvocationException(poster.url, request, e)
}
}
crcResponse.body
}
private[adapter] def translateRequest(request: BaseShrineRequest): ShrineRequest = request match {
case transReq: TranslatableRequest[T] => //noinspection RedundantBlock
{
val HiveCredentials(domain, username, password, project) = hiveCredentials
val authInfo = AuthenticationInfo(domain, username, Credential(password, isToken = false))
translateNetworkToLocal(transReq.withAuthn(authInfo).withProject(project).asRequest)
}
case req: ShrineRequest => req
case _ => throw new IllegalArgumentException(s"Unexpected request: $request")
}
}
case class CannotParseXmlFromCrc(saxx:SAXParseException,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(saxx)
override val summary: String = "Could not parse response from CRC."
override val description:String = s"${saxx.getMessage} while parsing the response from the CRC."
override val detailsXml =
{throwableDetail.getOrElse("")}
Response is {xmlResponseFromCrc}
- createAndLog
}
case class ExceptionWhileLoadingCrcResponse(t:Throwable,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(t)
override val summary: String = "Unanticipated exception with response from CRC."
override val description:String = s"${t.getMessage} while parsing the response from the CRC."
override val detailsXml =
{throwableDetail.getOrElse("")}
Response is {xmlResponseFromCrc}
- createAndLog
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
index 9dd269990..21ad6753e 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala
@@ -1,291 +1,289 @@
package net.shrine.adapter
import net.shrine.adapter.audit.AdapterAuditDb
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.xml.NodeSeq
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.translators.QueryDefinitionTranslator
import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, ErrorFromCrcException, ErrorResponse, HiveCredentials, I2b2ResultEnvelope, MissingCrCXmlResultException, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ShrineResponse}
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, LoggingProblemHandler, Problem, ProblemNotYetEncoded, ProblemSources}
import scala.util.control.NonFatal
import net.shrine.util.XmlDateHelper
import scala.concurrent.duration.Duration
import scala.xml.XML
/**
* @author Bill Simons
* @author clint
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
final case class RunQueryAdapter(
poster: Poster,
dao: AdapterDao,
override val hiveCredentials: HiveCredentials,
conceptTranslator: QueryDefinitionTranslator,
adapterLockoutAttemptsThreshold: Int,
doObfuscation: Boolean,
runQueriesImmediately: Boolean,
breakdownTypes: Set[ResultOutputType],
collectAdapterAudit:Boolean,
botCountTimeThresholds:Map[Long,Duration]
) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) {
logStartup()
import RunQueryAdapter._
override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call
override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = {
try { request.mapQueryDefinition(conceptTranslator.translate) }
catch {
case NonFatal(e) => throw new AdapterMappingException(request,s"Error mapping query terms from network to local forms.", e)
}
}
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message)
if (isLockedOut(message.networkAuthn)) {
throw new AdapterLockoutException(message.networkAuthn,poster.url)
}
dao.checkIfBot(message.networkAuthn,botCountTimeThresholds)
val runQueryReq = message.request.asInstanceOf[RunQueryRequest]
//We need to use the network identity from the BroadcastMessage, since that will have the network username
//(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching
//of previous queries on deployed systems where the credentials in the identity param to this method and the authn
//field of the incoming request are different, like the HMS Shrine deployment.
//NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013
val authnToUse = message.networkAuthn.copy(credential = Credential("", isToken = false))
if (!runQueriesImmediately) {
debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}")
storeQuery(authnToUse, message, runQueryReq)
} else {
debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}")
val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse))
if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(runQueryReq.networkQueryId,result)
result
}
}
private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = {
//Use dummy ids for what we would have received from the CRC
val masterId: Long = -1L
val queryInstanceId: Long = -1L
val resultId: Long = -1L
//TODO: is this right?? Or maybe it's project id?
val groupId = authnToUse.domain
val invalidSetSize = -1L
val now = XmlDateHelper.now
val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing"))
dao.inTransaction {
val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None)
val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult))
//NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in
//AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete
//queries, will work.
val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head
dao.insertCountResult(countQueryResultId, -1L, -1L)
}
RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult)
}
private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = {
if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request)
//NB: Pass through ErrorResponses received from the CRC.
//See: https://open.med.harvard.edu/jira/browse/SHRINE-794
val result = super.processRequest(message) match {
case e: ErrorResponse => e
case rawRunQueryResponse: RawCrcRunQueryResponse => processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse)
}
if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(request,result)
result
}
private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = {
def isBreakdown(result: QueryResult) = result.resultType.exists(_.isBreakdown)
val originalResults: Seq[QueryResult] = rawRunQueryResponse.results
val (originalBreakdownResults, originalNonBreakDownResults): (Seq[QueryResult],Seq[QueryResult]) = originalResults.partition(isBreakdown)
val originalBreakdownCountAttempts: Seq[(QueryResult, Try[QueryResult])] = attemptToRetrieveBreakdowns(request, originalBreakdownResults)
val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess }
val failedBreakdownCountAttemptsWithProblems = failedBreakdownCountAttempts.map { attempt =>
val originalResult: QueryResult = attempt._1
val queryResult:QueryResult = if (originalResult.problemDigest.isDefined) originalResult
else {
attempt._2 match {
case Success(_) => originalResult
case Failure(x) => //noinspection RedundantBlock
{
val problem:Problem = x match {
case e: ErrorFromCrcException => ErrorFromCrcBreakdown(e)
case e: MissingCrCXmlResultException => CannotInterpretCrcBreakdownXml(e)
case NonFatal(e) => {
val summary = s"Unexpected exception while interpreting breakdown response"
ProblemNotYetEncoded(summary, e)
}
}
LoggingProblemHandler.handleProblem(problem)
originalResult.copy(problemDigest = Some(problem.toDigest))
}
}
}
(queryResult,attempt._2)
}
logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttemptsWithProblems)
val originalMergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = {
val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns }
withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _)
}
val obfuscatedQueryResults = originalResults.map(Obfuscator.obfuscate)
val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown)
val obfuscatedMergedBreakdowns = obfuscateBreakdowns(originalMergedBreakdowns)
val failedBreakdownTypes = failedBreakdownCountAttemptsWithProblems.flatMap { case (qr, _) => qr.resultType }
dao.storeResults(
authn = authnToUse,
masterId = rawRunQueryResponse.queryId.toString,
networkQueryId = request.networkQueryId,
queryDefinition = request.queryDefinition,
rawQueryResults = originalResults,
obfuscatedQueryResults = obfuscatedQueryResults,
failedBreakdownTypes = failedBreakdownTypes,
mergedBreakdowns = originalMergedBreakdowns,
obfuscatedBreakdowns = obfuscatedMergedBreakdowns)
// at this point the queryResult could be a mix of successes and failures.
// SHRINE reports only the successes. See SHRINE-1567 for details
val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults
val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns
//TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here?
//can failedBreakdownCountAttempts be mixed back in here?
val resultWithBreakdowns: QueryResult = queryResults.head.withBreakdowns(breakdownsToReturn)
if(debugEnabled) {
def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data)
val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}"
debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)")
debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)")
debug(s"Full QueryResult: $resultWithBreakdowns")
}
//if any results had problems, this commented out code can turn it into an error QueryResult
//See SHRINE-1619
//val problem: Option[ProblemDigest] = failedBreakdownCountAttemptsWithProblems.headOption.flatMap(x => x._1.problemDigest)
//val queryResult = problem.fold(resultWithBreakdowns)(pd => QueryResult.errorResult(Some(pd.description),"Error with CRC",pd))
rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns)
}
private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = {
def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString)
Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes))
}
private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = {
originalCountQueryResult -> (for {
countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId)
} yield originalCountQueryResult.withSetSize(countData.metadata.setSize))
}
private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = {
breakdownResults.map { origBreakdownResult =>
origBreakdownResult -> (for {
breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data)
} yield origBreakdownResult.withBreakdown(breakdownData))
}
}
private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse,
failures: Seq[(QueryResult, Try[QueryResult])]) {
for {
(origQueryResult, Failure(e)) <- failures
} {
error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e)
}
}
private def isLockedOut(authn: AuthenticationInfo): Boolean = {
adapterLockoutAttemptsThreshold match {
case 0 => false
case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold)
}
}
private def logStartup(): Unit = {
val message = {
if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" }
else { s"${getClass.getSimpleName} will queue queries for later execution" }
}
info(message)
}
}
object RunQueryAdapter {
private[adapter] def obfuscateBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Map[ResultOutputType, I2b2ResultEnvelope] = {
breakdowns.mapValues(_.mapValues(Obfuscator.obfuscate))
}
}
case class ErrorFromCrcBreakdown(x:ErrorFromCrcException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "The CRC reported an error."
override val description = "The CRC reported an internal error."
- createAndLog
}
case class CannotInterpretCrcBreakdownXml(x:MissingCrCXmlResultException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = "SHRINE cannot interpret the CRC response."
override val description = "The CRC responded, but SHRINE could not interpret that response."
- createAndLog
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
index b678794d7..9e3069947 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
@@ -1,40 +1,39 @@
package net.shrine.adapter.components
import net.shrine.adapter.dao.AdapterDao
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.ShrineResponse
import net.shrine.protocol.ReadQueryDefinitionRequest
import net.shrine.protocol.ReadQueryDefinitionResponse
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.AbstractReadQueryDefinitionRequest
/**
* @author clint
* @since Apr 4, 2013
*
* NB: Tested by ReadQueryDefinitionAdapterTest
*/
final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) {
def get(request: Req): ShrineResponse = {
val resultOption = for {
shrineQuery <- dao.findQueryByNetworkId(request.queryId)
} yield {
ReadQueryDefinitionResponse(
shrineQuery.networkId,
shrineQuery.name,
shrineQuery.username,
shrineQuery.dateCreated,
//TODO: I2b2 or Shrine format?
shrineQuery.queryDefinition.toI2b2String)
}
resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request)))
}
}
case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) {
override val summary: String = s"Couldn't find query definition."
override val description:String = s"The query definition with network id: ${request.queryId} does not exist at this site."
- createAndLog
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala
index 9c36a5bf8..f4cbb80b9 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala
@@ -1,490 +1,489 @@
package net.shrine.adapter.dao.squeryl
import java.sql.Timestamp
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.adapter.dao.{AdapterDao, BotDetectedException}
import net.shrine.adapter.dao.model.{ObfuscatedPair, ShrineQuery, ShrineQueryResult}
import net.shrine.adapter.dao.model.squeryl.{SquerylBreakdownResultRow, SquerylCountRow, SquerylPrivilegedUser, SquerylQueryResultRow, SquerylShrineError, SquerylShrineQuery}
import net.shrine.adapter.dao.squeryl.tables.Tables
import net.shrine.dao.DateHelpers
import net.shrine.dao.squeryl.{SquerylEntryPoint, SquerylInitializer}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.{AuthenticationInfo, I2b2ResultEnvelope, QueryResult, ResultOutputType}
import net.shrine.protocol.query.QueryDefinition
import net.shrine.util.XmlDateHelper
import org.squeryl.Query
import org.squeryl.dsl.{GroupWithMeasures, Measures}
import scala.concurrent.duration.Duration
import scala.util.Try
import scala.xml.NodeSeq
/**
* @author clint
* @since May 22, 2013
*/
final class SquerylAdapterDao(initializer: SquerylInitializer, tables: Tables)(implicit breakdownTypes: Set[ResultOutputType]) extends AdapterDao with Loggable {
initializer.init()
override def inTransaction[T](f: => T): T = SquerylEntryPoint.inTransaction { f }
import SquerylEntryPoint._
override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = mutateFlagField(networkQueryId, newIsFlagged = true, flagMessage)
override def unFlagQuery(networkQueryId: Long): Unit = mutateFlagField(networkQueryId, newIsFlagged = false, None)
private def mutateFlagField(networkQueryId: Long, newIsFlagged: Boolean, newFlagMessage: Option[String]): Unit = {
inTransaction {
update(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).
set(queryRow.isFlagged := newIsFlagged, queryRow.flagMessage := newFlagMessage)
}
}
}
override def storeResults(
authn: AuthenticationInfo,
masterId: String,
networkQueryId: Long,
queryDefinition: QueryDefinition,
rawQueryResults: Seq[QueryResult],
obfuscatedQueryResults: Seq[QueryResult],
failedBreakdownTypes: Seq[ResultOutputType],
mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope],
obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = {
inTransaction {
val insertedQueryId = insertQuery(masterId,
networkQueryId,
authn,
queryDefinition,
isFlagged = false,
hasBeenRun = true,
flagMessage = None)
val insertedQueryResultIds = insertQueryResults(insertedQueryId, rawQueryResults)
storeCountResults(rawQueryResults, obfuscatedQueryResults, insertedQueryResultIds)
storeErrorResults(rawQueryResults, insertedQueryResultIds)
storeBreakdownFailures(failedBreakdownTypes.toSet, insertedQueryResultIds)
insertBreakdownResults(insertedQueryResultIds, mergedBreakdowns, obfuscatedBreakdowns)
}
}
private[adapter] def storeCountResults(raw: Seq[QueryResult], obfuscated: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = {
val notErrors = raw.filter(!_.isError)
val obfuscatedNotErrors = obfuscated.filter(!_.isError)
if(notErrors.size > 1) {
warn(s"Got ${notErrors.size} raw (hopefully-)count results; more than 1 is unusual.")
}
if(obfuscatedNotErrors.size > 1) {
warn(s"Got ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; more than 1 is unusual.")
}
if(notErrors.size != obfuscatedNotErrors.size) {
warn(s"Got ${notErrors.size} raw and ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; that these numbers are different is unusual.")
}
import ResultOutputType.PATIENT_COUNT_XML
def isCount(qr: QueryResult): Boolean = qr.resultType.contains(PATIENT_COUNT_XML)
inTransaction {
//NB: Take the count/setSize from the FIRST PATIENT_COUNT_XML QueryResult,
//though the same count should be there for all of them, if there are more than one
for {
Seq(insertedCountQueryResultId) <- insertedIds.get(PATIENT_COUNT_XML)
notError <- notErrors.find(isCount) //NB: Find a count result, just to be sure
obfuscatedNotError <- obfuscatedNotErrors.find(isCount) //NB: Find a count result, just to be sure
} {
insertCountResult(insertedCountQueryResultId, notError.setSize, obfuscatedNotError.setSize)
}
}
}
private[adapter] def storeErrorResults(results: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = {
val errors = results.filter(_.isError)
val insertedErrorResultIds = insertedIds.getOrElse(ResultOutputType.ERROR,Nil)
val insertedIdsToErrors = insertedErrorResultIds zip errors
inTransaction {
for {
(insertedErrorResultId, errorQueryResult) <- insertedIdsToErrors
} {
val pd = errorQueryResult.problemDigest.get //it's an error so it will have a problem digest
insertErrorResult(
insertedErrorResultId,
errorQueryResult.statusMessage.getOrElse("Unknown failure"),
pd.codec,
pd.stampText,
pd.summary,
pd.description,
pd.detailsXml
)
}
}
}
private[adapter] def storeBreakdownFailures(failedBreakdownTypes: Set[ResultOutputType], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = {
val insertedIdsForFailedBreakdownTypes = insertedIds.filterKeys(failedBreakdownTypes.contains)
inTransaction {
for {
(failedBreakdownType, Seq(resultId)) <- insertedIdsForFailedBreakdownTypes
} {
//todo propagate backwards to the breakdown failure to create the corect problem
object BreakdownFailure extends AbstractProblem(ProblemSources.Adapter) {
override val summary: String = "Couldn't retrieve result breakdown"
override val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'"
- createAndLog
}
val pd = BreakdownFailure.toDigest
insertErrorResult(
resultId,
s"Couldn't retrieve breakdown of type '$failedBreakdownType'",
pd.codec,
pd.stampText,
pd.summary,
pd.description,
pd.detailsXml
)
}
}
}
override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = {
inTransaction {
Queries.queriesForAllUsers.take(howMany).map(_.toShrineQuery).toSeq
}
}
def findAllCounts():Seq[SquerylCountRow] = {
inTransaction{
Queries.allCountResults.toSeq
}
}
override def renameQuery(networkQueryId: Long, newName: String) {
inTransaction {
update(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).
set(queryRow.name := newName)
}
}
}
override def deleteQuery(networkQueryId: Long): Unit = {
inTransaction {
tables.shrineQueries.deleteWhere(_.networkId === networkQueryId)
}
}
override def deleteQueryResultsFor(networkQueryId: Long): Unit = {
inTransaction {
val resultIdsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) =>
where(queryRow.networkId === networkQueryId).
select(resultRow.id).
on(queryRow.id === resultRow.queryId)
}.toSet
tables.queryResults.deleteWhere(_.id in resultIdsForNetworkQueryId)
}
}
override def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean = Try {
inTransaction {
val privilegedUserOption = Queries.privilegedUsers(authn.domain, authn.username).singleOption
val threshold:Int = privilegedUserOption.flatMap(_.threshold).getOrElse(defaultThreshold.intValue)
val thirtyDaysInThePast: XMLGregorianCalendar = DateHelpers.daysFromNow(-30)
val overrideDate: XMLGregorianCalendar = privilegedUserOption.map(_.toPrivilegedUser).flatMap(_.overrideDate).getOrElse(thirtyDaysInThePast)
//sorted instead of just finding max
val counts: Seq[Long] = Queries.repeatedResults(authn.domain, authn.username, overrideDate).toSeq.sorted
//and then grabbing the last, highest value in the sorted sequence
val repeatedResultCount: Long = counts.lastOption.getOrElse(0L)
val result = repeatedResultCount > threshold
debug(s"User ${authn.domain}:${authn.username} locked out? $result")
result
}
}.getOrElse(false)
override def checkIfBot(authn:AuthenticationInfo, botTimeThresholds:Map[Long,Duration]): Unit = {
val now = System.currentTimeMillis()
botTimeThresholds.foreach{countDuration => inTransaction {
val sinceMs: Long = now - countDuration._2.toMillis
val query: Query[Measures[Long]] = Queries.countQueriesForUserSince(authn.domain, authn.username, sinceMs)
val queriesSince = query.headOption.map(_.measures).getOrElse(0L)
if (queriesSince > countDuration._1) throw new BotDetectedException(domain = authn.domain,
username = authn.username,
detectedCount = queriesSince,
sinceMs = sinceMs,
limit = countDuration._1)
}}
}
override def insertQuery(localMasterId: String,
networkId: Long,
authn: AuthenticationInfo,
queryDefinition: QueryDefinition,
isFlagged: Boolean,
hasBeenRun: Boolean,
flagMessage: Option[String]): Int = {
inTransaction {
val inserted = tables.shrineQueries.insert(new SquerylShrineQuery(
0,
localMasterId,
networkId,
authn.username,
authn.domain,
XmlDateHelper.now,
isFlagged,
flagMessage,
hasBeenRun,
queryDefinition))
inserted.id
}
}
/**
* Insert rows into QueryResults, one for each QueryResult in the passed RunQueryResponse
* Inserted rows are 'children' of the passed ShrineQuery (ie, they are the results of the query)
*/
override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = {
def execTime(result: QueryResult): Option[Long] = {
//TODO: How are locales handled here? Do we care?
def toMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- result.startDate
end <- result.endDate
} yield toMillis(end) - toMillis(start)
}
val typeToIdTuples = inTransaction {
for {
result <- results
resultType = result.resultType.getOrElse(ResultOutputType.ERROR)
//TODO: under what circumstances can QueryResults NOT have start and end dates set?
elapsed = execTime(result)
} yield {
val lastInsertedQueryResultRow = tables.queryResults.insert(new SquerylQueryResultRow(0, result.resultId, parentQueryId, resultType, result.statusType, elapsed, XmlDateHelper.now))
(resultType, lastInsertedQueryResultRow.id)
}
}
typeToIdTuples.groupBy { case (resultType, _) => resultType }.mapValues(_.map { case (_, count) => count })
}
override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long) {
//NB: Squeryl steers us toward inserting with dummy ids :(
inTransaction {
tables.countResults.insert(new SquerylCountRow(0, resultId, originalCount, obfuscatedCount, XmlDateHelper.now))
}
}
override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) {
def merge(original: I2b2ResultEnvelope, obfuscated: I2b2ResultEnvelope): Map[String, ObfuscatedPair] = {
Map.empty ++ (for {
(key, originalValue) <- original.data
obfuscatedValue <- obfuscated.data.get(key)
} yield (key, ObfuscatedPair(originalValue, obfuscatedValue)))
}
inTransaction {
for {
(resultType, Seq(resultId)) <- parentResultIds
if resultType.isBreakdown
originalBreakdown <- originalBreakdowns.get(resultType)
obfuscatedBreakdown <- obfuscatedBreakdowns.get(resultType)
(key, ObfuscatedPair(original, obfuscated)) <- merge(originalBreakdown, obfuscatedBreakdown)
} {
tables.breakdownResults.insert(SquerylBreakdownResultRow(0, resultId, key, original, obfuscated))
}
}
}
override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq) {
//NB: Squeryl steers us toward inserting with dummy ids :(
inTransaction {
tables.errorResults.insert(SquerylShrineError(0, parentResultId, errorMessage, codec, stampText, summary, digestDescription, detailsXml.toString()))
}
}
override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = {
inTransaction {
Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery)
}
}
override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = {
inTransaction {
Queries.queriesForUser(username, domain).take(howMany).toSeq.map(_.toShrineQuery)
}
}
override def findQueriesByDomain(domain: String): Seq[ShrineQuery] = {
inTransaction {
Queries.queriesForDomain(domain).toList.map(_.toShrineQuery)
}
}
override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = {
inTransaction {
val breakdownRowsByType = Queries.breakdownResults(networkQueryId).toSeq.groupBy { case (outputType, _) => outputType.toQueryResultRow.resultType }.mapValues(_.map { case (_, row) => row.toBreakdownResultRow })
val queryRowOption = Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery)
val countRowOption = Queries.countResults(networkQueryId).headOption.map(_.toCountRow)
val queryResultRows = Queries.resultsForQuery(networkQueryId).toSeq.map(_.toQueryResultRow)
val errorResultRows = Queries.errorResults(networkQueryId).toSeq.map(_.toShrineError)
for {
queryRow <- queryRowOption
countRow <- countRowOption
shrineQueryResult <- ShrineQueryResult.fromRows(queryRow, queryResultRows, countRow, breakdownRowsByType, errorResultRows)
} yield {
shrineQueryResult
}
}
}
/**
* @author clint
* @since Nov 19, 2012
*/
object Queries {
def privilegedUsers(domain: String, username: String): Query[SquerylPrivilegedUser] = {
from(tables.privilegedUsers) { user =>
where(user.username === username and user.domain === domain).select(user)
}
}
def countQueriesForUserSince(domain:String, username:String, sinceMs:Long): Query[Measures[Long]] = {
val since = new Timestamp(sinceMs)
from(tables.shrineQueries) { queryRow =>
where(queryRow.domain === domain and queryRow.username === username and queryRow.dateCreated >= since).
compute(count)
}
}
def repeatedResults(domain: String, username: String, overrideDate: XMLGregorianCalendar): Query[Long] = {
val counts: Query[GroupWithMeasures[Long, Long]] = join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) =>
where(queryRow.username === username and queryRow.domain === domain and (countRow.originalValue <> 0L) and queryRow.dateCreated > DateHelpers.toTimestamp(overrideDate)).
groupBy(countRow.originalValue).
compute(count(countRow.originalValue)).
on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId)
}
//Filter for result counts > 0
from(counts) { cnt =>
where(cnt.measures gt 0).select(cnt.measures)
}
}
val queriesForAllUsers: Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
select(queryRow).orderBy(queryRow.dateCreated.desc)
}
}
//TODO: Find a way to parameterize on limit, to avoid building the query every time
//TODO: limit
def queriesForUser(username: String, domain: String): Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.domain === domain and queryRow.username === username).
select(queryRow).
orderBy(queryRow.dateCreated.desc)
}
}
def queriesForDomain(domain: String): Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.domain === domain).
select(queryRow).
orderBy(queryRow.dateCreated.desc)
}
}
val allCountResults: Query[SquerylCountRow] = {
from(tables.countResults) { queryRow =>
select(queryRow)
}
}
def queriesByNetworkId(networkQueryId: Long): Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).select(queryRow)
}
}
//TODO: Find out how to compose queries, to re-use queriesByNetworkId
def queryNamesByNetworkId(networkQueryId: Long): Query[String] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).select(queryRow.name)
}
}
def resultsForQuery(networkQueryId: Long): Query[SquerylQueryResultRow] = {
val resultsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) =>
where(queryRow.networkId === networkQueryId).
select(resultRow).
on(queryRow.id === resultRow.queryId)
}
from(resultsForNetworkQueryId)(select(_))
}
def countResults(networkQueryId: Long): Query[SquerylCountRow] = {
join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) =>
where(queryRow.networkId === networkQueryId).
select(countRow).
on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId)
}
}
def errorResults(networkQueryId: Long): Query[SquerylShrineError] = {
join(tables.shrineQueries, tables.queryResults, tables.errorResults) { (queryRow, resultRow, errorRow) =>
where(queryRow.networkId === networkQueryId).
select(errorRow).
on(queryRow.id === resultRow.queryId, resultRow.id === errorRow.resultId)
}
}
//NB: using groupBy here is too much of a pain; do it 'manually' later
def breakdownResults(networkQueryId: Long): Query[(SquerylQueryResultRow, SquerylBreakdownResultRow)] = {
join(tables.shrineQueries, tables.queryResults, tables.breakdownResults) { (queryRow, resultRow, breakdownRow) =>
where(queryRow.networkId === networkQueryId).
select((resultRow, breakdownRow)).
on(queryRow.id === resultRow.queryId, resultRow.id === breakdownRow.resultId)
}
}
}
}
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala
index cea546ce7..4156d09c9 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala
@@ -1,105 +1,103 @@
package net.shrine.adapter.service
import net.shrine.log.Loggable
import net.shrine.protocol.{BaseShrineResponse, BroadcastMessage, ErrorResponse, NodeId, RequestType, Result, Signature}
import net.shrine.adapter.AdapterMap
import net.shrine.crypto.Verifier
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
/**
* Heart of the adapter.
*
* @author clint
* @since Nov 14, 2013
*/
final class AdapterService(
nodeId: NodeId,
signatureVerifier: Verifier,
maxSignatureAge: Duration,
adapterMap: AdapterMap) extends AdapterRequestHandler with Loggable {
import AdapterService._
logStartup(adapterMap)
override def handleRequest(message: BroadcastMessage): Result = {
handleInvalidSignature(message).orElse {
for {
adapter <- adapterMap.adapterFor(message.request.requestType)
} yield time(nodeId) {
adapter.perform(message)
}
}.getOrElse {
Result(nodeId, 0.milliseconds, ErrorResponse(UnknownRequestType(message.request.requestType)))
}
}
/**
* @return None if the signature is fine, Some(result with an ErrorResponse) if not
*/
private def handleInvalidSignature(message: BroadcastMessage): Option[Result] = {
val (sigIsValid, elapsed) = time(signatureVerifier.verifySig(message, maxSignatureAge))
if(sigIsValid) { None }
else {
info(s"Incoming message had invalid signature: $message")
Some(Result(nodeId, elapsed.milliseconds, ErrorResponse(CouldNotVerifySignature(message))))
}
}
}
object AdapterService extends Loggable {
private def logStartup(adapterMap: AdapterMap) {
info("Adapter service initialized, will respond to the following queries: ")
val sortedByReqType = adapterMap.requestsToAdapters.toSeq.sortBy { case (k, _) => k }
sortedByReqType.foreach {
case (requestType, adapter) =>
info(s" $requestType:\t(${adapter.getClass.getSimpleName})")
}
}
private[service] def time[T](f: => T): (T, Long) = {
val start = System.currentTimeMillis
val result = f
val elapsed = System.currentTimeMillis - start
(result, elapsed)
}
private[service] def time(nodeId: NodeId)(f: => BaseShrineResponse): Result = {
val (response, elapsed) = time(f)
Result(nodeId, elapsed.milliseconds, response)
}
}
case class CouldNotVerifySignature(message: BroadcastMessage) extends AbstractProblem(ProblemSources.Adapter){
val signature: Option[Signature] = message.signature
override val summary: String = signature.fold("A message was not signed")(sig => s"The trust relationship with ${sig.signedBy} is not properly configured.")
override val description: String = signature.fold(s"The Adapter at ${stamp.host.getHostName} could not properly validate a request because it had no signature.")(sig => s"The Adapter at ${stamp.host.getHostName} could not properly validate the request from ${sig.signedBy}. An incoming message from the hub had an invalid signature.")
override val detailsXml = signature.fold(
)(
sig =>
Signature is {sig}
)
- createAndLog
}
case class UnknownRequestType(requestType: RequestType) extends AbstractProblem(ProblemSources.Adapter){
override val summary: String = s"Unknown request type $requestType"
override val description: String = s"The Adapter at ${stamp.host.getHostName} received a request of type $requestType that it cannot process."
- createAndLog
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala
index 53746ce1b..d981a9125 100644
--- a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala
+++ b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala
@@ -1,37 +1,36 @@
package net.shrine.authentication
import net.shrine.authentication.AuthenticationResult.NotAuthenticated
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.xml.NodeSeq
/**
* @author clint
* @since Dec 13, 2013
*/
final case class NotAuthenticatedException(domain: String, username: String,message: String, cause: Throwable) extends RuntimeException(message, cause) {
def problem = NotAuthenticatedProblem(this)
}
object NotAuthenticatedException {
def apply(na:NotAuthenticated):NotAuthenticatedException = NotAuthenticatedException(na.domain,na.username,na.message,na.cause.getOrElse(null))
}
case class NotAuthenticatedProblem(nax:NotAuthenticatedException) extends AbstractProblem(ProblemSources.Qep){
override val summary = s"Can not authenticate ${nax.domain}:${nax.username}."
override val throwable = Some(nax)
override val description = s"Can not authenticate ${nax.domain}:${nax.username}. ${nax.getLocalizedMessage}"
override val detailsXml: NodeSeq = NodeSeq.fromSeq(
{throwableDetail.getOrElse("")}
)
- createAndLog
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
index a4f5ee329..f1df2acca 100644
--- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
+++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
@@ -1,115 +1,112 @@
package net.shrine.authorization
import net.shrine.log.Loggable
import scala.util.{Failure, Success, Try}
import net.shrine.client.HttpResponse
import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest
import net.shrine.i2b2.protocol.pm.User
import net.shrine.problem._
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.ErrorResponse
import scala.util.control.NonFatal
/**
* @author clint
* @since Apr 5, 2013
*/
trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable =>
import PmAuthorizerComponent._
//noinspection RedundantBlock
object Pm {
def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = {
User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith {
case NonFatal(e) => {
debug(s"Couldn't extract a User from '$httpResponse'")
Try(Left(ErrorResponse.fromI2b2(httpResponse.body)))
}
}.recover {
case NonFatal(e) => {
val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e)
LoggingProblemHandler.handleProblem(problem)
Left(ErrorResponse(problem))
}
}
}
def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = {
val request = GetUserConfigurationRequest(authn)
val responseAttempt: Try[HttpResponse] = Try {
debug(s"Authorizing with PM cell at ${pmPoster.url}")
pmPoster.post(request.toI2b2String)
}
val authStatusAttempt: Try[AuthorizationStatus with Product with Serializable] = responseAttempt.flatMap(parsePmResult(authn)).map {
case Right(user) => {
val managerUserOption = for {
roles <- user.rolesByProject.get(projectId)
if neededRoles.forall(roles.contains)
} yield user
managerUserOption.map(Authorized).getOrElse {
NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn))
}
}
case Left(errorResponse) => {
//todo remove when ErrorResponse gets its message
info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object")
NotAuthorized(errorResponse.problemDigest)
}
}
authStatusAttempt match {
case Success(s) => s
case Failure(x) => NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn,x))
}
}
}
}
object PmAuthorizerComponent {
sealed trait AuthorizationStatus
case class Authorized(user: User) extends AuthorizationStatus
case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus {
def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest)
}
object NotAuthorized {
def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest)
}
}
case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) {
override val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'"
override val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'"
- createAndLog
}
case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override val throwable = Some(x)
override val summary: String = s"Could not reach PM cell."
override val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}."
- createAndLog
}
case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override val throwable = Some(x)
override def summary: String = s"Could not interpret response from PM cell."
override def description: String = s"Shrine could not interpret the response from the PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: due to ${throwable.get}"
override val detailsXml =
Response is {httpResponse}
{throwableDetail.getOrElse("")}
- createAndLog
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala
index cf5560a09..790ba61ba 100644
--- a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala
+++ b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala
@@ -1,236 +1,235 @@
package net.shrine.authorization
import java.net.URL
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import java.security.cert.X509Certificate
import akka.io.IO
import com.typesafe.config.{Config, ConfigFactory}
import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized}
import net.shrine.authorization.steward.{InboundShrineQuery, ResearchersTopics, TopicIdAndName}
import net.shrine.log.Loggable
import net.shrine.protocol.{ApprovedTopic, AuthenticationInfo, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest}
import net.shrine.config.ConfigExtensions
import org.json4s.native.JsonMethods.parse
import org.json4s.{DefaultFormats, Formats}
import akka.actor.ActorSystem
import akka.util.Timeout
import akka.pattern.ask
import net.shrine.problem.{AbstractProblem, ProblemSources}
import spray.can.Http
import spray.can.Http.{HostConnectorInfo, HostConnectorSetup}
import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse}
import spray.http.StatusCodes.{OK, Unauthorized, UnavailableForLegalReasons}
import spray.httpx.TransformerPipelineSupport.WithTransformation
import spray.httpx.Json4sSupport
import spray.client.pipelining.{Get, Post, addCredentials, sendReceive}
import spray.io.{ClientSSLEngineProvider, PipelineContext, SSLContextProvider}
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
/**
* A QueryAuthorizationService that talks to the standard data steward application to learn about topics (intents) and check that a
* shrine query can be run
*
* @author david
* @since 4/2/15
*/
final case class StewardQueryAuthorizationService(qepUserName:String,
qepPassword:String,
stewardBaseUrl:URL,
defaultTimeout:FiniteDuration = 10 seconds) extends QueryAuthorizationService with Loggable with Json4sSupport {
import system.dispatcher // execution context for futures
implicit val system = ActorSystem("AuthorizationServiceActors",ConfigFactory.load("shrine")) //todo use shrine's config
implicit val timeout:Timeout = Timeout.durationToTimeout(defaultTimeout)//10 seconds
implicit def json4sFormats: Formats = DefaultFormats
val qepCredentials = BasicHttpCredentials(qepUserName,qepPassword)
def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = {
// Place a special SSLContext in scope here to be used by HttpClient.
// It trusts all server certificates.
// Most important - it will encrypt all of the traffic on the wire.
implicit def trustfulSslContext: SSLContext = {
object BlindFaithX509TrustManager extends X509TrustManager {
def checkClientTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Client asked BlindFaithX509TrustManager to check $chain for $authType"))
def checkServerTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Server asked BlindFaithX509TrustManager to check $chain for $authType"))
def getAcceptedIssuers = Array[X509Certificate]()
}
val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(BlindFaithX509TrustManager), null)
context
}
implicit def trustfulSslContextProvider: SSLContextProvider = {
SSLContextProvider.forContext(trustfulSslContext)
}
class CustomClientSSLEngineProvider extends ClientSSLEngineProvider {
def apply(pc: PipelineContext) = ClientSSLEngineProvider.default(trustfulSslContextProvider).apply(pc)
}
implicit def sslEngineProvider: ClientSSLEngineProvider = new CustomClientSSLEngineProvider
val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials)
val responseFuture: Future[HttpResponse] = for {
HostConnectorInfo(hostConnector, _) <- {
val hostConnectorSetup = new HostConnectorSetup(httpRequest.uri.authority.host.address,
httpRequest.uri.authority.port,
sslEncryption = httpRequest.uri.scheme=="https")(
sslEngineProvider = sslEngineProvider)
IO(Http) ask hostConnectorSetup
}
response <- sendReceive(hostConnector).apply(requestWithCredentials)
_ <- hostConnector ask Http.CloseAll
} yield response
responseFuture
}
/* todo to recycle connections with http://spray.io/documentation/1.2.3/spray-client/ if needed
def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = {
import akka.io.IO
import akka.pattern.ask
import spray.can.Http
val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials)
//todo failures via onFailure callbacks
for{
sendR:SendReceive <- connectorSource
response:HttpResponse <- sendR(requestWithCredentials)
} yield response
}
val connectorSource: Future[SendReceive] = //Future[HttpRequest => Future[HttpResponse]]
for (
//keep asking for a connector until you get one
//todo correct URL
// Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("www.spray.io", port = 8080)
Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("localhost", port = 6060)
) yield sendReceive(connector)
*/
def sendAndReceive(httpRequest: HttpRequest,timeout:Duration = defaultTimeout):HttpResponse = {
info("StewardQueryAuthorizationService will request "+httpRequest.uri) //todo someday log request and response
val responseFuture = sendHttpRequest(httpRequest)
val response:HttpResponse = Await.result(responseFuture,timeout)
info("StewardQueryAuthorizationService received response with status "+response.status)
response
}
//Contact a data steward and either return an Authorized or a NotAuthorized or throw an exception
override def authorizeRunQueryRequest(runQueryRequest: RunQueryRequest): AuthorizationResult = {
debug(s"authorizeRunQueryRequest started for ${runQueryRequest.queryDefinition.name}")
val interpreted = runQueryRequest.topicId.fold(
authorizeRunQueryRequestNoTopic(runQueryRequest)
)(
authorizeRunQueryRequestForTopic(runQueryRequest,_)
)
debug(s"authorizeRunQueryRequest completed with $interpreted) for ${runQueryRequest.queryDefinition.name}")
interpreted
}
def authorizeRunQueryRequestNoTopic(runQueryRequest: RunQueryRequest): AuthorizationResult = {
val userName = runQueryRequest.authn.username
val queryId = runQueryRequest.queryDefinition.name
//xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml.
val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString())
val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName", queryForJson)
val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime)
interpretAuthorizeRunQueryResponse(response)
}
def authorizeRunQueryRequestForTopic(runQueryRequest: RunQueryRequest,topicIdString:String): AuthorizationResult = {
val userName = runQueryRequest.authn.username
val queryId = runQueryRequest.queryDefinition.name
//xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml.
val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString())
val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName/topic/$topicIdString", queryForJson)
val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime)
debug(s"authorizeRunQueryRequestForTopic response is $response")
interpretAuthorizeRunQueryResponse(response)
}
/** Interpret the response from the steward app. Primarily here for testing. */
def interpretAuthorizeRunQueryResponse(response:HttpResponse):AuthorizationResult = {
response.status match {
case OK => {
val topicJson = new String(response.entity.data.toByteArray)
debug(s"topicJson is $topicJson")
val topic:Option[TopicIdAndName] = parse(topicJson).extractOpt[TopicIdAndName]
debug(s"topic is $topic")
Authorized(topic.map(x => (x.id,x.name)))
}
case UnavailableForLegalReasons => NotAuthorized(response.entity.asString)
case Unauthorized => throw new AuthorizationException(s"steward rejected qep's login credentials. $response")
case _ => throw new AuthorizationException(s"QueryAuthorizationService detected a problem: $response")
}
}
//Either read the approved topics from a data steward or have an error response.
override def readApprovedEntries(readTopicsRequest: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = {
val userName = readTopicsRequest.authn.username
val request = Get(s"$stewardBaseUrl/steward/qep/approvedTopics/user/$userName")
val response:HttpResponse = sendAndReceive(request,readTopicsRequest.waitTime)
if(response.status == OK) {
val topicsJson = new String(response.entity.data.toByteArray)
val topicsFromSteward: ResearchersTopics = parse(topicsJson).extract[ResearchersTopics]
val topics: Seq[ApprovedTopic] = topicsFromSteward.topics.map(topic => ApprovedTopic(topic.id, topic.name))
Right(ReadApprovedQueryTopicsResponse(topics))
}
else Left(ErrorResponse(ErrorStatusFromDataStewardApp(response,stewardBaseUrl)))
}
override def toString() = {
super.toString().replaceAll(qepPassword,"REDACTED")
}
}
object StewardQueryAuthorizationService {
def apply(config:Config):StewardQueryAuthorizationService = StewardQueryAuthorizationService (
qepUserName = config.getString("qepUserName"),
qepPassword = config.getString("qepPassword"),
stewardBaseUrl = config.get("stewardBaseUrl", new URL(_))
)
}
case class ErrorStatusFromDataStewardApp(response:HttpResponse,stewardBaseUrl:URL) extends AbstractProblem(ProblemSources.Qep) {
override val summary: String = s"Data Steward App responded with status ${response.status}"
override val description:String = s"The Data Steward App at ${stewardBaseUrl} responded with status ${response.status}, not OK."
override val detailsXml =
Response is {response}
{throwableDetail.getOrElse("")}
- createAndLog
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala
index d99b9a238..7fc8a38ff 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/NonI2b2ableResponse.scala
@@ -1,23 +1,22 @@
package net.shrine.protocol
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.xml.NodeSeq
/**
* @author clint
* @since Apr 30, 2013
*/
trait NonI2b2ableResponse { self: ShrineResponse =>
//Fail loudly here
protected override def i2b2MessageBody: NodeSeq = ???
override def toI2b2: NodeSeq = ErrorResponse(NoI2b2AnalogExists(this.getClass)).toI2b2
}
case class NoI2b2AnalogExists(claz:Class[_ <: NonI2b2ableResponse]) extends AbstractProblem(ProblemSources.Unknown) {
override def summary: String = s"${ claz.getSimpleName } can't be marshalled to i2b2 XML, as it has no i2b2 analog"
override def description: String = s"${ claz.getSimpleName } can't be marshalled to i2b2 XML, as it has no i2b2 analog"
- createAndLog
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index 7c229768f..069ef0197 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,401 +1,400 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources}
import net.shrine.protocol.QueryResult.StatusType
import scala.xml.NodeSeq
import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil}
import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller}
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult (
resultId: Long,
instanceId: Long,
resultType: Option[ResultOutputType],
setSize: Long,
startDate: Option[XMLGregorianCalendar],
endDate: Option[XMLGregorianCalendar],
description: Option[String],
statusType: StatusType,
statusMessage: Option[String],
problemDigest: Option[ProblemDigest] = None,
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
) extends XmlMarshaller with I2b2Marshaller with Loggable {
//only used in tests
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
None, //description
statusType,
None) //statusMessage
}
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
description: String,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
Option(description),
statusType,
None) //statusMessage
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt =>
if(rt.isBreakdown) rt.toI2b2NameOnly()
else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error
else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error
else rt.toI2b2
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
{ problemDigest.map(_.toXml).getOrElse("") }
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
final case class StatusType(
name: String,
isDone: Boolean,
i2b2Id: Option[Int] = Some(-1),
private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value {
def isError = this == StatusType.Error
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
}
object StatusType extends SEnum[StatusType] {
private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
}
{ i2b2Id }{ queryResult.statusType.name }
}
val noMessage:NodeSeq = null
val Error = StatusType("ERROR", isDone = true, None, { queryResult =>
(queryResult.statusMessage, queryResult.problemDigest) match {
case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml
case (Some(msg),None) => { msg }
case (None,Some(pd)) => { pd.summary } ++ pd.toXml
case (None, None) => noMessage
}
})
val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests
val Queued = StatusType("QUEUED", isDone = false, Some(2))
val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
val Held = StatusType("HELD", isDone = false)
val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = {
val subXml = xml \ "problem"
if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml))
else None
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
//noinspection ScalaUnnecessaryParentheses
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
resultId = asLong("resultId"),
instanceId = asLong("instanceId"),
resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
setSize = asLong("setSize"),
startDate = extractDate("startDate"),
endDate = extractDate("endDate"),
description = extract("description"),
statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = extract("statusMessage"),
problemDigest = extractProblemDigest(xml),
breakdowns = extractBreakdowns("resultEnvelope")
)
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call
val statusMessage: Option[String] = asTextOption("query_status_type", "description")
val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type")
val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest
else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest)
else None
case class Filling(
resultType:Option[ResultOutputType],
setSize:Long,
startDate:Option[XMLGregorianCalendar],
endDate:Option[XMLGregorianCalendar]
)
val filling = if(!statusType.isError) {
val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2)
val setSize = asLong("set_size")
val startDate = asXmlGcOption("start_date")
val endDate = asXmlGcOption("end_date")
Filling(resultType,setSize,startDate,endDate)
}
else {
val resultType = None
val setSize = 0L
val startDate = None
val endDate = None
Filling(resultType,setSize,startDate,endDate)
}
QueryResult(
resultId = asLong("result_instance_id"),
instanceId = asLong("query_instance_id"),
resultType = filling.resultType,
setSize = filling.setSize,
startDate = filling.startDate,
endDate = filling.endDate,
description = asTextOption("description"),
statusType = statusType,
statusMessage = statusMessage,
problemDigest = problemDigest
)
}
def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = {
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = {
val problemDigest = problem.toDigest
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
/**
* For reconstituting errorResults from a database
*/
def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = {
// This would require parsing the stamp text to change, and without a standard locale that's nigh impossible.
// If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet.
val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0)
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
}
case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val summary: String = "The I2B2 CRC reported an internal error."
override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}"
override val detailsXml =
CRC's Response is {xmlResponseFromCrc}
- createAndLog
}
diff --git a/commons/util/src/main/scala/net/shrine/problem/DashboardProblemsDatabase.scala b/commons/util/src/main/scala/net/shrine/problem/DashboardProblemDatabase.scala
similarity index 100%
rename from commons/util/src/main/scala/net/shrine/problem/DashboardProblemsDatabase.scala
rename to commons/util/src/main/scala/net/shrine/problem/DashboardProblemDatabase.scala
diff --git a/commons/util/src/main/scala/net/shrine/problem/Problem.scala b/commons/util/src/main/scala/net/shrine/problem/Problem.scala
index af59b9c5e..49af1b49d 100644
--- a/commons/util/src/main/scala/net/shrine/problem/Problem.scala
+++ b/commons/util/src/main/scala/net/shrine/problem/Problem.scala
@@ -1,215 +1,241 @@
package net.shrine.problem
import java.net.InetAddress
import java.text.SimpleDateFormat
import java.util.Date
import net.shrine.log.Loggable
import net.shrine.serialization.{XmlMarshaller, XmlUnmarshaller}
-import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Future, Promise}
import scala.xml.{Elem, Node, NodeSeq}
/**
* Describes what information we have about a problem at the site in code where we discover it.
*
* @author david
* @since 8/6/15
*/
trait Problem {
def summary:String
def problemName = getClass.getName
def throwable:Option[Throwable] = None
def stamp:Stamp
def description:String
def exceptionXml(exception:Option[Throwable]): Option[Elem] = {
exception.map{x =>
{x.getClass.getName}
{x.getMessage}
{x.getStackTrace.map(line => {line})}{exceptionXml(Option(x.getCause)).getOrElse("")}
}}
def throwableDetail: Option[Elem] = exceptionXml(throwable)
def detailsXml: NodeSeq = NodeSeq.fromSeq({throwableDetail.getOrElse("")} )
def toDigest:ProblemDigest = ProblemDigest(problemName,stamp.pretty,summary,description,detailsXml, stamp.time)
def logDigest:Problem = {
if (!ProblemConfigSource.turnOffConnector) {
val problems = Problems
problems.DatabaseConnector.insertProblem(toDigest)
}
this
}
def createAndLog:Problem = {
if (!ProblemConfigSource.turnOffConnector)
Problems.DatabaseConnector.insertProblem(toDigest)
this
}
+ /**
+ * The hack that will get us through until onCreate in 2.13
+ * The problem is that we want to insert the createAndLog call after a problem is constructed.
+ * The only way to currently do that is with DelayedInit... which is just no.
+ * Thus, the hack (that's still better than DelayedInit) is to watch the summary, description,
+ * and throwable field, and call createAndLog once we know they've been initialized. The one
+ * caveat is that creating throwable is optional, so in the worst case we wait 25 ms then decide
+ * it's not gettting initialized.
+ * @return
+ */
+ def logAfterInitialization:Future[Problem] = {
+ Future {
+ while (synchronized(summary) == null || synchronized(description) == null) {
+ Thread.sleep(10)
+ }
+
+ var count = 0
+ while(count < 5 && synchronized(throwable).isEmpty) {
+ Thread.sleep(5)
+ count += 1
+ }
+
+ createAndLog
+ }
+ }
}
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq, epoch: Long) extends XmlMarshaller {
override def toXml: Node = {
{codec}
{stampText}
{summary}
{description}
{epoch}
{detailsXml}
}
/**
* Ignores detailXml. equals with scala.xml is impossible. See http://www.scala-lang.org/api/2.10.3/index.html#scala.xml.Equality$
*/
override def equals(other: Any): Boolean =
other match {
case that: ProblemDigest =>
(that canEqual this) &&
codec == that.codec &&
stampText == that.stampText &&
summary == that.summary &&
description == that.description &&
epoch == that.epoch
case _ => false
}
/**
* Ignores detailXml
*/
override def hashCode: Int = {
val prime = 67
codec.hashCode + prime * (stampText.hashCode + prime *(summary.hashCode + prime * (description.hashCode + prime * epoch.hashCode())))
}
}
object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable {
override def fromXml(xml: NodeSeq): ProblemDigest = {
val problemNode = xml \ "problem"
require(problemNode.nonEmpty,s"No problem tag in $xml")
def extractText(tagName:String) = (problemNode \ tagName).text
val codec = extractText("codec")
val stampText = extractText("stamp")
val summary = extractText("summary")
val description = extractText("description")
val detailsXml: NodeSeq = problemNode \ "details"
val epoch =
try { extractText("epoch").toLong }
catch { case nx:NumberFormatException =>
error(s"While parsing xml representing a ProblemDigest, the epoch could not be parsed into a long", nx)
0
}
ProblemDigest(codec,stampText,summary,description,detailsXml,epoch)
}
}
case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) {
def pretty = s"${new Date(time)} on ${host.getHostName} ${source.pretty}"
}
object Stamp {
//TODO: val dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")?
//TODO: Currently the stamp text is locale specific, which can change depending on the jre/computer running it...
def apply(source:ProblemSources.ProblemSource, timer: => Long): Stamp = Stamp(InetAddress.getLocalHost, timer,source)
}
/**
* An abstract problem to enable easy creation of Problems. Note that when overriding fields,
* you should only use def or lazy val, and not val.
* See: http://stackoverflow.com/questions/15346600/field-inside-object-which-extends-app-trait-is-set-to-null-why-is-that-so
* @param source
*/
abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem {
println(s"Problem $getClass created")
def timer = System.currentTimeMillis
- lazy val stamp = Stamp(source, timer)
+ override val stamp = Stamp(source, System.currentTimeMillis)
+ logAfterInitialization
}
trait ProblemHandler {
def handleProblem(problem:Problem)
}
/**
* An example problem handler
*/
object LoggingProblemHandler extends ProblemHandler with Loggable {
override def handleProblem(problem: Problem): Unit = {
problem.throwable.fold(error(problem.toString))(throwable =>
error(problem.toString,throwable)
)
}
}
//object DatabaseProblemhandler extends ProblemHandler {
// override def handleProblem(problem: Problem): Unit = {
// Problems.DatabaseConnector.insertProblem(problem.toDigest)
// }
//
//}
object ProblemSources{
sealed trait ProblemSource {
def pretty = getClass.getSimpleName.dropRight(1)
}
case object Adapter extends ProblemSource
case object Hub extends ProblemSource
case object Qep extends ProblemSource
case object Dsa extends ProblemSource
case object Unknown extends ProblemSource
def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown)
}
case class ProblemNotYetEncoded(internalSummary:String,t:Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown){
override val summary = "An unanticipated problem encountered."
override val throwable = {
val rx = t.fold(new IllegalStateException(s"$summary"))(
new IllegalStateException(s"$summary",_)
)
rx.fillInStackTrace()
Some(rx)
}
val reportedAtStackTrace = new IllegalStateException("Capture reporting stack trace.")
override val description = "This problem is not yet classified in Shrine source code. Please report the details to the Shrine dev team."
override val detailsXml: NodeSeq = NodeSeq.fromSeq(
{internalSummary}
{throwableDetail.getOrElse("")}
)
- createAndLog
}
object ProblemNotYetEncoded {
def apply(summary:String,x:Throwable):ProblemNotYetEncoded = ProblemNotYetEncoded(summary,Some(x))
}
diff --git a/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala b/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala
index 5a021356d..55a3419c9 100644
--- a/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala
+++ b/commons/util/src/main/scala/net/shrine/problem/TestProblem.scala
@@ -1,12 +1,11 @@
package net.shrine.problem
/**
* @author david
* @since 1.22
*/
case class TestProblem(override val summary: String = "test summary",
override val description:String = "test description",
override val throwable: Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown) {
override def timer = 0
- createAndLog
}
diff --git a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala
index 6963ae766..df03a47f1 100644
--- a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala
+++ b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala
@@ -1,92 +1,92 @@
package net.shrine.problem
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import slick.driver.H2Driver.api._
import scala.concurrent.duration._
/**
* Test creation, insertion, querying, and deletion of ProblemDigest values into an
* in-memory H2 database. Demonstrates proof of concept mapping of ProblemDigest
* case class into a database.
*/
class DashboardProblemDatabaseTest extends FlatSpec with BeforeAndAfter with ScalaFutures with Matchers {
implicit val timeout = 10.seconds
val connector = Problems.DatabaseConnector
val IO = connector.IO
val problemDigests = Seq(
ProblemDigest("MJPG", "01:01:01", "summary here", "description here" , uh not sure , 2),
ProblemDigest("wewu", "01:02:01", "coffee spill", "coffee everywhere" , He chose decaf , 1),
ProblemDigest("wuwu", "02:01:01", "squirrel" , "chewed all the cables", Like ALL of them , 0),
ProblemDigest("code", "10:01:02", "such summary", "such description" , Wow , 3))
before {
connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false
connector.runBlocking(IO.createIfNotExists >> IO.tableExists) shouldBe true
connector.runBlocking(IO.createIfNotExists) shouldBe NoOperation
connector.runBlocking(IO.selectAll) shouldBe empty
}
after {
connector.runBlocking(IO.tableExists) shouldBe true
connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false
connector.runBlocking(IO.dropIfExists) shouldBe NoOperation
}
"The Database" should "Connect without any problems" in {
// Insert the suppliers and ProblemDigests
connector.executeTransactionBlocking(IO.problems ++= problemDigests)
// Test that they are all in the table
var * = connector.runBlocking(IO.selectAll)
* should contain theSameElementsAs problemDigests
* should have length problemDigests.length
// Reset the table
connector.runBlocking(IO.resetTable >> IO.selectAll) shouldBe empty
// Run the test again
connector.executeTransactionBlocking(IO.problems += problemDigests.head,
IO.problems += problemDigests(1),
IO.problems += problemDigests(2),
IO.problems += problemDigests(3))
// Test that they are all in the table
* = connector.runBlocking(IO.selectAll)
* should contain theSameElementsAs problemDigests
* should have length problemDigests.length
// Test that the simple select and filter queries work
val filtered = connector.runBlocking(IO.problems.filter(_.codec === "code").map(_.description).result)
filtered should have length 1
filtered.head shouldBe problemDigests(3).description
// This also tests that our conversion from xml to strings works
val xml = connector.runBlocking(IO.problems.map(_.xml).result)
xml should have length problemDigests.length
xml should contain theSameElementsAs problemDigests.map(_.detailsXml.toString())
val result = connector.runBlocking(IO.sizeAndProblemDigest(2))
result._1 should have length 2
result._2 shouldBe problemDigests.length
result._1.head shouldBe problemDigests(3)
result._1(1) shouldBe problemDigests.head
val resultOverLength = connector.runBlocking(IO.sizeAndProblemDigest(10))
resultOverLength._1 should have length 4
resultOverLength._1 should contain theSameElementsAs problemDigests
connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size
val testProblem = ProblemDatabaseTestProblem(ProblemSources.Unknown)
+ Thread.sleep(50)
connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + 1
}
}
case class ProblemDatabaseTestProblem(source: ProblemSources.ProblemSource) extends AbstractProblem(source: ProblemSources.ProblemSource) {
override def summary: String = "This is a test problem! No user should ever see this."
override def description: String = "Wow, this is a nice looking problem. I mean really, just look at it."
- createAndLog
}
\ No newline at end of file
diff --git a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala
index 7dbdab662..0b444d164 100644
--- a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala
+++ b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala
@@ -1,90 +1,89 @@
package net.shrine.hms.authorization
import java.net.URL
import com.typesafe.config.Config
import net.shrine.authentication.{AuthenticationResult, Authenticator}
import net.shrine.authorization.{AuthorizationResult, QueryAuthorizationService}
import net.shrine.client.EndpointConfig
import net.shrine.log.Loggable
import net.shrine.protocol.{AuthenticationInfo, CredentialConfig, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest}
import net.shrine.config.ConfigExtensions
import net.shrine.problem.{AbstractProblem, ProblemSources}
/**
* @author Bill Simons
* @since 1/30/12
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
final case class HmsDataStewardAuthorizationService(
sheriffClient: SheriffClient,
authenticator: Authenticator
) extends QueryAuthorizationService with Loggable {
import net.shrine.hms.authorization.HmsDataStewardAuthorizationService._
override def readApprovedEntries(request: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = {
val authn = request.authn
authenticate(authn) match {
case None => Left(ErrorResponse(HMSNotAuthenticatedProblem(authn)))
case Some(ecommonsUsername) =>
val topics = sheriffClient.getApprovedEntries(ecommonsUsername)
Right(ReadApprovedQueryTopicsResponse(topics))
}
}
override def authorizeRunQueryRequest(request: RunQueryRequest): AuthorizationResult = {
val authn = request.authn
if (request.topicId.isEmpty) {
AuthorizationResult.NotAuthorized(s"HMS queries require a topic id; couldn't authenticate user ${toDomainAndUser(authn)}")
} else {
authenticate(authn) match {
case None => AuthorizationResult.NotAuthorized(s"Requested topic is not approved; couldn't authenticate user ${toDomainAndUser(authn)}")
case Some(ecommonsUsername) =>
sheriffClient.isAuthorized(ecommonsUsername, request.topicId.get, request.queryDefinition.toI2b2String)
}
}
}
private def authenticate(authn: AuthenticationInfo): Option[String] = {
val authenticationResult = authenticator.authenticate(authn)
identifyEcommonsUsername(authenticationResult)
}
}
object HmsDataStewardAuthorizationService {
def apply(config:Config,authenticator: Authenticator):HmsDataStewardAuthorizationService = {
val endpointUrl = config.getString("sheriffEndpoint"+EndpointConfig.Keys.url)
val credentials = config.getConfigured("sheriffCredentials", CredentialConfig(_))
val sheriffClient = JerseySheriffClient(endpointUrl, credentials.username, credentials.password)
HmsDataStewardAuthorizationService(sheriffClient, authenticator)
}
private def toDomainAndUser(authn: AuthenticationInfo): String = s"${authn.domain}:${authn.username}"
def identifyEcommonsUsername(authenticationResult: AuthenticationResult): Option[String] = authenticationResult match {
case AuthenticationResult.Authenticated(_, ecommonsUsername) => Option(ecommonsUsername)
case _ => None
}
}
case class HMSNotAuthenticatedProblem(authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep){
override val summary = s"Can not authenticate ${authn.domain}:${authn.username}."
override val description = s"Can not authenticate ${authn.domain}:${authn.username}."
- createAndLog
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
index 37338324f..0a1de8fe6 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
@@ -1,125 +1,121 @@
package net.shrine.aggregation
import java.net.{ConnectException, UnknownHostException}
import com.sun.jersey.api.client.ClientHandlerException
import net.shrine.broadcaster.CouldNotParseResultsException
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemNotYetEncoded, ProblemSources}
import scala.concurrent.duration.Duration
import net.shrine.protocol.{BaseShrineResponse, ErrorResponse, FailureResult, FailureResult$, NodeId, Result, SingleNodeResult, Timeout}
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Represents the basic aggregation strategy shared by several aggregators:
* - Parses a sequence of SpinResultEntries into a sequence of some
* combination of valid responses, ErrorResponses, and invalid
* responses (cases where ShrineResponse.fromXml returns None)
* - Filters the valid responses, weeding out responses that aren't of
* the expected type
* Invokes an abstract method with the valid responses, errors, and
* invalid responses.
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable {
private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = {
manifest[T].runtimeClass.isAssignableFrom(response.getClass)
}
import BasicAggregator._
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = {
val resultsOrErrors: Iterable[ParsedResult[T]] = {
for {
result <- results
} yield {
val parsedResponse: ParsedResult[T] = result match {
case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse)
case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response)
case Timeout(origin) => Error(Option(origin), ErrorResponse(TimedOutWithAdapter(origin)))
case FailureResult(origin, cause) => cause match {
case cx: ConnectException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx)))
case uhx: UnknownHostException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, uhx)))
case chx: ClientHandlerException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, chx)))
case cnprx:CouldNotParseResultsException =>
if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx)))
else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx)))
case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x)))
}
case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result")
}
parsedResponse
}
}
val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid }
val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid }
val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error }
//Log all parsing errors
invalidResponses.map(_.errorMessage).foreach(this.error(_))
val previouslyDetectedErrors = errors.map(Error(None, _))
makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses)
}
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse
}
object BasicAggregator {
private[aggregation] sealed abstract class ParsedResult[+T]
private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T]
private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing]
private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing]
}
case class CouldNotConnectToAdapter(origin:NodeId,cx: Exception) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(cx)
override val summary: String = "Shrine could not connect to the adapter."
override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}."
- createAndLog
}
case class TimedOutWithAdapter(origin:NodeId) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = None
override val summary: String = "Timed out with adapter."
override val description: String = s"Shrine observed a timeout with the adapter at ${origin.name}."
- createAndLog
}
case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(cnrpx)
override val summary: String = "Could not parse response."
override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'"
override val detailsXml =
Message body is {cnrpx.body}
{throwableDetail.getOrElse("")}
- createAndLog
}
case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(cnrpx)
override val summary: String = "Adapter error."
override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}."
override val detailsXml =
Message body is {cnrpx.body}
{throwableDetail.getOrElse("")}
- createAndLog
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
index ab6099403..97789b287 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
@@ -1,44 +1,43 @@
package net.shrine.aggregation
import net.shrine.aggregation.BasicAggregator.{Error, Invalid, Valid}
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.BaseShrineResponse
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Extends BasicAggregator to ignore Errors and Invalid responses
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] {
private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = {
//Filter out errors and invalid responses
makeResponseFrom(validResponses)
}
//Default implementation, just returns first valid response, or if there are none, an ErrorResponse
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = {
validResponses.map(_.response).toSet.headOption.getOrElse{
val problem = NoValidResponsesToAggregate()
ErrorResponse(problem)
}
}
}
case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) {
override val summary: String = "No valid responses to aggregate."
override val description:String = "The hub received no valid responses to aggregate."
- createAndLog
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala
index d87b0381b..12d42f4e9 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/PackagesErrorsAggregator.scala
@@ -1,55 +1,54 @@
package net.shrine.aggregation
import net.shrine.protocol.ShrineResponse
import net.shrine.aggregation.BasicAggregator.{Error, Invalid, Valid}
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.QueryResult
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Extends BasicAggregator to package Errors and Invalid responses into QueryResults
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class PackagesErrorsAggregator[T <: ShrineResponse : Manifest](
errorMessage: Option[String] = None,
invalidMessage: Option[String] = None) extends BasicAggregator[T] {
private[aggregation] def makeErrorResult(error: Error): QueryResult = {
val Error(originOption, errorResponse) = error
//Use node name as the description, to avoid giving the web UI more data than it can display
val desc = originOption.map(_.name)
QueryResult.errorResult(desc, errorMessage.getOrElse(errorResponse.errorMessage),errorResponse.problemDigest)
}
private[aggregation] def makeInvalidResult(invalid: Invalid): QueryResult = {
val Invalid(originOption, errorMessage) = invalid
//Use node name as the description, to avoid giving the web UI more data than it can display
val desc = originOption.map(_.name)
QueryResult.errorResult(desc, invalidMessage.getOrElse(errorMessage),InvalidResultProblem(invalid))
}
private[aggregation] final override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): ShrineResponse = {
makeResponse(validResponses, errorResponses.map(makeErrorResult), invalidResponses.map(makeInvalidResult))
}
private[aggregation] def makeResponse(validResponses: Iterable[Valid[T]], errorResponses: Iterable[QueryResult], invalidResponses: Iterable[QueryResult]): ShrineResponse
}
case class InvalidResultProblem(invalid:Invalid) extends AbstractProblem(ProblemSources.Hub) {
override def summary: String = s"Invalid response."
override def description: String = s"${invalid.errorMessage} from ${invalid.origin.getOrElse("an unknown node")}"
- createAndLog
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
index 79bf27266..b61c2a5b0 100644
--- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
@@ -1,534 +1,533 @@
package net.shrine.qep.queries
import java.sql.SQLException
import java.util.concurrent.TimeoutException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.qep.QepConfigSource
import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator}
import net.shrine.util.XmlDateHelper
import slick.driver.JdbcProfile
import scala.collection.immutable.Iterable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.xml.XML
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
try {
blocking {
Await.result(future, timeout)
}
}
catch {
case tx:TimeoutException => throw CouldNotRunDbIoActionException(dataSource,tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.result)
}
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain,request.fetchSize)
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, limit:Int):Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).sortBy(x => x.changeDate.desc).take(limit).result)
}
def renamePreviousQuery(request:RenameQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def markDeleted(request:DeleteQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
def insertQepResultRow(qepQueryRow:QueryResultRow) = {
dbRun(allQueryResultRows += qepQueryRow)
}
def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = {
val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node"))
val queryResultRow = QueryResultRow(networkQueryId,result)
val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_))
val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq]
dbRun(
for {
_ <- allQueryResultRows += queryResultRow
_ <- allBreakdownResultsRows ++= breakdowns
_ <- allProblemDigestRows ++= problem
} yield ()
)
}
def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = {
dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result)
}
def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = {
val (queryResults, breakdowns,problems) = dbRun(
for {
queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result
breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result
} yield (queryResults, breakdowns, problems)
)
val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2))
def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = {
if(problemSeq.size == 1) problemSeq.head.toProblemDigest
else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq")
}
val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) )
queryResults.map(r => r.toQueryResult(
resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty),
adapterNodesToProblemDigests.get(r.adapterNode)
))
}
def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = {
dbRun(allBreakdownResultsRows += breakdownResultsRow)
}
def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = {
dbRun(allBreakdownResultsRows.result)
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val timeout = QepQuerySchema.config.getInt("timeout") seconds
val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[Option[String]]("expression")
def dateCreated = column[Time]("dateCreated")
def deleted = column[Boolean]("deleted")
def queryXml = column[String]("queryXml")
def changeDate = column[Long]("changeDate")
def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for(
queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists
) yield queries
val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false)
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns
val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap
val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap)
implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({
(resultType: ResultOutputType) => queryResultTypesToString(resultType)
},{
(string: String) => stringsToQueryResultTypes(string)
})
implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({
statusType => statusType.name
},{
name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}"))
})
class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") {
def resultId = column[Long]("resultId")
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def instanceId = column[Long]("instanceId")
def adapterNode = column[String]("adapterNode")
def resultType = column[Option[ResultOutputType]]("resultType")
def size = column[Long]("size")
def startDate = column[Option[Long]]("startDate")
def endDate = column[Option[Long]]("endDate")
def status = column[QueryResult.StatusType]("status")
def statusMessage = column[Option[String]]("statusMessage")
def changeDate = column[Long]("changeDate")
def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply)
}
val allQueryResultRows = TableQuery[QepQueryResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for(
queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists
) yield queryResultRows
class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def resultId = column[Long]("resultId")
def resultType = column[ResultOutputType]("resultType")
def dataKey = column[String]("dataKey")
def value = column[Long]("value")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply)
}
val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for(
breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists
) yield breakdownResultsRows
/*
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller {
*/
class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def codec = column[String]("codec")
def stamp = column[String]("stamp")
def summary = column[String]("summary")
def description = column[String]("description")
def details = column[String]("details")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply)
}
val allProblemDigestRows = TableQuery[QepResultProblemDigests]
val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for(
problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists
) yield problemDigests
}
object QepQuerySchema {
val allConfig:Config = QepConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfileClassName = config.getString("slickProfileClassName")
val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName)
import net.shrine.config.{ConfigExtensions, Keys}
val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty)
val schema = QepQuerySchema(slickProfile,moreBreakdowns)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: Option[String],
dateCreated: Time,
deleted: Boolean,
queryXml: String,
changeDate: Time
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated),
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.map(_.toString),
dateCreated = System.currentTimeMillis(),
deleted = false,
queryXml = runQueryRequest.toXmlString,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}
case class QueryResultRow(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
size:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long
) {
def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult(
resultId = resultId,
instanceId = instanceId,
resultType = resultType,
setSize = size,
startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
description = Some(adapterNode),
statusType = status,
statusMessage = statusMessage,
breakdowns = breakdowns,
problemDigest = problemDigest
)
}
object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow)
{
def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = {
new QueryResultRow(
resultId = result.resultId,
networkQueryId = networkQueryId,
instanceId = result.instanceId,
adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."),
resultType = result.resultType,
size = result.setSize,
startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis),
endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis),
status = result.statusType,
statusMessage = result.statusMessage,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryBreakdownResultsRow(
networkQueryId: NetworkQueryId,
adapterNode:String,
resultId:Long,
resultType: ResultOutputType,
dataKey:String,
value:Long,
changeDate:Long
)
object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){
def breakdownRowsFor(networkQueryId:NetworkQueryId,
adapterNode:String,
resultId:Long,
breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = {
breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis()))
}
def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = {
def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
val data = breakdowns.map(b => b.dataKey -> b.value).toMap
I2b2ResultEnvelope(resultType,data)
}
breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
}
}
case class QepProblemDigestRow(
networkQueryId: NetworkQueryId,
adapterNode: String,
codec: String,
stampText: String,
summary: String,
description: String,
details: String,
changeDate:Long
){
def toProblemDigest = {
ProblemDigest(
codec,
stampText,
summary,
description,
if(!details.isEmpty) XML.loadString(details)
else ,
//TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE
0
)
}
}
case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){
override val summary = "A problem encountered while using a database."
override val throwable = Some(x)
override val description = x.getMessage
- createAndLog
}
\ No newline at end of file