diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala index 5a4a8cc89..9049bec52 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepReceiver.scala @@ -1,150 +1,176 @@ package net.shrine.metadata import com.typesafe.config.Config import net.shrine.config.ConfigExtensions import net.shrine.hornetqclient.CouldNotCreateQueueButOKToRetryException import net.shrine.log.Log import net.shrine.messagequeueservice.protocol.Envelope import net.shrine.messagequeueservice.{Message, MessageQueueService, Queue} import net.shrine.problem.{AbstractProblem, ProblemSources} -import net.shrine.protocol.{AggregatedRunQueryResponse, ResultOutputType, ResultOutputTypes} -import net.shrine.qep.querydb.QepQueryDb +import net.shrine.protocol.{AggregatedRunQueryResponse, QueryResult, ResultOutputType, ResultOutputTypes} +import net.shrine.qep.querydb.{QepQueryDb, QueryResultRow} import net.shrine.source.ConfigSource +import net.shrine.status.protocol.IncrementalQueryResult import net.shrine.util.Versions import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal /** * Receives messages and writes the result to the QEP's cache * * @author david * @since 8/18/17 */ object QepReceiver { val config: Config = ConfigSource.config val nodeName = config.getString("shrine.humanReadableNodeName") //create a daemon thread that long-polls for messages forever val pollingThread = new Thread(QepReceiverRunner(nodeName),s"${getClass.getSimpleName} poller") pollingThread.setDaemon(true) //todo pollingThread.setUncaughtExceptionHandler() SHRINE-2198 pollingThread.start() Log.debug(s"Started the QepReceiver thread for $nodeName") case class QepReceiverRunner(nodeName:String) extends Runnable { val pollDuration = Duration("15 seconds") //todo from config val breakdownTypes: Set[ResultOutputType] = ConfigSource.config.getOptionConfigured("shrine.breakdownResultOutputTypes", ResultOutputTypes.fromConfig).getOrElse(Set.empty) override def run(): Unit = { val queue = createQueue(nodeName) while (true) { //forever try { //todo only ask to receive a message if there are incomplete queries SHRINE-2196 Log.debug("About to call receive.") receiveAMessage(queue) Log.debug("Called receive.") } catch { case NonFatal(x) => ExceptionWhileReceivingMessage(queue,x) //pass-through to blow up the thread, receive no more results, do something dramatic in UncaughtExceptionHandler. case x => Log.error("Fatal exception while receiving a message", x) throw x } } } def receiveAMessage(queue:Queue): Unit = { val maybeMessage: Try[Option[Message]] = MessageQueueService.service.receive(queue, pollDuration) //todo make pollDuration configurable (and testable) maybeMessage.transform({m => m.map(interpretAMessage(_,queue)).getOrElse(Success()) },{x => x match { case NonFatal(nfx) => ExceptionWhileReceivingMessage(queue,x) case _ => //pass through } Failure(x) }) } def interpretAMessage(message: Message,queue: Queue): Try[Unit] = { val unit = () Log.debug(s"Received a message from $queue of $message") val contents = message.contents Envelope.fromJson(contents).flatMap{ case e:Envelope if e.shrineVersion == Versions.version => Success(e) case e:Envelope => Failure(new IllegalArgumentException(s"Envelope version is not ${Versions.version}")) //todo better exception case notE => Failure(new IllegalArgumentException(s"Not an expected message Envelope but a ${notE.getClass}")) //todo better exception }.flatMap { case Envelope(contentsType, contents, shrineVersion) if contentsType == AggregatedRunQueryResponse.getClass.getSimpleName => { - AggregatedRunQueryResponse.fromXmlString(breakdownTypes)(contents) + AggregatedRunQueryResponse.fromXmlString(breakdownTypes)(contents).flatMap{ rqr => + QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.results.head) + Log.debug(s"Inserted result from ${rqr.results.head.description} for query ${rqr.queryId}") + Success(unit) + } } - case _ => Failure(new IllegalArgumentException("Not an expected type of message from this queue")) //todo better exception - }.transform({ rqr => - QepQueryDb.db.insertQueryResult(rqr.queryId, rqr.results.head) - Log.debug(s"Inserted result from ${rqr.results.head.description} for query ${rqr.queryId}") + case Envelope(contentsType, contents, shrineVersion) if contentsType == IncrementalQueryResult.incrementalQueryResultsEnvelopeContentsType => { + val changeDate = System.currentTimeMillis() + IncrementalQueryResult.seqFromJson(contents).flatMap { iqrs: Seq[IncrementalQueryResult] => + val rows = iqrs.map(iqr => QueryResultRow( + resultId = -1L, + networkQueryId = iqr.networkQueryId, + instanceId = -1L, + adapterNode = iqr.adapterNodeName, + resultType = None, + size = 0L, + startDate = None, + endDate = None, + status = QueryResult.StatusType.valueOf(iqr.statusTypeName).get, + statusMessage = Some(iqr.statusMessage), + changeDate = changeDate + )) + + QepQueryDb.db.insertQueryResultRows(rows) + Log.debug(s"Inserted incremental results $iqrs") + Success(unit) + } + } + case _ => Failure(new IllegalArgumentException(s"Not an expected type of message from this queue: $contents")) //todo better exception + }.transform({ s => message.complete() Success(unit) },{ x => x match { case NonFatal(nfx) => QepReceiverCouldNotDecodeMessage(contents,queue,x) - case _ => //pass through + case x => throw x//blow something up } + message.complete() //complete anyway. Can't be interpreted, so we don't want to see it again Failure(x) }) } def createQueue(nodeName:String):Queue = { //Either come back with the right exception to try again, or a Queue def tryToCreateQueue():Try[Queue] = MessageQueueService.service.createQueueIfAbsent(nodeName) def keepGoing(attempt:Try[Queue]):Try[Boolean] = attempt.transform({queue => Success(false)}, { case okIsh: CouldNotCreateQueueButOKToRetryException => Success(true) case x => Failure(x) }) //todo for fun figure out how to do this without the var. maybe a Stream ? SHRINE-2211 var lastAttempt:Try[Queue] = tryToCreateQueue() while(keepGoing(lastAttempt).get) { Log.debug(s"Last attempt to create a queue resulted in $lastAttempt. Sleeping $pollDuration before next attempt") Thread.sleep(pollDuration.toMillis) lastAttempt = tryToCreateQueue() } Log.info(s"Finishing createQueue with $lastAttempt") lastAttempt.get } } } case class ExceptionWhileReceivingMessage(queue:Queue, x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override def summary: String = s"The QEP encountered an exception while trying to receive a message from $queue" override def description: String = s"The QEP encountered an exception while trying to receive a message from $queue on ${Thread.currentThread().getName}: ${x.getMessage}" } case class QepReceiverCouldNotDecodeMessage(messageString:String,queue:Queue, x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override val throwable = Some(x) override def summary: String = s"The QEP could not decode a message from $queue" override def description: String = s"""The QEP encountered an exception while trying to decode a message from $queue on ${Thread.currentThread().getName}: |${x.getMessage} |$messageString""".stripMargin } \ No newline at end of file diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala index 718a36536..dad8b51ad 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala @@ -1,387 +1,392 @@ package net.shrine.metadata import java.util.UUID import akka.actor.ActorSystem import net.shrine.audit.{NetworkQueryId, QueryName, Time} import net.shrine.authorization.steward.UserName +import net.shrine.config.ConfigExtensions import net.shrine.i2b2.protocol.pm.User import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources} -import net.shrine.protocol.ResultOutputType +import net.shrine.protocol.{QueryResult, ResultOutputType} import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryDbChangeNotifier, QepQueryFlag} import net.shrine.source.ConfigSource -import net.shrine.config.ConfigExtensions import rapture.json._ import rapture.json.formatters.humanReadable import rapture.json.jsonBackends.jawn._ import spray.http.{StatusCode, StatusCodes} import spray.routing._ import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.{blocking, Promise} +import scala.concurrent.Promise import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Try import scala.util.control.NonFatal /** * An API to support the web client's work with queries. * * The current API supplies information about previous running queries. Eventually this will support accessing * information about queries running now and the ability to submit queries. */ //todo move this to the qep/service module, or somewhere in the qep subproject trait QepService extends HttpService with Loggable { def system: ActorSystem val qepQueryDbChangeNotifier = QepQueryDbChangeNotifier(system) val qepReceiver = QepReceiver //start the QepReceiver by bringing it into context val qepInfo = """ |The SHRINE query entry point service. | |This API gives a researcher access to queries, and (eventually) the ability to run queries. | """.stripMargin def qepRoute(user: User): Route = pathPrefix("qep") { get { detach(){ queryResult(user) ~ queryResultsTable(user) } } ~ pathEndOrSingleSlash{complete(qepInfo)} ~ respondWithStatus(StatusCodes.NotFound){complete(qepInfo)} } /* Races to complete are OK in spray. They're already happening, in fact. When a request comes in if the request can be fulfilled immediately then do that if not create a promise to fulfil to trigger the complete create a promise to bump that first one on timeout schedule a runnable to bump the timeout promise create a promise to bump that first one if the conditions are right create a promise to bump the conditional one and stuff it in a concurrent map for other parts of the system to find onSuccess remove the conditional promise and cancel the scheduled timeout. */ def queryResult(user:User):Route = path("queryResult" / LongNumber) { queryId: NetworkQueryId => //take optional parameters for version and an awaitTime, but insist on both //If the timeout parameter isn't supplied then the deadline is now so it will reply immediately parameters('afterVersion.as[Long] ? 0L, 'timeoutSeconds.as[Long] ? 0L) { (afterVersion: Long, timeoutSeconds: Long) => //check that the timeout is less than the spray "give up" timeout val sprayRequestTimeout = ConfigSource.config.get("spray.can.server.request-timeout",Duration(_)).toSeconds val maximumTimeout = sprayRequestTimeout - 1 if (maximumTimeout <= timeoutSeconds) warn(s"""spray.can.server.request-timeout $sprayRequestTimeout is too short |relative to timeoutSeconds $timeoutSeconds . The server may produce a timeout-related error. Using |$maximumTimeout instead of $timeoutSeconds to try to prevent that.""".stripMargin) val timeout = Seq(maximumTimeout,timeoutSeconds).min //times for local races. val requestStartTime = System.currentTimeMillis() val deadline = requestStartTime + (timeout * 1000) detach(){ val troubleOrResultsRow = selectResultsRow(queryId, user) if (shouldRespondNow(deadline, afterVersion, troubleOrResultsRow)) { //bypass all the concurrent/interrupt business. Just reply. completeWithQueryResult(queryId,troubleOrResultsRow) } else { debug(s"Creating promises to respond about $queryId with a version later than $afterVersion by $deadline ") // the Promise used to respond val okToRespond = Promise[Either[(StatusCode,String),ResultsRow]]() //Schedule the timeout val okToRespondTimeout = Promise[Unit]() okToRespondTimeout.future.transform({unit => okToRespond.tryComplete(Try(selectResultsRow(queryId, user))) },{x:Throwable => x match {case NonFatal(t) => ExceptionWhilePreparingTimeoutResponse(queryId,t)} x }) val timeLeft = (deadline - System.currentTimeMillis()) milliseconds case class TriggerRunnable(networkQueryId: NetworkQueryId,promise: Promise[Unit]) extends Runnable { val unit:Unit = () override def run(): Unit = promise.trySuccess(unit) } val timeoutCanceller = system.scheduler.scheduleOnce(timeLeft,TriggerRunnable(queryId,okToRespondTimeout)) //Set up for an interrupt from new data val okToRespondIfNewData = Promise[Unit]() okToRespondIfNewData.future.transform({unit => val latestResultsRow = selectResultsRow(queryId, user) if(shouldRespondNow(deadline,afterVersion,latestResultsRow)) { okToRespond.tryComplete(Try(selectResultsRow(queryId, user))) } },{x:Throwable => x match {case NonFatal(t) => ExceptionWhilePreparingTriggeredResponse(queryId,t)} x }) val requestId = UUID.randomUUID() //put id -> okToRespondIfNewData in a map so that outside processes can trigger it qepQueryDbChangeNotifier.putLongPollRequest(requestId,queryId,okToRespondIfNewData) onSuccess(okToRespond.future){ latestResultsRow:Either[(StatusCode,String),ResultsRow] => //clean up concurrent bits before responding qepQueryDbChangeNotifier.removeLongPollRequest(requestId) timeoutCanceller.cancel() completeWithQueryResult(queryId,latestResultsRow) } } } } } /** * @param deadline time when a response must go * @param afterVersion last timestamp the requester knows about * @param resultsRow either the result row or something is not right * @return true to respond now, false to dither */ def shouldRespondNow(deadline: Long, afterVersion: Long, resultsRow:Either[(StatusCode,String),ResultsRow] ):Boolean = { val currentTime = System.currentTimeMillis() if (currentTime >= deadline) true else resultsRow.fold( {_._1 != StatusCodes.NotFound}, {_.dataVersion > afterVersion} ) } def completeWithQueryResult(networkQueryId: NetworkQueryId,troubleOrResultsRow:Either[(StatusCode,String),ResultsRow]): Route = { debug(s"Responding to a request for $networkQueryId with $troubleOrResultsRow") troubleOrResultsRow.fold({ trouble => //something is wrong. Respond now. respondWithStatus(trouble._1) { complete(trouble._2) } }, { queryAndResults => //everything is fine. Respond now. val json: Json = Json(queryAndResults) val formattedJson: String = Json.format(json)(humanReadable()) complete(formattedJson) }) } def selectResultsRow(queryId:NetworkQueryId,user:User):Either[(StatusCode,String),ResultsRow] = { //query once and determine if the latest change > afterVersion val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId) queryOption.map { query: QepQuery => if (user.sameUserAs(query.userName, query.userDomain)) { + + debug(s"Query from the database is $query") + val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_)) val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_)) val queryCell = QueryCell(query, flag) val queryAndResults = ResultsRow(queryCell, mostRecentQueryResults) + debug(s"queryAndResults is $queryAndResults") + Right(queryAndResults) } else Left((StatusCodes.Forbidden, s"Query $queryId belongs to a different user")) }.getOrElse(Left[(StatusCode, String), ResultsRow]((StatusCodes.NotFound, s"No query with id $queryId found"))) } def queryResultsTable(user: User): Route = path("queryResultsTable") { matchQueryParameters(Some(user.username)) { queryParameters: QueryParameters => val queryRowCount: Int = QepQueryDb.db.countPreviousQueriesByUserAndDomain( userName = user.username, domain = user.domain ) val queries: Seq[QepQuery] = QepQueryDb.db.selectPreviousQueriesByUserAndDomain( userName = user.username, domain = user.domain, skip = queryParameters.skipOption, limit = queryParameters.limitOption ) //todo revisit json structure to remove things the front-end doesn't use val adapters: Seq[String] = QepQueryDb.db.selectDistinctAdaptersWithResults val flags: Map[NetworkQueryId, QueryFlag] = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(queries.map(q => q.networkId).to[Set]) .map(q => q._1 -> QueryFlag(q._2)) val queryResults: Seq[ResultsRow] = queries.map(q => ResultsRow( query = QueryCell(q, flags.get(q.networkId)), results = QepQueryDb.db.selectMostRecentFullQueryResultsFor(q.networkId).map(Result(_)))) val table: ResultsTable = ResultsTable(queryRowCount, queryParameters.skipOption.getOrElse(0), adapters, queryResults) val jsonTable: Json = Json(table) val formattedTable: String = Json.format(jsonTable)(humanReadable()) complete(formattedTable) } } def matchQueryParameters(userName: Option[UserName])(parameterRoute: QueryParameters => Route): Route = { parameters('skip.as[Int].?, 'limit.as[Int].?) { (skipOption, limitOption) => val qp = QueryParameters( userName, skipOption, limitOption ) parameterRoute(qp) } } } //todo maybe move to QepQueryDb class case class QueryParameters( researcherIdOption:Option[UserName] = None, skipOption:Option[Int] = None, limitOption:Option[Int] = None ) case class ResultsTable( rowCount:Int, rowOffset:Int, adapters:Seq[String], //todo type for adapter name queryResults:Seq[ResultsRow] ) case class ResultsRow( query:QueryCell, results: Seq[Result], isComplete: Boolean, //a member variable to appear in json dataVersion:Long //a time stamp in 1.23, a counting integer in a future release ) object ResultsRow { def apply( query: QueryCell, results: Seq[Result] ): ResultsRow = { val isComplete = if (results.isEmpty) false else results.forall(_.isComplete) val dataVersion = (Seq(query.changeDate) ++ results.map(_.changeDate)).max //the latest change date ResultsRow(query, results, isComplete, dataVersion) } } case class QueryCell( networkId:String, //easier to support in json, lessens the impact of using a GUID iff we can get there queryName: QueryName, dateCreated: Time, queryXml: String, changeDate: Time, flag:Option[QueryFlag] ) object QueryCell { def apply(qepQuery: QepQuery,flag: Option[QueryFlag]): QueryCell = QueryCell( networkId = qepQuery.networkId.toString, queryName = qepQuery.queryName, dateCreated = qepQuery.dateCreated, queryXml = qepQuery.queryXml, changeDate = qepQuery.changeDate, flag ) } case class QueryFlag( flagged:Boolean, flagMessage:String, changeDate:Long ) object QueryFlag{ def apply(qepQueryFlag: QepQueryFlag): QueryFlag = QueryFlag(qepQueryFlag.flagged, qepQueryFlag.flagMessage, qepQueryFlag.changeDate) } case class Result ( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], count:Long, status:String, //todo QueryResult.StatusType, statusMessage:Option[String], changeDate:Long, breakdowns: Seq[BreakdownResultsForType], problemDigest:Option[ProblemDigestForJson] ) { - def isComplete = true //todo until I get to SHRINE-2148 + def isComplete = QueryResult.StatusType.valueOf(status).get.isCrcCallCompleted } object Result { def apply(fullQueryResult: FullQueryResult): Result = new Result( resultId = fullQueryResult.resultId, networkQueryId = fullQueryResult.networkQueryId, instanceId = fullQueryResult.instanceId, adapterNode = fullQueryResult.adapterNode, resultType = fullQueryResult.resultType, count = fullQueryResult.count, status = fullQueryResult.status.toString, statusMessage = fullQueryResult.statusMessage, changeDate = fullQueryResult.changeDate, breakdowns = fullQueryResult.breakdownTypeToResults.map(tToR => BreakdownResultsForType(fullQueryResult.adapterNode,tToR._1,tToR._2)).to[Seq], problemDigest = fullQueryResult.problemDigest.map(ProblemDigestForJson(_)) ) } //todo replace when you figure out how to json-ize xml in rapture case class ProblemDigestForJson(codec: String, stampText: String, summary: String, description: String, detailsString: String, epoch: Long) object ProblemDigestForJson { def apply(problemDigest: ProblemDigest): ProblemDigestForJson = ProblemDigestForJson( problemDigest.codec, problemDigest.stampText, problemDigest.summary, problemDigest.description, problemDigest.detailsXml.text, problemDigest.epoch) } case class BreakdownResultsForType(resultType:ResultOutputType,results:Seq[BreakdownResult]) object BreakdownResultsForType { def apply(adapterName: String, breakdownType: ResultOutputType, breakdowns: Seq[QepQueryBreakdownResultsRow]): BreakdownResultsForType = { val breakdownResults = breakdowns.filter(_.adapterNode == adapterName).map(row => BreakdownResult(row.dataKey,row.value,row.changeDate)) BreakdownResultsForType(breakdownType,breakdownResults) } } case class BreakdownResult(dataKey:String,value:Long,changeDate:Long) case class ExceptionWhilePreparingTriggeredResponse(networkQueryId: NetworkQueryId,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override def throwable = Some(x) override def summary: String = "Unable to prepare a triggered response due to an exception." override def description: String = s"Unable to prepare a promised response for query $networkQueryId due to a ${x.getClass.getSimpleName}" } case class ExceptionWhilePreparingTimeoutResponse(networkQueryId: NetworkQueryId,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { override def throwable = Some(x) override def summary: String = "Unable to prepare a triggered response due to an exception." override def description: String = s"Unable to prepare a promised response for $networkQueryId due to a ${x.getClass.getSimpleName}" } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala index 9260fa607..d19b084ce 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala @@ -1,401 +1,409 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources} import net.shrine.protocol.QueryResult.StatusType import scala.xml.NodeSeq import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil} import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller} import scala.util.Try /** * @author Bill Simons * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class QueryResult ( resultId: Long, instanceId: Long, resultType: Option[ResultOutputType], setSize: Long, startDate: Option[XMLGregorianCalendar], endDate: Option[XMLGregorianCalendar], description: Option[String], statusType: StatusType, statusMessage: Option[String], problemDigest: Option[ProblemDigest] = None, breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty ) extends XmlMarshaller with I2b2Marshaller with Loggable { //only used in tests def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, //description statusType, None) //statusMessage } def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None) //statusMessage } def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match { case Some(rt) => rt == testedResultType case _ => false } import QueryResult._ //NB: Fragile, non-type-safe == def isError = statusType == StatusType.Error def elapsed: Option[Long] = { def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- startDate end <- endDate } yield inMillis(end) - inMillis(start) } //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = { breakdowns.values.toSeq.sortBy(_.resultType.name) } override def toI2b2: NodeSeq = { import OptionEnrichments._ XmlUtil.stripWhitespace { { resultId } { instanceId } { description.toXml() } { resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt => if(rt.isBreakdown) rt.toI2b2NameOnly() else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error else rt.toI2b2 } } { setSize } { startDate.toXml() } { endDate.toXml() } { statusType } { statusType.toI2b2(this) } { //NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case, //so if we're going to do that, let's produce saner XML. sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data")) } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { resultId } { instanceId } { resultType.toXml(_.toXml) } { setSize } { startDate.toXml() } { endDate.toXml() } { description.toXml() } { statusType } { statusMessage.toXml() } { //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. sortedBreakdowns.map(_.toXml) } { problemDigest.map(_.toXml).getOrElse("") } } def withId(id: Long): QueryResult = copy(resultId = id) def withInstanceId(id: Long): QueryResult = copy(instanceId = id) def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize)) def withSetSize(size: Long): QueryResult = copy(setSize = size) def withDescription(desc: String): QueryResult = copy(description = Option(desc)) def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType)) def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData)) def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns) } object QueryResult { //todo add two new Statuses for the intermediate observations at the hub final case class StatusType( name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), - private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value { + private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2, + lifeCycle:Int = Int.MaxValue //MaxValue indicates that the CRC is done with the request. Greater integers are further along in the lifecycle + ) extends StatusType.Value { def isError = this == StatusType.Error def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult) + + def isCrcCallCompleted = lifeCycle == Int.MaxValue } object StatusType extends SEnum[StatusType] { private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{ throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id") } { i2b2Id }{ queryResult.statusType.name } } val noMessage:NodeSeq = null val Error = StatusType("ERROR", isDone = true, None, { queryResult => (queryResult.statusMessage, queryResult.problemDigest) match { case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml case (Some(msg),None) => { msg } case (None,Some(pd)) => { pd.summary } ++ pd.toXml case (None, None) => noMessage } }) val Finished = StatusType("FINISHED", isDone = true, Some(3)) //TODO: Can we use the same for Queued, Processing, and Incomplete? val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests val Queued = StatusType("QUEUED", isDone = false, Some(2)) val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2)) //TODO: What s should these have? Does anyone care? val Held = StatusType("HELD", isDone = false) val SmallQueue = StatusType("SMALL_QUEUE", isDone = false) val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false) val LargeQueue = StatusType("LARGE_QUEUE", isDone = false) val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false) + + //SHRINE's internal states as reported by the hub + val HubWillSubmit = StatusType("HUB_WILL_SUBMIT",isDone = false,lifeCycle = 10) + val HubSubmittedRequest = StatusType("HUB_SUBMITTED_REQUEST",isDone = false,lifeCycle = 20) } def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _) def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = { import ResultOutputType.valueOf val typeName = asText(elemNames: _*)(xml) valueOf(typeName) orElse valueOf(breakdownTypes)(typeName) } def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = { val attempt = parse(xml) attempt.toOption } def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = { val subXml = xml \ "problem" if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml)) else None } def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def extract(elemName: String): Option[String] = { Option((xml \ elemName).text.trim).filter(!_.isEmpty) } def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate) val asLong = extractLong(xml) _ import NodeSeqEnrichments.Strictness._ import Tries.sequence def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = { //noinspection ScalaUnnecessaryParentheses val mapAttempt = for { subXml <- xml.withChild(elemName) envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes))) mappings = envelopes.map(envelope => (envelope.resultType -> envelope)) } yield Map.empty ++ mappings mapAttempt.getOrElse(Map.empty) } QueryResult( resultId = asLong("resultId"), instanceId = asLong("instanceId"), resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml), setSize = asLong("setSize"), startDate = extractDate("startDate"), endDate = extractDate("endDate"), description = extract("description"), statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call statusMessage = extract("statusMessage"), problemDigest = extractProblemDigest(xml), breakdowns = extractBreakdowns("resultEnvelope") ) } def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def asLong = extractLong(xml) _ def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim) def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate) val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call val statusMessage: Option[String] = asTextOption("query_status_type", "description") val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type") val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest) else None case class Filling( resultType:Option[ResultOutputType], setSize:Long, startDate:Option[XMLGregorianCalendar], endDate:Option[XMLGregorianCalendar] ) val filling = if(!statusType.isError) { val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2) val setSize = asLong("set_size") val startDate = asXmlGcOption("start_date") val endDate = asXmlGcOption("end_date") Filling(resultType,setSize,startDate,endDate) } else { val resultType = None val setSize = 0L val startDate = None val endDate = None Filling(resultType,setSize,startDate,endDate) } QueryResult( resultId = asLong("result_instance_id"), instanceId = asLong("query_instance_id"), resultType = filling.resultType, setSize = filling.setSize, startDate = filling.startDate, endDate = filling.endDate, description = asTextOption("description"), statusType = statusType, statusMessage = statusMessage, problemDigest = problemDigest ) } def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = { QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = { val problemDigest = problem.toDigest QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } /** * For reconstituting errorResults from a database */ def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = { // This would require parsing the stamp text to change, and without a standard locale that's nigh impossible. // If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet. val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0) QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } } case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { override val summary: String = "The I2B2 CRC reported an internal error." override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}" override val detailsXml =

CRC's Response is {xmlResponseFromCrc}
} diff --git a/commons/protocol/src/main/scala/net/shrine/status/protocol/IncrementalQueryResult.scala b/commons/protocol/src/main/scala/net/shrine/status/protocol/IncrementalQueryResult.scala new file mode 100644 index 000000000..e52148b3a --- /dev/null +++ b/commons/protocol/src/main/scala/net/shrine/status/protocol/IncrementalQueryResult.scala @@ -0,0 +1,43 @@ +package net.shrine.status.protocol + +import net.shrine.audit.NetworkQueryId +import org.json4s.ShortTypeHints +import org.json4s.native.Serialization + +import scala.util.Try + +/** + * Carrier for incremental query result progress for SHRINE-1.23. Should not be considered a model for future work. + * + * @author david + * @since 9/8/17 + */ +case class IncrementalQueryResult( + networkQueryId:NetworkQueryId, + adapterNodeName:String, + statusTypeName:String, + statusMessage:String + ) { + + def toJson:String = { + Serialization.write(this)(IncrementalQueryResult.formats) + } + +} + +object IncrementalQueryResult { + val formats = Serialization.formats(ShortTypeHints(List(classOf[IncrementalQueryResult]))) + + val incrementalQueryResultsEnvelopeContentsType = s"Seq of ${classOf[IncrementalQueryResult].getSimpleName}s" + + def seqToJson(incrementalQueryResults: Seq[IncrementalQueryResult]):String = { + Serialization.write(incrementalQueryResults)(formats) + } + + def seqFromJson(jsonString:String):Try[Seq[IncrementalQueryResult]] = Try{ + implicit val formats = IncrementalQueryResult.formats + Serialization.read[Seq[IncrementalQueryResult]](jsonString) + } + + +} \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala index 3908c0ccf..b9f511bd8 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/AdapterClientBroadcaster.scala @@ -1,147 +1,169 @@ package net.shrine.broadcaster import net.shrine.adapter.client.{AdapterClient, RemoteAdapterClient} import net.shrine.aggregation.RunQueryAggregator import net.shrine.audit.NetworkQueryId import net.shrine.broadcaster.dao.HubDao import net.shrine.client.TimeoutException import net.shrine.log.Loggable import net.shrine.messagequeueservice.MessageQueueService import net.shrine.messagequeueservice.protocol.Envelope import net.shrine.problem.{AbstractProblem, ProblemSources} -import net.shrine.protocol.{AggregatedRunQueryResponse, BaseShrineResponse, BroadcastMessage, FailureResult, RunQueryRequest, SingleNodeResult, Timeout} +import net.shrine.protocol.{AggregatedRunQueryResponse, BaseShrineResponse, BroadcastMessage, FailureResult, QueryResult, RunQueryRequest, SingleNodeResult, Timeout} +import net.shrine.status.protocol.IncrementalQueryResult import scala.concurrent.Future import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} /** * @author clint * @since Nov 15, 2013 */ final case class AdapterClientBroadcaster(destinations: Set[NodeHandle], dao: HubDao) extends Broadcaster with Loggable { logStartup() import scala.concurrent.ExecutionContext.Implicits.global override def broadcast(message: BroadcastMessage): Multiplexer = { logOutboundIfNecessary(message) + //send back json containing just enough to fill a QueryResultRow + message.request match { + case runQueryRequest: RunQueryRequest => + debug(s"RunQueryRequest's nodeId is ${runQueryRequest.nodeId}") + runQueryRequest.nodeId.fold { + error(s"Did not send to queue because nodeId is None") + } { nodeId => { + val hubWillSubmitStatuses = destinations.map { nodeHandle => + IncrementalQueryResult( + runQueryRequest.networkQueryId, + nodeHandle.nodeId.name, + QueryResult.StatusType.HubWillSubmit.name, + s"The hub is about to submit query ${runQueryRequest.networkQueryId} to ${nodeHandle.nodeId.name}" + ) + }.to[Seq] + val envelope = Envelope(IncrementalQueryResult.incrementalQueryResultsEnvelopeContentsType,IncrementalQueryResult.seqToJson(hubWillSubmitStatuses)) + sendToQep(envelope,nodeId.name,s"Status update - the hub will submit the query to the adapters for ${runQueryRequest.networkQueryId}") + } + } + case _ => //don't care + } + val multiplexer: Multiplexer = new BufferingMultiplexer(destinations.map(_.nodeId)) for { nodeHandle <- destinations shrineResponse: SingleNodeResult <- callAdapter(message, nodeHandle) } { try { message.request match { case runQueryRequest:RunQueryRequest => debug(s"RunQueryRequest's nodeId is ${runQueryRequest.nodeId}") runQueryRequest.nodeId.fold{ error(s"Did not send to queue because nodeId is None") }{ nodeId => // make an AggregateRunQueryResponse from the SingleNodeResult val aggregator = new RunQueryAggregator( //to convert the SingleNodeResult into an AggregateRunQueryResponse runQueryRequest.networkQueryId, runQueryRequest.authn.username, runQueryRequest.authn.domain, runQueryRequest.queryDefinition, false ) val response: BaseShrineResponse = aggregator.aggregate(Seq(shrineResponse),Seq.empty,message) response match { - case runQueryResponse:AggregatedRunQueryResponse => sendToQep(runQueryResponse,nodeId.name) + case runQueryResponse:AggregatedRunQueryResponse => + val envelope = Envelope(AggregatedRunQueryResponse.getClass.getSimpleName,runQueryResponse.toXmlString) + sendToQep(envelope,nodeId.name,s"Result from ${runQueryResponse.results.head.description.get}") case _ => error(s"response is not a AggregatedRunQueryResponse. It is ${response.toString}") } } case _ => debug(s"Not a RunQueryRequest but a ${message.request.getClass.getSimpleName}.") } multiplexer.processResponse(shrineResponse) } finally { logResultsIfNecessary(message, shrineResponse) } } multiplexer } - private def sendToQep(runQueryResponse: AggregatedRunQueryResponse,queueName:String):Unit = { - val envelope = Envelope(AggregatedRunQueryResponse.getClass.getSimpleName,runQueryResponse.toXmlString) + private def sendToQep(envelope: Envelope,queueName:String,logDescription:String):Unit = { val s: Try[Unit] = for { queue <- MessageQueueService.service.createQueueIfAbsent(queueName) sent <- MessageQueueService.service.send(envelope.toJson, queue) } yield sent s.transform({itWorked => - debug(s"Result from ${runQueryResponse.results.head.description.get} sent to queue") + debug(s"$logDescription sent to queue") Success(itWorked) },{throwable: Throwable => throwable match { - case NonFatal(x) =>ExceptionWhileSendingMessage(runQueryResponse.queryId,queueName,x) + case NonFatal(x) =>ExceptionWhileSendingMessage(logDescription,queueName,x) case _ => //no op } Failure(throwable) }) } private[broadcaster] def callAdapter(message: BroadcastMessage, nodeHandle: NodeHandle): Future[SingleNodeResult] = { val NodeHandle(nodeId, client) = nodeHandle - //todo more status for SHRINE-2122 - // todo send back json containing just enough to fill a QueryResultRow // may need to do SHRINE-2177 to make this work client.query(message).recover { case e: TimeoutException => error(s"Broadcasting to $nodeId timed out") Timeout(nodeId) case NonFatal(e) => error(s"Broadcasting to $nodeId failed with ", e) FailureResult(nodeId, e) } //todo more status for SHRINE-2123 // todo send back json containing just enough to fill a QueryResultRow } private[broadcaster] def logResultsIfNecessary(message: BroadcastMessage, result: SingleNodeResult): Unit = logIfNecessary(message) { _ => dao.logQueryResult(message.requestId, result) } private[broadcaster] def logOutboundIfNecessary(message: BroadcastMessage): Unit = logIfNecessary(message) { runQueryReq => dao.logOutboundQuery(message.requestId, message.networkAuthn, runQueryReq.queryDefinition) } private[broadcaster] def logIfNecessary(message: BroadcastMessage)(f: RunQueryRequest => Any): Unit = { message.request match { case runQueryReq: RunQueryRequest => f(runQueryReq) case _ => () } } private def logStartup(): Unit = { def clientToString(client: AdapterClient): String = client match { case r: RemoteAdapterClient => r.poster.url.toString case _ => "" } info(s"Initialized ${getClass.getSimpleName}, will broadcast to the following destinations:") destinations.toSeq.sortBy(_.nodeId.name).foreach { handle => info(s" ${handle.nodeId}: ${clientToString(handle.client)}") } } } -case class ExceptionWhileSendingMessage(networkQueryId: NetworkQueryId,queueName:String, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { +case class ExceptionWhileSendingMessage(logDescription:String,queueName:String, x:Throwable) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(x) override def summary: String = s"The Hub encountered an exception while trying to send a message to $queueName" - override def description: String = s"The Hub encountered an exception while trying to send a message about $networkQueryId from $queueName : ${x.getMessage}" + override def description: String = s"The Hub encountered an exception while trying to send a message about $logDescription from $queueName : ${x.getMessage}" } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala index c26a95d88..59b4392dc 100644 --- a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala @@ -1,610 +1,619 @@ package net.shrine.qep.querydb import java.sql.SQLException import java.util.concurrent.TimeoutException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources} import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest} import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator, TimeoutInDbIoActionException} import net.shrine.source.ConfigSource import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.util.control.NonFatal import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) try { Await.result(future, timeout) } catch { case tx:TimeoutException => throw TimeoutInDbIoActionException(dataSource, timeout, tx) case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.result) } def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain( request.authn.username, request.authn.domain, None, Some(request.fetchSize)) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } def countPreviousQueriesByUserAndDomain(userName: UserName, domain: String):Int = { val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain) dbRun(q.size.result) } def selectQueryById(networkQueryId: NetworkQueryId): Option[QepQuery] = dbRun(mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result).lastOption def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, skip:Option[Int] = None, limit:Option[Int] = None):Seq[QepQuery] = { debug(s"start selectPreviousQueriesByUserAndDomain $userName $domain") val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain).sortBy(x => x.changeDate.desc) val qWithSkip = skip.fold(q)(q.drop) val qWithLimit = limit.fold(qWithSkip)(qWithSkip.take) val result = dbRun(qWithLimit.result) debug(s"finished selectPreviousQueriesByUserAndDomain with $result") result } def renamePreviousQuery(request:RenameQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis())) } yield queryResults ) } def markDeleted(request:DeleteQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis())) } yield queryResults ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def selectMostRecentQepQueryFlagFor(networkQueryId: NetworkQueryId): Option[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId === networkQueryId).result).lastOption def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } -//todo only used in tests. Is that OK? + def insertQueryResultRow(queryResultRow: QueryResultRow) = { + dbRun(allQueryResultRows += queryResultRow) + } + + def insertQueryResultRows(queryResultRows: Seq[QueryResultRow]) = { + dbRun(allQueryResultRows ++= queryResultRows) + } + + //todo only used in tests. Is that OK? def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentFullQueryResultsFor(networkId:NetworkQueryId): Seq[FullQueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result breakdowns: Seq[QepQueryBreakdownResultsRow] <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) val breakdownTypeToResults: Map[ResultOutputType, Seq[QepQueryBreakdownResultsRow]] = breakdowns.groupBy(_.resultType) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) queryResults.map(r => FullQueryResult( r, breakdownTypeToResults, adapterNodesToProblemDigests.get(r.adapterNode) )) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val fullQueryResults = selectMostRecentFullQueryResultsFor(networkId) fullQueryResults.map(_.toQueryResult) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } def selectDistinctAdaptersWithResults:Seq[String] = { dbRun(allQueryResultRows.map(_.adapterNode).distinct.result).sorted } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val timeout = QepQuerySchema.config.getInt("timeout") seconds val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[Option[String]]("expression") def dateCreated = column[Time]("dateCreated") def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") def changeDate = column[Long]("changeDate") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists ) yield queries val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[Option[ResultOutputType]]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter + //todo check result lifecycles for SHRINE-2122 val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = ConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config) import net.shrine.config.ConfigExtensions val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty) val schema = QepQuerySchema(slickProfile,moreBreakdowns) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: Option[String], dateCreated: Time, deleted: Boolean, queryXml: String, changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.map(_.toString), dateCreated = System.currentTimeMillis(), deleted = false, queryXml = runQueryRequest.toXmlString, changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } //todo replace with a class per state case class FullQueryResult( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], count:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long, breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]], problemDigest:Option[ProblemDigest] ) { def toQueryResult = { def resultEnvelopesFrom(breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]]): Map[ResultOutputType, I2b2ResultEnvelope] = { def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { val data = breakdowns.map(b => b.dataKey -> b.value).toMap I2b2ResultEnvelope(resultType,data) } breakdownTypeToResults.map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) } QueryResult( resultId = resultId, instanceId = instanceId, resultType = resultType, setSize = count, startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), description = Some(adapterNode), statusType = status, statusMessage = statusMessage, breakdowns = resultEnvelopesFrom(breakdownTypeToResults), problemDigest = problemDigest ) } } object FullQueryResult { def apply(row:QueryResultRow, breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]], problemDigest:Option[ProblemDigest]):FullQueryResult = { FullQueryResult(resultId = row.resultId, networkQueryId = row.networkQueryId, instanceId = row.instanceId, adapterNode = row.adapterNode, resultType = row.resultType, count = row.size, startDate = row.startDate, endDate = row.endDate, status = row.status, statusMessage = row.statusMessage, changeDate = row.changeDate, breakdownTypeToResults = breakdownTypeToResults, problemDigest = problemDigest ) } } case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, instead of the name of an adapter node."), resultType = result.resultType, size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else
, //TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE 0 ) } } case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){ override val summary = "A problem encountered while using a database." override val throwable = Some(x) override val description = x.getMessage } \ No newline at end of file