diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
index b7b29628d..4faa9b542 100644
--- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
+++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
@@ -1,206 +1,218 @@
package net.shrine.metadata
import net.shrine.audit.{NetworkQueryId, QueryName, Time}
import net.shrine.authorization.steward.UserName
import net.shrine.i2b2.protocol.pm.User
import net.shrine.log.Loggable
import net.shrine.problem.ProblemDigest
import net.shrine.protocol.ResultOutputType
-import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryDb, QepQueryFlag}
+import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryFlag}
import spray.routing._
import rapture.json._
import rapture.json.jsonBackends.jawn._
import rapture.json.formatters.humanReadable
import spray.http.StatusCodes
/**
* An API to support the web client's work with queries.
*
* The current API supplies information about previous running queries. Eventually this will support accessing
* information about queries running now and the ability to submit queries.
*/
//todo move this to the qep/service module
trait QepService extends HttpService with Loggable {
val qepInfo =
"""
|The SHRINE query entry point service.
|
|This API gives a researcher access to queries, and (eventually) the ability to run queries.
|
""".stripMargin
def qepRoute(user: User): Route = pathPrefix("qep") {
get {
queryResult(user) ~ queryResultsTable(user)
} ~
pathEndOrSingleSlash{complete(qepInfo)} ~
respondWithStatus(StatusCodes.NotFound){complete(qepInfo)}
}
def queryResult(user:User):Route = path("queryResult" / LongNumber){ queryId:NetworkQueryId =>
val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId)
queryOption.fold{
respondWithStatus(StatusCodes.NotFound){complete(s"No query with id $queryId found")}
}{query:QepQuery =>
if(user.sameUserAs(query.userName,query.userDomain)) {
val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_))
val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_))
val queryCell = QueryCell(query,flag)
val queryAndResults = ResultsRow(queryCell,mostRecentQueryResults)
val json: Json = Json(queryAndResults)
val formattedJson: String = Json.format(json)(humanReadable())
complete(formattedJson)
} else {
respondWithStatus(StatusCodes.Forbidden){complete(s"Query $queryId belongs to a different user")}
}
}
}
def queryResultsTable(user: User): Route = path("queryResultsTable") {
matchQueryParameters(Some(user.username)){ queryParameters:QueryParameters =>
val queryRowCount: Int = QepQueryDb.db.countPreviousQueriesByUserAndDomain(
userName = user.username,
domain = user.domain
)
val queries: Seq[QepQuery] = QepQueryDb.db.selectPreviousQueriesByUserAndDomain(
userName = user.username,
domain = user.domain,
skip = queryParameters.skipOption,
limit = queryParameters.limitOption
)
//todo revisit json structure to remove things the front-end doesn't use
val adapters: Seq[String] = QepQueryDb.db.selectDistinctAdaptersWithResults
val flags: Map[NetworkQueryId, QueryFlag] = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(queries.map(q => q.networkId).to[Set])
.map(q => q._1 -> QueryFlag(q._2))
val queryResults: Seq[ResultsRow] = queries.map(q => ResultsRow(
query = QueryCell(q,flags.get(q.networkId)),
results = QepQueryDb.db.selectMostRecentFullQueryResultsFor(q.networkId).map(Result(_))))
val table: ResultsTable = ResultsTable(queryRowCount,queryParameters.skipOption.getOrElse(0),adapters,queryResults)
val jsonTable: Json = Json(table)
val formattedTable: String = Json.format(jsonTable)(humanReadable())
complete(formattedTable)
}
}
def matchQueryParameters(userName: Option[UserName])(parameterRoute: QueryParameters => Route): Route = {
parameters('skip.as[Int].?, 'limit.as[Int].?) { (skipOption, limitOption) =>
val qp = QueryParameters(
userName,
skipOption,
limitOption
)
parameterRoute(qp)
}
}
}
//todo maybe move to QepQueryDb class
case class QueryParameters(
researcherIdOption:Option[UserName] = None,
skipOption:Option[Int] = None,
limitOption:Option[Int] = None //todo deadline, maybe version, someday
)
case class ResultsTable(
rowCount:Int,
rowOffset:Int,
adapters:Seq[String], //todo type for adapter name
queryResults:Seq[ResultsRow]
)
case class ResultsRow(
query:QueryCell,
results: Seq[Result]
)
case class QueryCell(
networkId:String, //easier to support in json, lessens the impact of using a GUID iff we can get there
queryName: QueryName,
dateCreated: Time,
queryXml: String,
changeDate: Time,
flag:Option[QueryFlag]
)
object QueryCell {
def apply(qepQuery: QepQuery,flag: Option[QueryFlag]): QueryCell = QueryCell(
networkId = qepQuery.networkId.toString,
queryName = qepQuery.queryName,
dateCreated = qepQuery.dateCreated,
queryXml = qepQuery.queryXml,
changeDate = qepQuery.changeDate,
flag
)
}
case class QueryFlag(
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QueryFlag{
def apply(qepQueryFlag: QepQueryFlag): QueryFlag = QueryFlag(qepQueryFlag.flagged, qepQueryFlag.flagMessage, qepQueryFlag.changeDate)
}
case class Result (
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
count:Long,
status:String, //todo QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long,
-// todo breakdowns:Option[Map[ResultOutputType,I2b2ResultEnvelope]]
+ breakdowns: Seq[BreakdownResultsForType],
problemDigest:Option[ProblemDigestForJson]
)
object Result {
- def apply(fullQueryResult: FullQueryResult): Result = Result(
+ def apply(fullQueryResult: FullQueryResult): Result = new Result(
resultId = fullQueryResult.resultId,
networkQueryId = fullQueryResult.networkQueryId,
instanceId = fullQueryResult.instanceId,
adapterNode = fullQueryResult.adapterNode,
resultType = fullQueryResult.resultType,
count = fullQueryResult.count,
status = fullQueryResult.status.toString,
statusMessage = fullQueryResult.statusMessage,
changeDate = fullQueryResult.changeDate,
-// breakdowns = fullQueryResult.breakdowns
+ breakdowns = fullQueryResult.breakdownTypeToResults.map(tToR => BreakdownResultsForType(fullQueryResult.adapterNode,tToR._1,tToR._2)).to[Seq],
problemDigest = fullQueryResult.problemDigest.map(ProblemDigestForJson(_))
)
}
//todo replace when you figure out how to json-ize xml in rapture
case class ProblemDigestForJson(codec: String,
stampText: String,
summary: String,
description: String,
detailsString: String,
epoch: Long)
object ProblemDigestForJson {
def apply(problemDigest: ProblemDigest): ProblemDigestForJson = ProblemDigestForJson(
problemDigest.codec,
problemDigest.stampText,
problemDigest.summary,
problemDigest.description,
problemDigest.detailsXml.text,
problemDigest.epoch)
-}
\ No newline at end of file
+}
+
+case class BreakdownResultsForType(name:String,displayName:String,description:String,results:Seq[BreakdownResult])
+
+object BreakdownResultsForType {
+ def apply(adapterName: String, breakdownType: ResultOutputType, breakdowns: Seq[QepQueryBreakdownResultsRow]): BreakdownResultsForType = {
+ val breakdownResults = breakdowns.filter(_.adapterNode == adapterName).map(row => BreakdownResult(row.dataKey,row.value,row.changeDate))
+
+ BreakdownResultsForType(breakdownType.name,breakdownType.i2b2Options.displayType,breakdownType.i2b2Options.description,breakdownResults)
+ }
+}
+
+case class BreakdownResult(dataKey:String,value:Long,changeDate:Long)
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala b/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala
index 218b1d63f..684eab78d 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala
@@ -1,137 +1,124 @@
package net.shrine.protocol
-import scala.xml.NodeSeq
-import I2b2ResultEnvelope.Column
-import com.sun.org.apache.xalan.internal.xsltc.compiler.ValueOf
-import scala.xml.XML
-import net.shrine.serialization.XmlMarshaller
-import net.shrine.serialization.XmlMarshaller
-import net.shrine.serialization.I2b2Unmarshaller
-import net.shrine.serialization.I2b2Unmarshaller
-import net.shrine.util.XmlUtil
-import net.shrine.serialization.I2b2Marshaller
-import net.shrine.serialization.XmlUnmarshaller
-import net.shrine.serialization.XmlMarshaller
-import net.shrine.serialization.JsonMarshaller
import net.liftweb.json.JsonDSL._
import net.liftweb.json._
-import net.shrine.serialization.JsonUnmarshaller
-import scala.util.Try
-import net.shrine.util.XmlDateHelper
-import net.shrine.util.NodeSeqEnrichments
-import scala.util.Success
+import net.shrine.serialization.{I2b2Marshaller, JsonMarshaller, XmlMarshaller}
+import net.shrine.util.{NodeSeqEnrichments, XmlUtil}
+
+import scala.util.{Success, Try}
+import scala.xml.NodeSeq
/**
* @author clint
* @since Aug 15, 2012
*/
final case class I2b2ResultEnvelope(resultType: ResultOutputType, data: Map[String, Long]) extends I2b2Marshaller with XmlMarshaller with JsonMarshaller {
import I2b2ResultEnvelope._
//Extra parameter list with dummy int value needed to disambiguate this constructor and the class-level one, which without
//the extra param list have the same signature after erasure. :/ Making the dummy param implicit lets us omit the second
//param list entirely when calling this constructor.
def this(resultType: ResultOutputType, cols: (String, Long)*) = this(resultType, cols.toMap)
def +(column: ColumnTuple): I2b2ResultEnvelope = {
this.copy(data = data + column)
}
def ++(columns: Iterable[ColumnTuple]): I2b2ResultEnvelope = {
columns.foldLeft(this)(_ + _)
}
def mapValues(f: Long => Long): I2b2ResultEnvelope = this.copy(data = data.mapValues(f))
def toMap: Map[String, Long] = data
private def sortedData: Seq[(String, Long)] = data.toSeq.sortBy { case (columnName, _) => columnName }
override def toI2b2: NodeSeq = XmlUtil.stripWhitespace {
{
//NB: SHRINE-863: Bill wants these sorted by name, server-side
sortedData.map { case (name, value) => { value } }
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
{ resultType }
{
//NB: SHRINE-863: Bill wants these sorted by name, server-side
sortedData.map { case (name, value) =>
{ name }
{ value }
}
}
}
- override def toJson: JValue = (resultType.name -> data)
+ override def toJson: JValue = resultType.name -> data
}
object I2b2ResultEnvelope extends I2b2XmlUnmarshaller[I2b2ResultEnvelope] with ShrineXmlUnmarshaller[I2b2ResultEnvelope] {
type ColumnTuple = (String, Long)
private object Column {
private def unmarshal(xml: NodeSeq, name: NodeSeq => String, value: NodeSeq => Long): Try[ColumnTuple] = {
Try {
(name(xml), value(xml))
}
}
private def from(attr: String): NodeSeq => String = xml => (xml \ attr).text
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[ColumnTuple] = unmarshal(xml, from("@column"), _.text.toLong)
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[ColumnTuple] = unmarshal(xml, from("name"), from("value") andThen (_.toLong))
def fromJson(json: JValue): Option[ColumnTuple] = json match {
- case JObject(List(JField(name, JInt(value)))) => Some((name, value.toInt))
+ case JObject(List(JField(name, JInt(value)))) => Some((name, value.toLong))
case _ => None
}
}
def empty(resultType: ResultOutputType) = new I2b2ResultEnvelope(resultType)
import NodeSeqEnrichments.Strictness._
override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[I2b2ResultEnvelope] = {
unmarshal(
Success(xml),
_.withChild("resultType").map(_.text.trim).flatMap(ResultOutputType.tryValueOf(breakdownTypes)),
_.withChild("column"),
Column.fromXml(breakdownTypes))
}
override def fromI2b2(breakdownTypes: Set[ResultOutputType])(i2b2Xml: NodeSeq): Try[I2b2ResultEnvelope] = {
unmarshal(
i2b2Xml.withChild("body").withChild("result"),
_.attribute("name").flatMap(ResultOutputType.tryValueOf(breakdownTypes)),
x => Try(x \ "data"),
Column.fromI2b2(breakdownTypes))
}
private def unmarshal(xmlAttempt: Try[NodeSeq], getResultType: NodeSeq => Try[ResultOutputType], columnXmls: NodeSeq => Try[NodeSeq], toColumn: NodeSeq => Try[ColumnTuple]): Try[I2b2ResultEnvelope] = {
import net.shrine.util.Tries.sequence
for {
xml <- xmlAttempt
resultType <- getResultType(xml)
columnXml <- columnXmls(xml)
columnAttempts = columnXml.map(toColumn)
columns <- sequence(columnAttempts)
} yield new I2b2ResultEnvelope(resultType, columns: _*)
}
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
index 68558aacc..6a22c3f65 100644
--- a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
@@ -1,608 +1,612 @@
package net.shrine.qep.querydb
import java.sql.SQLException
import java.util.concurrent.TimeoutException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator, TimeoutInDbIoActionException}
import net.shrine.source.ConfigSource
import net.shrine.util.XmlDateHelper
import slick.driver.JdbcProfile
import scala.collection.immutable.Iterable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.xml.XML
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
try {
blocking {
Await.result(future, timeout)
}
}
catch {
case tx:TimeoutException => throw TimeoutInDbIoActionException(dataSource, timeout, tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.result)
}
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(
request.authn.username,
request.authn.domain,
None,
Some(request.fetchSize))
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
def countPreviousQueriesByUserAndDomain(userName: UserName, domain: String):Int = {
val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain)
dbRun(q.size.result)
}
def selectQueryById(networkQueryId: NetworkQueryId): Option[QepQuery] =
dbRun(mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result).lastOption
def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, skip:Option[Int] = None, limit:Option[Int] = None):Seq[QepQuery] = {
debug(s"start selectPreviousQueriesByUserAndDomain $userName $domain")
val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain).sortBy(x => x.changeDate.desc)
val qWithSkip = skip.fold(q)(q.drop)
val qWithLimit = limit.fold(qWithSkip)(qWithSkip.take)
val result = dbRun(qWithLimit.result)
debug(s"finished selectPreviousQueriesByUserAndDomain with $result")
result
}
def renamePreviousQuery(request:RenameQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def markDeleted(request:DeleteQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
def selectMostRecentQepQueryFlagFor(networkQueryId: NetworkQueryId): Option[QepQueryFlag] =
dbRun(mostRecentQueryFlags.filter(_.networkId === networkQueryId).result).lastOption
def insertQepResultRow(qepQueryRow:QueryResultRow) = {
dbRun(allQueryResultRows += qepQueryRow)
}
def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = {
val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node"))
val queryResultRow = QueryResultRow(networkQueryId,result)
val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_))
val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq]
dbRun(
for {
_ <- allQueryResultRows += queryResultRow
_ <- allBreakdownResultsRows ++= breakdowns
_ <- allProblemDigestRows ++= problem
} yield ()
)
}
//todo only used in tests. Is that OK?
def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = {
dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result)
}
def selectMostRecentFullQueryResultsFor(networkId:NetworkQueryId): Seq[FullQueryResult] = {
val (queryResults, breakdowns,problems) = dbRun(
for {
queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result
- breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
+ breakdowns: Seq[QepQueryBreakdownResultsRow] <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result
} yield (queryResults, breakdowns, problems)
)
- val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2))
+
+ val breakdownTypeToResults: Map[ResultOutputType, Seq[QepQueryBreakdownResultsRow]] = breakdowns.groupBy(_.resultType)
def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = {
if(problemSeq.size == 1) problemSeq.head.toProblemDigest
else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq")
}
val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) )
- queryResults.map(r => FullQueryResult(r,
- resultIdsToI2b2ResultEnvelopes.get(r.resultId),
+ queryResults.map(r => FullQueryResult(
+ r,
+ breakdownTypeToResults,
adapterNodesToProblemDigests.get(r.adapterNode)
))
}
def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = {
val fullQueryResults = selectMostRecentFullQueryResultsFor(networkId)
fullQueryResults.map(_.toQueryResult)
}
def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = {
dbRun(allBreakdownResultsRows += breakdownResultsRow)
}
def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = {
dbRun(allBreakdownResultsRows.result)
}
def selectDistinctAdaptersWithResults:Seq[String] = {
dbRun(allQueryResultRows.map(_.adapterNode).distinct.result).sorted
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val timeout = QepQuerySchema.config.getInt("timeout") seconds
val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[Option[String]]("expression")
def dateCreated = column[Time]("dateCreated")
def deleted = column[Boolean]("deleted")
def queryXml = column[String]("queryXml")
def changeDate = column[Long]("changeDate")
def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for(
queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists
) yield queries
val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false)
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns
val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap
val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap)
implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({
(resultType: ResultOutputType) => queryResultTypesToString(resultType)
},{
(string: String) => stringsToQueryResultTypes(string)
})
implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({
statusType => statusType.name
},{
name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}"))
})
class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") {
def resultId = column[Long]("resultId")
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def instanceId = column[Long]("instanceId")
def adapterNode = column[String]("adapterNode")
def resultType = column[Option[ResultOutputType]]("resultType")
def size = column[Long]("size")
def startDate = column[Option[Long]]("startDate")
def endDate = column[Option[Long]]("endDate")
def status = column[QueryResult.StatusType]("status")
def statusMessage = column[Option[String]]("statusMessage")
def changeDate = column[Long]("changeDate")
def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply)
}
val allQueryResultRows = TableQuery[QepQueryResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for(
queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists
) yield queryResultRows
class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def resultId = column[Long]("resultId")
def resultType = column[ResultOutputType]("resultType")
def dataKey = column[String]("dataKey")
def value = column[Long]("value")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply)
}
val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for(
breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists
) yield breakdownResultsRows
/*
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller {
*/
class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def codec = column[String]("codec")
def stamp = column[String]("stamp")
def summary = column[String]("summary")
def description = column[String]("description")
def details = column[String]("details")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply)
}
val allProblemDigestRows = TableQuery[QepResultProblemDigests]
val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for(
problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists
) yield problemDigests
}
object QepQuerySchema {
val allConfig:Config = ConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config)
import net.shrine.config.ConfigExtensions
val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty)
val schema = QepQuerySchema(slickProfile,moreBreakdowns)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: Option[String],
dateCreated: Time,
deleted: Boolean,
queryXml: String,
changeDate: Time
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated),
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.map(_.toString),
dateCreated = System.currentTimeMillis(),
deleted = false,
queryXml = runQueryRequest.toXmlString,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}
//todo replace with a class per state
case class FullQueryResult(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
count:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long,
- breakdowns:Option[Map[ResultOutputType,I2b2ResultEnvelope]],
+ breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]],
problemDigest:Option[ProblemDigest]
) {
- def toQueryResult = QueryResult(
- resultId = resultId,
- instanceId = instanceId,
- resultType = resultType,
- setSize = count,
- startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
- endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
- description = Some(adapterNode),
- statusType = status,
- statusMessage = statusMessage,
- breakdowns = breakdowns.getOrElse(Map.empty),
- problemDigest = problemDigest
- )
+ def toQueryResult = {
+ def resultEnvelopesFrom(breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]]): Map[ResultOutputType, I2b2ResultEnvelope] = {
+ def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
+ val data = breakdowns.map(b => b.dataKey -> b.value).toMap
+ I2b2ResultEnvelope(resultType,data)
+ }
+ breakdownTypeToResults.map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
+ }
+
+ QueryResult(
+ resultId = resultId,
+ instanceId = instanceId,
+ resultType = resultType,
+ setSize = count,
+ startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
+ endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
+ description = Some(adapterNode),
+ statusType = status,
+ statusMessage = statusMessage,
+ breakdowns = resultEnvelopesFrom(breakdownTypeToResults),
+ problemDigest = problemDigest
+ )
+ }
}
object FullQueryResult {
def apply(row:QueryResultRow,
- breakdowns:Option[Map[ResultOutputType,I2b2ResultEnvelope]],
+ breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]],
problemDigest:Option[ProblemDigest]):FullQueryResult = {
FullQueryResult(resultId = row.resultId,
networkQueryId = row.networkQueryId,
instanceId = row.instanceId,
adapterNode = row.adapterNode,
resultType = row.resultType,
count = row.size,
startDate = row.startDate,
endDate = row.endDate,
status = row.status,
statusMessage = row.statusMessage,
changeDate = row.changeDate,
- breakdowns = breakdowns,
+ breakdownTypeToResults = breakdownTypeToResults,
problemDigest = problemDigest
)
}
}
case class QueryResultRow(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
size:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long
) {
}
object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow)
{
def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = {
new QueryResultRow(
resultId = result.resultId,
networkQueryId = networkQueryId,
instanceId = result.instanceId,
adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."),
resultType = result.resultType,
size = result.setSize,
startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis),
endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis),
status = result.statusType,
statusMessage = result.statusMessage,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryBreakdownResultsRow(
networkQueryId: NetworkQueryId,
adapterNode:String,
resultId:Long,
resultType: ResultOutputType,
dataKey:String,
value:Long,
changeDate:Long
)
object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){
def breakdownRowsFor(networkQueryId:NetworkQueryId,
adapterNode:String,
resultId:Long,
breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = {
breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis()))
}
- def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = {
- def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
- val data = breakdowns.map(b => b.dataKey -> b.value).toMap
- I2b2ResultEnvelope(resultType,data)
- }
-
- breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
- }
}
case class QepProblemDigestRow(
networkQueryId: NetworkQueryId,
adapterNode: String,
codec: String,
stampText: String,
summary: String,
description: String,
details: String,
changeDate:Long
){
def toProblemDigest = {
ProblemDigest(
codec,
stampText,
summary,
description,
if(!details.isEmpty) XML.loadString(details)
else ,
//TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE
0
)
}
}
case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){
override val summary = "A problem encountered while using a database."
override val throwable = Some(x)
override val description = x.getMessage
}
\ No newline at end of file