diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala
index 5a4a8cc89..9049bec52 100644
--- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala
+++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala
@@ -1,150 +1,176 @@
package net.shrine.metadata
import com.typesafe.config.Config
import net.shrine.config.ConfigExtensions
import net.shrine.hornetqclient.CouldNotCreateQueueButOKToRetryException
import net.shrine.log.Log
import net.shrine.messagequeueservice.protocol.Envelope
import net.shrine.messagequeueservice.{Message, MessageQueueService, Queue}
import net.shrine.problem.{AbstractProblem, ProblemSources}
-import net.shrine.protocol.{AggregatedRunQueryResponse, ResultOutputType, ResultOutputTypes}
-import net.shrine.qep.querydb.QepQueryDb
+import net.shrine.protocol.{AggregatedRunQueryResponse, QueryResult, ResultOutputType, ResultOutputTypes}
+import net.shrine.qep.querydb.{QepQueryDb, QueryResultRow}
import net.shrine.source.ConfigSource
+import net.shrine.status.protocol.IncrementalQueryResult
import net.shrine.util.Versions
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
/**
* Receives messages and writes the result to the QEP's cache
*
* @author david
* @since 8/18/17
*/
object QepReceiver {
val config: Config = ConfigSource.config
val nodeName = config.getString("shrine.humanReadableNodeName")
//create a daemon thread that long-polls for messages forever
val pollingThread = new Thread(QepReceiverRunner(nodeName),s"${getClass.getSimpleName} poller")
pollingThread.setDaemon(true)
//todo pollingThread.setUncaughtExceptionHandler() SHRINE-2198
pollingThread.start()
Log.debug(s"Started the QepReceiver thread for $nodeName")
case class QepReceiverRunner(nodeName:String) extends Runnable {
val pollDuration = Duration("15 seconds") //todo from config
val breakdownTypes: Set[ResultOutputType] = ConfigSource.config.getOptionConfigured("shrine.breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty)
override def run(): Unit = {
val queue = createQueue(nodeName)
while (true) {
//forever
try {
//todo only ask to receive a message if there are incomplete queries SHRINE-2196
Log.debug("About to call receive.")
receiveAMessage(queue)
Log.debug("Called receive.")
} catch {
case NonFatal(x) => ExceptionWhileReceivingMessage(queue,x)
//pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler.
case x => Log.error("Fatal exception while receiving a message", x)
throw x
}
}
}
def receiveAMessage(queue:Queue): Unit = {
val maybeMessage: Try[Option[Message]] = MessageQueueService.service.receive(queue, pollDuration) //todo make pollDuration configurable (and testable)
maybeMessage.transform({m =>
m.map(interpretAMessage(_,queue)).getOrElse(Success())
},{x =>
x match {
case NonFatal(nfx) => ExceptionWhileReceivingMessage(queue,x)
case _ => //pass through
}
Failure(x)
})
}
def interpretAMessage(message: Message,queue: Queue): Try[Unit] = {
val unit = ()
Log.debug(s"Received a message from $queue of $message")
val contents = message.contents
Envelope.fromJson(contents).flatMap{
case e:Envelope if e.shrineVersion == Versions.version => Success(e)
case e:Envelope => Failure(new IllegalArgumentException(s"Envelope version is not ${Versions.version}")) //todo better exception
case notE => Failure(new IllegalArgumentException(s"Not an expected message Envelope but a ${notE.getClass}")) //todo better exception
}.flatMap {
case Envelope(contentsType, contents, shrineVersion) if contentsType == AggregatedRunQueryResponse.getClass.getSimpleName => {
- AggregatedRunQueryResponse.fromXmlString(breakdownTypes)(contents)
+ AggregatedRunQueryResponse.fromXmlString(breakdownTypes)(contents).flatMap{ rqr =>
+ QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.results.head)
+ Log.debug(s"Inserted result from ${rqr.results.head.description} for query ${rqr.queryId}")
+ Success(unit)
+ }
}
- case _ => Failure(new IllegalArgumentException("Not an expected type of message from this queue")) //todo better exception
- }.transform({ rqr =>
- QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.results.head)
- Log.debug(s"Inserted result from ${rqr.results.head.description} for query ${rqr.queryId}")
+ case Envelope(contentsType, contents, shrineVersion) if contentsType == IncrementalQueryResult.incrementalQueryResultsEnvelopeContentsType => {
+ val changeDate = System.currentTimeMillis()
+ IncrementalQueryResult.seqFromJson(contents).flatMap { iqrs: Seq[IncrementalQueryResult] =>
+ val rows = iqrs.map(iqr => QueryResultRow(
+ resultId = -1L,
+ networkQueryId = iqr.networkQueryId,
+ instanceId = -1L,
+ adapterNode = iqr.adapterNodeName,
+ resultType = None,
+ size = 0L,
+ startDate = None,
+ endDate = None,
+ status = QueryResult.StatusType.valueOf(iqr.statusTypeName).get,
+ statusMessage = Some(iqr.statusMessage),
+ changeDate = changeDate
+ ))
+
+ QepQueryDb.db.insertQueryResultRows(rows)
+ Log.debug(s"Inserted incremental results $iqrs")
+ Success(unit)
+ }
+ }
+ case _ => Failure(new IllegalArgumentException(s"Not an expected type of message from this queue: $contents")) //todo better exception
+ }.transform({ s =>
message.complete()
Success(unit)
},{ x =>
x match {
case NonFatal(nfx) => QepReceiverCouldNotDecodeMessage(contents,queue,x)
- case _ => //pass through
+ case x => throw x//blow something up
}
+ message.complete() //complete anyway. Can't be interpreted, so we don't want to see it again
Failure(x)
})
}
def createQueue(nodeName:String):Queue = {
//Either come back with the right exception to try again, or a Queue
def tryToCreateQueue():Try[Queue] =
MessageQueueService.service.createQueueIfAbsent(nodeName)
def keepGoing(attempt:Try[Queue]):Try[Boolean] = attempt.transform({queue => Success(false)}, {
case okIsh: CouldNotCreateQueueButOKToRetryException => Success(true)
case x => Failure(x)
})
//todo for fun figure out how to do this without the var. maybe a Stream ? SHRINE-2211
var lastAttempt:Try[Queue] = tryToCreateQueue()
while(keepGoing(lastAttempt).get) {
Log.debug(s"Last attempt to create a queue resulted in $lastAttempt. Sleeping $pollDuration before next attempt")
Thread.sleep(pollDuration.toMillis)
lastAttempt = tryToCreateQueue()
}
Log.info(s"Finishing createQueue with $lastAttempt")
lastAttempt.get
}
}
}
case class ExceptionWhileReceivingMessage(queue:Queue, x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override val throwable = Some(x)
override def summary: String = s"The QEP encountered an exception while trying to receive a message from $queue"
override def description: String = s"The QEP encountered an exception while trying to receive a message from $queue on ${Thread.currentThread().getName}: ${x.getMessage}"
}
case class QepReceiverCouldNotDecodeMessage(messageString:String,queue:Queue, x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override val throwable = Some(x)
override def summary: String = s"The QEP could not decode a message from $queue"
override def description: String =
s"""The QEP encountered an exception while trying to decode a message from $queue on ${Thread.currentThread().getName}:
|${x.getMessage}
|$messageString""".stripMargin
}
\ No newline at end of file
diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
index 718a36536..dad8b51ad 100644
--- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
+++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
@@ -1,387 +1,392 @@
package net.shrine.metadata
import java.util.UUID
import akka.actor.ActorSystem
import net.shrine.audit.{NetworkQueryId, QueryName, Time}
import net.shrine.authorization.steward.UserName
+import net.shrine.config.ConfigExtensions
import net.shrine.i2b2.protocol.pm.User
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
-import net.shrine.protocol.ResultOutputType
+import net.shrine.protocol.{QueryResult, ResultOutputType}
import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryDbChangeNotifier, QepQueryFlag}
import net.shrine.source.ConfigSource
-import net.shrine.config.ConfigExtensions
import rapture.json._
import rapture.json.formatters.humanReadable
import rapture.json.jsonBackends.jawn._
import spray.http.{StatusCode, StatusCodes}
import spray.routing._
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.{blocking, Promise}
+import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try
import scala.util.control.NonFatal
/**
* An API to support the web client's work with queries.
*
* The current API supplies information about previous running queries. Eventually this will support accessing
* information about queries running now and the ability to submit queries.
*/
//todo move this to the qep/service module, or somewhere in the qep subproject
trait QepService extends HttpService with Loggable {
def system: ActorSystem
val qepQueryDbChangeNotifier = QepQueryDbChangeNotifier(system)
val qepReceiver = QepReceiver //start the QepReceiver by bringing it into context
val qepInfo =
"""
|The SHRINE query entry point service.
|
|This API gives a researcher access to queries, and (eventually) the ability to run queries.
|
""".stripMargin
def qepRoute(user: User): Route = pathPrefix("qep") {
get {
detach(){
queryResult(user) ~ queryResultsTable(user)
}
} ~
pathEndOrSingleSlash{complete(qepInfo)} ~
respondWithStatus(StatusCodes.NotFound){complete(qepInfo)}
}
/*
Races to complete are OK in spray. They're already happening, in fact.
When a request comes in
if the request can be fulfilled immediately then do that
if not
create a promise to fulfil to trigger the complete
create a promise to bump that first one on timeout
schedule a runnable to bump the timeout promise
create a promise to bump that first one if the conditions are right
create a promise to bump the conditional one and stuff it in a concurrent map for other parts of the system to find
onSuccess remove the conditional promise and cancel the scheduled timeout.
*/
def queryResult(user:User):Route = path("queryResult" / LongNumber) { queryId: NetworkQueryId =>
//take optional parameters for version and an awaitTime, but insist on both
//If the timeout parameter isn't supplied then the deadline is now so it will reply immediately
parameters('afterVersion.as[Long] ? 0L, 'timeoutSeconds.as[Long] ? 0L) { (afterVersion: Long, timeoutSeconds: Long) =>
//check that the timeout is less than the spray "give up" timeout
val sprayRequestTimeout = ConfigSource.config.get("spray.can.server.request-timeout",Duration(_)).toSeconds
val maximumTimeout = sprayRequestTimeout - 1
if (maximumTimeout <= timeoutSeconds) warn(s"""spray.can.server.request-timeout $sprayRequestTimeout is too short
|relative to timeoutSeconds $timeoutSeconds . The server may produce a timeout-related error. Using
|$maximumTimeout instead of $timeoutSeconds to try to prevent that.""".stripMargin)
val timeout = Seq(maximumTimeout,timeoutSeconds).min
//times for local races.
val requestStartTime = System.currentTimeMillis()
val deadline = requestStartTime + (timeout * 1000)
detach(){
val troubleOrResultsRow = selectResultsRow(queryId, user)
if (shouldRespondNow(deadline, afterVersion, troubleOrResultsRow)) {
//bypass all the concurrent/interrupt business. Just reply.
completeWithQueryResult(queryId,troubleOrResultsRow)
}
else {
debug(s"Creating promises to respond about $queryId with a version later than $afterVersion by $deadline ")
// the Promise used to respond
val okToRespond = Promise[Either[(StatusCode,String),ResultsRow]]()
//Schedule the timeout
val okToRespondTimeout = Promise[Unit]()
okToRespondTimeout.future.transform({unit =>
okToRespond.tryComplete(Try(selectResultsRow(queryId, user)))
},{x:Throwable => x match {case NonFatal(t) => ExceptionWhilePreparingTimeoutResponse(queryId,t)}
x
})
val timeLeft = (deadline - System.currentTimeMillis()) milliseconds
case class TriggerRunnable(networkQueryId: NetworkQueryId,promise: Promise[Unit]) extends Runnable {
val unit:Unit = ()
override def run(): Unit = promise.trySuccess(unit)
}
val timeoutCanceller = system.scheduler.scheduleOnce(timeLeft,TriggerRunnable(queryId,okToRespondTimeout))
//Set up for an interrupt from new data
val okToRespondIfNewData = Promise[Unit]()
okToRespondIfNewData.future.transform({unit =>
val latestResultsRow = selectResultsRow(queryId, user)
if(shouldRespondNow(deadline,afterVersion,latestResultsRow)) {
okToRespond.tryComplete(Try(selectResultsRow(queryId, user)))
}
},{x:Throwable => x match {case NonFatal(t) => ExceptionWhilePreparingTriggeredResponse(queryId,t)}
x
})
val requestId = UUID.randomUUID()
//put id -> okToRespondIfNewData in a map so that outside processes can trigger it
qepQueryDbChangeNotifier.putLongPollRequest(requestId,queryId,okToRespondIfNewData)
onSuccess(okToRespond.future){ latestResultsRow:Either[(StatusCode,String),ResultsRow] =>
//clean up concurrent bits before responding
qepQueryDbChangeNotifier.removeLongPollRequest(requestId)
timeoutCanceller.cancel()
completeWithQueryResult(queryId,latestResultsRow)
}
}
}
}
}
/**
* @param deadline time when a response must go
* @param afterVersion last timestamp the requester knows about
* @param resultsRow either the result row or something is not right
* @return true to respond now, false to dither
*/
def shouldRespondNow(deadline: Long,
afterVersion: Long,
resultsRow:Either[(StatusCode,String),ResultsRow]
):Boolean = {
val currentTime = System.currentTimeMillis()
if (currentTime >= deadline) true
else resultsRow.fold(
{_._1 != StatusCodes.NotFound},
{_.dataVersion > afterVersion}
)
}
def completeWithQueryResult(networkQueryId: NetworkQueryId,troubleOrResultsRow:Either[(StatusCode,String),ResultsRow]): Route = {
debug(s"Responding to a request for $networkQueryId with $troubleOrResultsRow")
troubleOrResultsRow.fold({ trouble =>
//something is wrong. Respond now.
respondWithStatus(trouble._1) {
complete(trouble._2)
}
}, { queryAndResults =>
//everything is fine. Respond now.
val json: Json = Json(queryAndResults)
val formattedJson: String = Json.format(json)(humanReadable())
complete(formattedJson)
})
}
def selectResultsRow(queryId:NetworkQueryId,user:User):Either[(StatusCode,String),ResultsRow] = {
//query once and determine if the latest change > afterVersion
val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId)
queryOption.map { query: QepQuery =>
if (user.sameUserAs(query.userName, query.userDomain)) {
+
+ debug(s"Query from the database is $query")
+
val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_))
val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_))
val queryCell = QueryCell(query, flag)
val queryAndResults = ResultsRow(queryCell, mostRecentQueryResults)
+ debug(s"queryAndResults is $queryAndResults")
+
Right(queryAndResults)
}
else Left((StatusCodes.Forbidden, s"Query $queryId belongs to a different user"))
}.getOrElse(Left[(StatusCode, String), ResultsRow]((StatusCodes.NotFound, s"No query with id $queryId found")))
}
def queryResultsTable(user: User): Route = path("queryResultsTable") {
matchQueryParameters(Some(user.username)) { queryParameters: QueryParameters =>
val queryRowCount: Int = QepQueryDb.db.countPreviousQueriesByUserAndDomain(
userName = user.username,
domain = user.domain
)
val queries: Seq[QepQuery] = QepQueryDb.db.selectPreviousQueriesByUserAndDomain(
userName = user.username,
domain = user.domain,
skip = queryParameters.skipOption,
limit = queryParameters.limitOption
)
//todo revisit json structure to remove things the front-end doesn't use
val adapters: Seq[String] = QepQueryDb.db.selectDistinctAdaptersWithResults
val flags: Map[NetworkQueryId, QueryFlag] = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(queries.map(q => q.networkId).to[Set])
.map(q => q._1 -> QueryFlag(q._2))
val queryResults: Seq[ResultsRow] = queries.map(q => ResultsRow(
query = QueryCell(q, flags.get(q.networkId)),
results = QepQueryDb.db.selectMostRecentFullQueryResultsFor(q.networkId).map(Result(_))))
val table: ResultsTable = ResultsTable(queryRowCount, queryParameters.skipOption.getOrElse(0), adapters, queryResults)
val jsonTable: Json = Json(table)
val formattedTable: String = Json.format(jsonTable)(humanReadable())
complete(formattedTable)
}
}
def matchQueryParameters(userName: Option[UserName])(parameterRoute: QueryParameters => Route): Route = {
parameters('skip.as[Int].?, 'limit.as[Int].?) { (skipOption, limitOption) =>
val qp = QueryParameters(
userName,
skipOption,
limitOption
)
parameterRoute(qp)
}
}
}
//todo maybe move to QepQueryDb class
case class QueryParameters(
researcherIdOption:Option[UserName] = None,
skipOption:Option[Int] = None,
limitOption:Option[Int] = None
)
case class ResultsTable(
rowCount:Int,
rowOffset:Int,
adapters:Seq[String], //todo type for adapter name
queryResults:Seq[ResultsRow]
)
case class ResultsRow(
query:QueryCell,
results: Seq[Result],
isComplete: Boolean, //a member variable to appear in json
dataVersion:Long //a time stamp in 1.23, a counting integer in a future release
)
object ResultsRow {
def apply(
query: QueryCell,
results: Seq[Result]
): ResultsRow = {
val isComplete = if (results.isEmpty) false
else results.forall(_.isComplete)
val dataVersion = (Seq(query.changeDate) ++ results.map(_.changeDate)).max //the latest change date
ResultsRow(query, results, isComplete, dataVersion)
}
}
case class QueryCell(
networkId:String, //easier to support in json, lessens the impact of using a GUID iff we can get there
queryName: QueryName,
dateCreated: Time,
queryXml: String,
changeDate: Time,
flag:Option[QueryFlag]
)
object QueryCell {
def apply(qepQuery: QepQuery,flag: Option[QueryFlag]): QueryCell = QueryCell(
networkId = qepQuery.networkId.toString,
queryName = qepQuery.queryName,
dateCreated = qepQuery.dateCreated,
queryXml = qepQuery.queryXml,
changeDate = qepQuery.changeDate,
flag
)
}
case class QueryFlag(
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QueryFlag{
def apply(qepQueryFlag: QepQueryFlag): QueryFlag = QueryFlag(qepQueryFlag.flagged, qepQueryFlag.flagMessage, qepQueryFlag.changeDate)
}
case class Result (
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
count:Long,
status:String, //todo QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long,
breakdowns: Seq[BreakdownResultsForType],
problemDigest:Option[ProblemDigestForJson]
) {
- def isComplete = true //todo until I get to SHRINE-2148
+ def isComplete = QueryResult.StatusType.valueOf(status).get.isCrcCallCompleted
}
object Result {
def apply(fullQueryResult: FullQueryResult): Result = new Result(
resultId = fullQueryResult.resultId,
networkQueryId = fullQueryResult.networkQueryId,
instanceId = fullQueryResult.instanceId,
adapterNode = fullQueryResult.adapterNode,
resultType = fullQueryResult.resultType,
count = fullQueryResult.count,
status = fullQueryResult.status.toString,
statusMessage = fullQueryResult.statusMessage,
changeDate = fullQueryResult.changeDate,
breakdowns = fullQueryResult.breakdownTypeToResults.map(tToR => BreakdownResultsForType(fullQueryResult.adapterNode,tToR._1,tToR._2)).to[Seq],
problemDigest = fullQueryResult.problemDigest.map(ProblemDigestForJson(_))
)
}
//todo replace when you figure out how to json-ize xml in rapture
case class ProblemDigestForJson(codec: String,
stampText: String,
summary: String,
description: String,
detailsString: String,
epoch: Long)
object ProblemDigestForJson {
def apply(problemDigest: ProblemDigest): ProblemDigestForJson = ProblemDigestForJson(
problemDigest.codec,
problemDigest.stampText,
problemDigest.summary,
problemDigest.description,
problemDigest.detailsXml.text,
problemDigest.epoch)
}
case class BreakdownResultsForType(resultType:ResultOutputType,results:Seq[BreakdownResult])
object BreakdownResultsForType {
def apply(adapterName: String, breakdownType: ResultOutputType, breakdowns: Seq[QepQueryBreakdownResultsRow]): BreakdownResultsForType = {
val breakdownResults = breakdowns.filter(_.adapterNode == adapterName).map(row => BreakdownResult(row.dataKey,row.value,row.changeDate))
BreakdownResultsForType(breakdownType,breakdownResults)
}
}
case class BreakdownResult(dataKey:String,value:Long,changeDate:Long)
case class ExceptionWhilePreparingTriggeredResponse(networkQueryId: NetworkQueryId,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override def throwable = Some(x)
override def summary: String = "Unable to prepare a triggered response due to an exception."
override def description: String = s"Unable to prepare a promised response for query $networkQueryId due to a ${x.getClass.getSimpleName}"
}
case class ExceptionWhilePreparingTimeoutResponse(networkQueryId: NetworkQueryId,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override def throwable = Some(x)
override def summary: String = "Unable to prepare a triggered response due to an exception."
override def description: String = s"Unable to prepare a promised response for $networkQueryId due to a ${x.getClass.getSimpleName}"
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index 9260fa607..d19b084ce 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,401 +1,409 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources}
import net.shrine.protocol.QueryResult.StatusType
import scala.xml.NodeSeq
import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil}
import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller}
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult (
resultId: Long,
instanceId: Long,
resultType: Option[ResultOutputType],
setSize: Long,
startDate: Option[XMLGregorianCalendar],
endDate: Option[XMLGregorianCalendar],
description: Option[String],
statusType: StatusType,
statusMessage: Option[String],
problemDigest: Option[ProblemDigest] = None,
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
) extends XmlMarshaller with I2b2Marshaller with Loggable {
//only used in tests
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
None, //description
statusType,
None) //statusMessage
}
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
description: String,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
Option(description),
statusType,
None) //statusMessage
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt =>
if(rt.isBreakdown) rt.toI2b2NameOnly()
else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error
else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error
else rt.toI2b2
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
{ problemDigest.map(_.toXml).getOrElse("") }
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
//todo add two new Statuses for the intermediate observations at the hub
final case class StatusType(
name: String,
isDone: Boolean,
i2b2Id: Option[Int] = Some(-1),
- private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value {
+ private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2,
+ lifeCycle:Int = Int.MaxValue //MaxValue indicates that the CRC is done with the request. Greater integers are further along in the lifecycle
+ ) extends StatusType.Value {
def isError = this == StatusType.Error
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
+
+ def isCrcCallCompleted = lifeCycle == Int.MaxValue
}
object StatusType extends SEnum[StatusType] {
private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
}
{ i2b2Id }{ queryResult.statusType.name }
}
val noMessage:NodeSeq = null
val Error = StatusType("ERROR", isDone = true, None, { queryResult =>
(queryResult.statusMessage, queryResult.problemDigest) match {
case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml
case (Some(msg),None) => { msg }
case (None,Some(pd)) => { pd.summary } ++ pd.toXml
case (None, None) => noMessage
}
})
val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests
val Queued = StatusType("QUEUED", isDone = false, Some(2))
val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
val Held = StatusType("HELD", isDone = false)
val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
+
+ //SHRINE's internal states as reported by the hub
+ val HubWillSubmit = StatusType("HUB_WILL_SUBMIT",isDone = false,lifeCycle = 10)
+ val HubSubmittedRequest = StatusType("HUB_SUBMITTED_REQUEST",isDone = false,lifeCycle = 20)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = {
val subXml = xml \ "problem"
if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml))
else None
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
//noinspection ScalaUnnecessaryParentheses
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
resultId = asLong("resultId"),
instanceId = asLong("instanceId"),
resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
setSize = asLong("setSize"),
startDate = extractDate("startDate"),
endDate = extractDate("endDate"),
description = extract("description"),
statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = extract("statusMessage"),
problemDigest = extractProblemDigest(xml),
breakdowns = extractBreakdowns("resultEnvelope")
)
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call
val statusMessage: Option[String] = asTextOption("query_status_type", "description")
val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type")
val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest
else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest)
else None
case class Filling(
resultType:Option[ResultOutputType],
setSize:Long,
startDate:Option[XMLGregorianCalendar],
endDate:Option[XMLGregorianCalendar]
)
val filling = if(!statusType.isError) {
val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2)
val setSize = asLong("set_size")
val startDate = asXmlGcOption("start_date")
val endDate = asXmlGcOption("end_date")
Filling(resultType,setSize,startDate,endDate)
}
else {
val resultType = None
val setSize = 0L
val startDate = None
val endDate = None
Filling(resultType,setSize,startDate,endDate)
}
QueryResult(
resultId = asLong("result_instance_id"),
instanceId = asLong("query_instance_id"),
resultType = filling.resultType,
setSize = filling.setSize,
startDate = filling.startDate,
endDate = filling.endDate,
description = asTextOption("description"),
statusType = statusType,
statusMessage = statusMessage,
problemDigest = problemDigest
)
}
def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = {
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = {
val problemDigest = problem.toDigest
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
/**
* For reconstituting errorResults from a database
*/
def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = {
// This would require parsing the stamp text to change, and without a standard locale that's nigh impossible.
// If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet.
val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0)
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
}
case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
override val summary: String = "The I2B2 CRC reported an internal error."
override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}"
override val detailsXml =
CRC's Response is {xmlResponseFromCrc}
}
diff --git a/commons/protocol/src/main/scala/net/shrine/status/protocol/IncrementalQueryResult.scala b/commons/protocol/src/main/scala/net/shrine/status/protocol/IncrementalQueryResult.scala
new file mode 100644
index 000000000..e52148b3a
--- /dev/null
+++ b/commons/protocol/src/main/scala/net/shrine/status/protocol/IncrementalQueryResult.scala
@@ -0,0 +1,43 @@
+package net.shrine.status.protocol
+
+import net.shrine.audit.NetworkQueryId
+import org.json4s.ShortTypeHints
+import org.json4s.native.Serialization
+
+import scala.util.Try
+
+/**
+ * Carrier for incremental query result progress for SHRINE-1.23. Should not be considered a model for future work.
+ *
+ * @author david
+ * @since 9/8/17
+ */
+case class IncrementalQueryResult(
+ networkQueryId:NetworkQueryId,
+ adapterNodeName:String,
+ statusTypeName:String,
+ statusMessage:String
+ ) {
+
+ def toJson:String = {
+ Serialization.write(this)(IncrementalQueryResult.formats)
+ }
+
+}
+
+object IncrementalQueryResult {
+ val formats = Serialization.formats(ShortTypeHints(List(classOf[IncrementalQueryResult])))
+
+ val incrementalQueryResultsEnvelopeContentsType = s"Seq of ${classOf[IncrementalQueryResult].getSimpleName}s"
+
+ def seqToJson(incrementalQueryResults: Seq[IncrementalQueryResult]):String = {
+ Serialization.write(incrementalQueryResults)(formats)
+ }
+
+ def seqFromJson(jsonString:String):Try[Seq[IncrementalQueryResult]] = Try{
+ implicit val formats = IncrementalQueryResult.formats
+ Serialization.read[Seq[IncrementalQueryResult]](jsonString)
+ }
+
+
+}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
index 3908c0ccf..b9f511bd8 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala
@@ -1,147 +1,169 @@
package net.shrine.broadcaster
import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient}
import net.shrine.aggregation.RunQueryAggregator
import net.shrine.audit.NetworkQueryId
import net.shrine.broadcaster.dao.HubDao
import net.shrine.client.TimeoutException
import net.shrine.log.Loggable
import net.shrine.messagequeueservice.MessageQueueService
import net.shrine.messagequeueservice.protocol.Envelope
import net.shrine.problem.{AbstractProblem, ProblemSources}
-import net.shrine.protocol.{AggregatedRunQueryResponse, BaseShrineResponse, BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout}
+import net.shrine.protocol.{AggregatedRunQueryResponse, BaseShrineResponse, BroadcastMessage, FailureResult, QueryResult, RunQueryRequest, SingleNodeResult, Timeout}
+import net.shrine.status.protocol.IncrementalQueryResult
import scala.concurrent.Future
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
/**
* @author clint
* @since Nov 15, 2013
*/
final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable {
logStartup()
import scala.concurrent.ExecutionContext.Implicits.global
override def broadcast(message: BroadcastMessage): Multiplexer = {
logOutboundIfNecessary(message)
+ //send back json containing just enough to fill a QueryResultRow
+ message.request match {
+ case runQueryRequest: RunQueryRequest =>
+ debug(s"RunQueryRequest's nodeId is ${runQueryRequest.nodeId}")
+ runQueryRequest.nodeId.fold {
+ error(s"Did not send to queue because nodeId is None")
+ } { nodeId => {
+ val hubWillSubmitStatuses = destinations.map { nodeHandle =>
+ IncrementalQueryResult(
+ runQueryRequest.networkQueryId,
+ nodeHandle.nodeId.name,
+ QueryResult.StatusType.HubWillSubmit.name,
+ s"The hub is about to submit query ${runQueryRequest.networkQueryId} to ${nodeHandle.nodeId.name}"
+ )
+ }.to[Seq]
+ val envelope = Envelope(IncrementalQueryResult.incrementalQueryResultsEnvelopeContentsType,IncrementalQueryResult.seqToJson(hubWillSubmitStatuses))
+ sendToQep(envelope,nodeId.name,s"Status update - the hub will submit the query to the adapters for ${runQueryRequest.networkQueryId}")
+ }
+ }
+ case _ => //don't care
+ }
+
val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId))
for {
nodeHandle <- destinations
shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle)
} {
try {
message.request match {
case runQueryRequest:RunQueryRequest =>
debug(s"RunQueryRequest's nodeId is ${runQueryRequest.nodeId}")
runQueryRequest.nodeId.fold{
error(s"Did not send to queue because nodeId is None")
}{ nodeId =>
// make an AggregateRunQueryResponse from the SingleNodeResult
val aggregator = new RunQueryAggregator( //to convert the SingleNodeResult into an AggregateRunQueryResponse
runQueryRequest.networkQueryId,
runQueryRequest.authn.username,
runQueryRequest.authn.domain,
runQueryRequest.queryDefinition,
false
)
val response: BaseShrineResponse = aggregator.aggregate(Seq(shrineResponse),Seq.empty,message)
response match {
- case runQueryResponse:AggregatedRunQueryResponse => sendToQep(runQueryResponse,nodeId.name)
+ case runQueryResponse:AggregatedRunQueryResponse =>
+ val envelope = Envelope(AggregatedRunQueryResponse.getClass.getSimpleName,runQueryResponse.toXmlString)
+ sendToQep(envelope,nodeId.name,s"Result from ${runQueryResponse.results.head.description.get}")
case _ => error(s"response is not a AggregatedRunQueryResponse. It is ${response.toString}")
}
}
case _ => debug(s"Not a RunQueryRequest but a ${message.request.getClass.getSimpleName}.")
}
multiplexer.processResponse(shrineResponse) }
finally { logResultsIfNecessary(message, shrineResponse) }
}
multiplexer
}
- private def sendToQep(runQueryResponse: AggregatedRunQueryResponse,queueName:String):Unit = {
- val envelope = Envelope(AggregatedRunQueryResponse.getClass.getSimpleName,runQueryResponse.toXmlString)
+ private def sendToQep(envelope: Envelope,queueName:String,logDescription:String):Unit = {
val s: Try[Unit] = for {
queue <- MessageQueueService.service.createQueueIfAbsent(queueName)
sent <- MessageQueueService.service.send(envelope.toJson, queue)
} yield sent
s.transform({itWorked =>
- debug(s"Result from ${runQueryResponse.results.head.description.get} sent to queue")
+ debug(s"$logDescription sent to queue")
Success(itWorked)
},{throwable: Throwable =>
throwable match
{
- case NonFatal(x) =>ExceptionWhileSendingMessage(runQueryResponse.queryId,queueName,x)
+ case NonFatal(x) =>ExceptionWhileSendingMessage(logDescription,queueName,x)
case _ => //no op
}
Failure(throwable)
})
}
private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = {
val NodeHandle(nodeId, client) = nodeHandle
- //todo more status for SHRINE-2122
- // todo send back json containing just enough to fill a QueryResultRow
// may need to do SHRINE-2177 to make this work
client.query(message).recover {
case e: TimeoutException =>
error(s"Broadcasting to $nodeId timed out")
Timeout(nodeId)
case NonFatal(e) =>
error(s"Broadcasting to $nodeId failed with ", e)
FailureResult(nodeId, e)
}
//todo more status for SHRINE-2123
// todo send back json containing just enough to fill a QueryResultRow
}
private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ =>
dao.logQueryResult(message.requestId, result)
}
private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq =>
dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition)
}
private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = {
message.request match {
case runQueryReq: RunQueryRequest => f(runQueryReq)
case _ => ()
}
}
private def logStartup(): Unit = {
def clientToString(client: AdapterClient): String = client match {
case r: RemoteAdapterClient => r.poster.url.toString
case _ => ""
}
info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:")
destinations.toSeq.sortBy(_.nodeId.name).foreach { handle =>
info(s" ${handle.nodeId}: ${clientToString(handle.client)}")
}
}
}
-case class ExceptionWhileSendingMessage(networkQueryId: NetworkQueryId,queueName:String, x:Throwable) extends AbstractProblem(ProblemSources.Hub) {
+case class ExceptionWhileSendingMessage(logDescription:String,queueName:String, x:Throwable) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(x)
override def summary: String = s"The Hub encountered an exception while trying to send a message to $queueName"
- override def description: String = s"The Hub encountered an exception while trying to send a message about $networkQueryId from $queueName : ${x.getMessage}"
+ override def description: String = s"The Hub encountered an exception while trying to send a message about $logDescription from $queueName : ${x.getMessage}"
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
index c26a95d88..59b4392dc 100644
--- a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
@@ -1,610 +1,619 @@
package net.shrine.qep.querydb
import java.sql.SQLException
import java.util.concurrent.TimeoutException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator, TimeoutInDbIoActionException}
import net.shrine.source.ConfigSource
import net.shrine.util.XmlDateHelper
import slick.driver.JdbcProfile
import scala.collection.immutable.Iterable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.xml.XML
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
try {
Await.result(future, timeout)
}
catch {
case tx:TimeoutException => throw TimeoutInDbIoActionException(dataSource, timeout, tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.result)
}
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(
request.authn.username,
request.authn.domain,
None,
Some(request.fetchSize))
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
def countPreviousQueriesByUserAndDomain(userName: UserName, domain: String):Int = {
val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain)
dbRun(q.size.result)
}
def selectQueryById(networkQueryId: NetworkQueryId): Option[QepQuery] =
dbRun(mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result).lastOption
def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, skip:Option[Int] = None, limit:Option[Int] = None):Seq[QepQuery] = {
debug(s"start selectPreviousQueriesByUserAndDomain $userName $domain")
val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain).sortBy(x => x.changeDate.desc)
val qWithSkip = skip.fold(q)(q.drop)
val qWithLimit = limit.fold(qWithSkip)(qWithSkip.take)
val result = dbRun(qWithLimit.result)
debug(s"finished selectPreviousQueriesByUserAndDomain with $result")
result
}
def renamePreviousQuery(request:RenameQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def markDeleted(request:DeleteQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
def selectMostRecentQepQueryFlagFor(networkQueryId: NetworkQueryId): Option[QepQueryFlag] =
dbRun(mostRecentQueryFlags.filter(_.networkId === networkQueryId).result).lastOption
def insertQepResultRow(qepQueryRow:QueryResultRow) = {
dbRun(allQueryResultRows += qepQueryRow)
}
def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = {
val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node"))
val queryResultRow = QueryResultRow(networkQueryId,result)
val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_))
val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq]
dbRun(
for {
_ <- allQueryResultRows += queryResultRow
_ <- allBreakdownResultsRows ++= breakdowns
_ <- allProblemDigestRows ++= problem
} yield ()
)
}
-//todo only used in tests. Is that OK?
+ def insertQueryResultRow(queryResultRow: QueryResultRow) = {
+ dbRun(allQueryResultRows += queryResultRow)
+ }
+
+ def insertQueryResultRows(queryResultRows: Seq[QueryResultRow]) = {
+ dbRun(allQueryResultRows ++= queryResultRows)
+ }
+
+ //todo only used in tests. Is that OK?
def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = {
dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result)
}
def selectMostRecentFullQueryResultsFor(networkId:NetworkQueryId): Seq[FullQueryResult] = {
val (queryResults, breakdowns,problems) = dbRun(
for {
queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result
breakdowns: Seq[QepQueryBreakdownResultsRow] <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result
} yield (queryResults, breakdowns, problems)
)
val breakdownTypeToResults: Map[ResultOutputType, Seq[QepQueryBreakdownResultsRow]] = breakdowns.groupBy(_.resultType)
def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = {
if(problemSeq.size == 1) problemSeq.head.toProblemDigest
else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq")
}
val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) )
queryResults.map(r => FullQueryResult(
r,
breakdownTypeToResults,
adapterNodesToProblemDigests.get(r.adapterNode)
))
}
def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = {
val fullQueryResults = selectMostRecentFullQueryResultsFor(networkId)
fullQueryResults.map(_.toQueryResult)
}
def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = {
dbRun(allBreakdownResultsRows += breakdownResultsRow)
}
def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = {
dbRun(allBreakdownResultsRows.result)
}
def selectDistinctAdaptersWithResults:Seq[String] = {
dbRun(allQueryResultRows.map(_.adapterNode).distinct.result).sorted
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val timeout = QepQuerySchema.config.getInt("timeout") seconds
val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[Option[String]]("expression")
def dateCreated = column[Time]("dateCreated")
def deleted = column[Boolean]("deleted")
def queryXml = column[String]("queryXml")
def changeDate = column[Long]("changeDate")
def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for(
queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists
) yield queries
val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false)
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns
val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap
val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap)
implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({
(resultType: ResultOutputType) => queryResultTypesToString(resultType)
},{
(string: String) => stringsToQueryResultTypes(string)
})
implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({
statusType => statusType.name
},{
name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}"))
})
class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") {
def resultId = column[Long]("resultId")
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def instanceId = column[Long]("instanceId")
def adapterNode = column[String]("adapterNode")
def resultType = column[Option[ResultOutputType]]("resultType")
def size = column[Long]("size")
def startDate = column[Option[Long]]("startDate")
def endDate = column[Option[Long]]("endDate")
def status = column[QueryResult.StatusType]("status")
def statusMessage = column[Option[String]]("statusMessage")
def changeDate = column[Long]("changeDate")
def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply)
}
val allQueryResultRows = TableQuery[QepQueryResults]
//Most recent query result rows for each queryId from each adapter
+ //todo check result lifecycles for SHRINE-2122
val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for(
queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists
) yield queryResultRows
class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def resultId = column[Long]("resultId")
def resultType = column[ResultOutputType]("resultType")
def dataKey = column[String]("dataKey")
def value = column[Long]("value")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply)
}
val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for(
breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists
) yield breakdownResultsRows
/*
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller {
*/
class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def codec = column[String]("codec")
def stamp = column[String]("stamp")
def summary = column[String]("summary")
def description = column[String]("description")
def details = column[String]("details")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply)
}
val allProblemDigestRows = TableQuery[QepResultProblemDigests]
val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for(
problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists
) yield problemDigests
}
object QepQuerySchema {
val allConfig:Config = ConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config)
import net.shrine.config.ConfigExtensions
val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty)
val schema = QepQuerySchema(slickProfile,moreBreakdowns)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: Option[String],
dateCreated: Time,
deleted: Boolean,
queryXml: String,
changeDate: Time
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated),
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.map(_.toString),
dateCreated = System.currentTimeMillis(),
deleted = false,
queryXml = runQueryRequest.toXmlString,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}
//todo replace with a class per state
case class FullQueryResult(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
count:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long,
breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]],
problemDigest:Option[ProblemDigest]
) {
def toQueryResult = {
def resultEnvelopesFrom(breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]]): Map[ResultOutputType, I2b2ResultEnvelope] = {
def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
val data = breakdowns.map(b => b.dataKey -> b.value).toMap
I2b2ResultEnvelope(resultType,data)
}
breakdownTypeToResults.map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
}
QueryResult(
resultId = resultId,
instanceId = instanceId,
resultType = resultType,
setSize = count,
startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
description = Some(adapterNode),
statusType = status,
statusMessage = statusMessage,
breakdowns = resultEnvelopesFrom(breakdownTypeToResults),
problemDigest = problemDigest
)
}
}
object FullQueryResult {
def apply(row:QueryResultRow,
breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]],
problemDigest:Option[ProblemDigest]):FullQueryResult = {
FullQueryResult(resultId = row.resultId,
networkQueryId = row.networkQueryId,
instanceId = row.instanceId,
adapterNode = row.adapterNode,
resultType = row.resultType,
count = row.size,
startDate = row.startDate,
endDate = row.endDate,
status = row.status,
statusMessage = row.statusMessage,
changeDate = row.changeDate,
breakdownTypeToResults = breakdownTypeToResults,
problemDigest = problemDigest
)
}
}
case class QueryResultRow(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
size:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long
) {
}
object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow)
{
def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = {
new QueryResultRow(
resultId = result.resultId,
networkQueryId = networkQueryId,
instanceId = result.instanceId,
adapterNode = result.description.getOrElse(s"$result has None in its description field, instead of the name of an adapter node."),
resultType = result.resultType,
size = result.setSize,
startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis),
endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis),
status = result.statusType,
statusMessage = result.statusMessage,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryBreakdownResultsRow(
networkQueryId: NetworkQueryId,
adapterNode:String,
resultId:Long,
resultType: ResultOutputType,
dataKey:String,
value:Long,
changeDate:Long
)
object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){
def breakdownRowsFor(networkQueryId:NetworkQueryId,
adapterNode:String,
resultId:Long,
breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = {
breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis()))
}
}
case class QepProblemDigestRow(
networkQueryId: NetworkQueryId,
adapterNode: String,
codec: String,
stampText: String,
summary: String,
description: String,
details: String,
changeDate:Long
){
def toProblemDigest = {
ProblemDigest(
codec,
stampText,
summary,
description,
if(!details.isEmpty) XML.loadString(details)
else ,
//TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE
0
)
}
}
case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){
override val summary = "A problem encountered while using a database."
override val throwable = Some(x)
override val description = x.getMessage
}
\ No newline at end of file