diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala index b7b29628d..4faa9b542 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala @@ -1,206 +1,218 @@ package net.shrine.metadata import net.shrine.audit.{NetworkQueryId, QueryName, Time} import net.shrine.authorization.steward.UserName import net.shrine.i2b2.protocol.pm.User import net.shrine.log.Loggable import net.shrine.problem.ProblemDigest import net.shrine.protocol.ResultOutputType -import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryDb, QepQueryFlag} +import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryFlag} import spray.routing._ import rapture.json._ import rapture.json.jsonBackends.jawn._ import rapture.json.formatters.humanReadable import spray.http.StatusCodes /** * An API to support the web client's work with queries. * * The current API supplies information about previous running queries. Eventually this will support accessing * information about queries running now and the ability to submit queries. */ //todo move this to the qep/service module trait QepService extends HttpService with Loggable { val qepInfo = """ |The SHRINE query entry point service. | |This API gives a researcher access to queries, and (eventually) the ability to run queries. | """.stripMargin def qepRoute(user: User): Route = pathPrefix("qep") { get { queryResult(user) ~ queryResultsTable(user) } ~ pathEndOrSingleSlash{complete(qepInfo)} ~ respondWithStatus(StatusCodes.NotFound){complete(qepInfo)} } def queryResult(user:User):Route = path("queryResult" / LongNumber){ queryId:NetworkQueryId => val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId) queryOption.fold{ respondWithStatus(StatusCodes.NotFound){complete(s"No query with id $queryId found")} }{query:QepQuery => if(user.sameUserAs(query.userName,query.userDomain)) { val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_)) val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_)) val queryCell = QueryCell(query,flag) val queryAndResults = ResultsRow(queryCell,mostRecentQueryResults) val json: Json = Json(queryAndResults) val formattedJson: String = Json.format(json)(humanReadable()) complete(formattedJson) } else { respondWithStatus(StatusCodes.Forbidden){complete(s"Query $queryId belongs to a different user")} } } } def queryResultsTable(user: User): Route = path("queryResultsTable") { matchQueryParameters(Some(user.username)){ queryParameters:QueryParameters => val queryRowCount: Int = QepQueryDb.db.countPreviousQueriesByUserAndDomain( userName = user.username, domain = user.domain ) val queries: Seq[QepQuery] = QepQueryDb.db.selectPreviousQueriesByUserAndDomain( userName = user.username, domain = user.domain, skip = queryParameters.skipOption, limit = queryParameters.limitOption ) //todo revisit json structure to remove things the front-end doesn't use val adapters: Seq[String] = QepQueryDb.db.selectDistinctAdaptersWithResults val flags: Map[NetworkQueryId, QueryFlag] = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(queries.map(q => q.networkId).to[Set]) .map(q => q._1 -> QueryFlag(q._2)) val queryResults: Seq[ResultsRow] = queries.map(q => ResultsRow( query = QueryCell(q,flags.get(q.networkId)), results = QepQueryDb.db.selectMostRecentFullQueryResultsFor(q.networkId).map(Result(_)))) val table: ResultsTable = ResultsTable(queryRowCount,queryParameters.skipOption.getOrElse(0),adapters,queryResults) val jsonTable: Json = Json(table) val formattedTable: String = Json.format(jsonTable)(humanReadable()) complete(formattedTable) } } def matchQueryParameters(userName: Option[UserName])(parameterRoute: QueryParameters => Route): Route = { parameters('skip.as[Int].?, 'limit.as[Int].?) { (skipOption, limitOption) => val qp = QueryParameters( userName, skipOption, limitOption ) parameterRoute(qp) } } } //todo maybe move to QepQueryDb class case class QueryParameters( researcherIdOption:Option[UserName] = None, skipOption:Option[Int] = None, limitOption:Option[Int] = None //todo deadline, maybe version, someday ) case class ResultsTable( rowCount:Int, rowOffset:Int, adapters:Seq[String], //todo type for adapter name queryResults:Seq[ResultsRow] ) case class ResultsRow( query:QueryCell, results: Seq[Result] ) case class QueryCell( networkId:String, //easier to support in json, lessens the impact of using a GUID iff we can get there queryName: QueryName, dateCreated: Time, queryXml: String, changeDate: Time, flag:Option[QueryFlag] ) object QueryCell { def apply(qepQuery: QepQuery,flag: Option[QueryFlag]): QueryCell = QueryCell( networkId = qepQuery.networkId.toString, queryName = qepQuery.queryName, dateCreated = qepQuery.dateCreated, queryXml = qepQuery.queryXml, changeDate = qepQuery.changeDate, flag ) } case class QueryFlag( flagged:Boolean, flagMessage:String, changeDate:Long ) object QueryFlag{ def apply(qepQueryFlag: QepQueryFlag): QueryFlag = QueryFlag(qepQueryFlag.flagged, qepQueryFlag.flagMessage, qepQueryFlag.changeDate) } case class Result ( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], count:Long, status:String, //todo QueryResult.StatusType, statusMessage:Option[String], changeDate:Long, -// todo breakdowns:Option[Map[ResultOutputType,I2b2ResultEnvelope]] + breakdowns: Seq[BreakdownResultsForType], problemDigest:Option[ProblemDigestForJson] ) object Result { - def apply(fullQueryResult: FullQueryResult): Result = Result( + def apply(fullQueryResult: FullQueryResult): Result = new Result( resultId = fullQueryResult.resultId, networkQueryId = fullQueryResult.networkQueryId, instanceId = fullQueryResult.instanceId, adapterNode = fullQueryResult.adapterNode, resultType = fullQueryResult.resultType, count = fullQueryResult.count, status = fullQueryResult.status.toString, statusMessage = fullQueryResult.statusMessage, changeDate = fullQueryResult.changeDate, -// breakdowns = fullQueryResult.breakdowns + breakdowns = fullQueryResult.breakdownTypeToResults.map(tToR => BreakdownResultsForType(fullQueryResult.adapterNode,tToR._1,tToR._2)).to[Seq], problemDigest = fullQueryResult.problemDigest.map(ProblemDigestForJson(_)) ) } //todo replace when you figure out how to json-ize xml in rapture case class ProblemDigestForJson(codec: String, stampText: String, summary: String, description: String, detailsString: String, epoch: Long) object ProblemDigestForJson { def apply(problemDigest: ProblemDigest): ProblemDigestForJson = ProblemDigestForJson( problemDigest.codec, problemDigest.stampText, problemDigest.summary, problemDigest.description, problemDigest.detailsXml.text, problemDigest.epoch) -} \ No newline at end of file +} + +case class BreakdownResultsForType(name:String,displayName:String,description:String,results:Seq[BreakdownResult]) + +object BreakdownResultsForType { + def apply(adapterName: String, breakdownType: ResultOutputType, breakdowns: Seq[QepQueryBreakdownResultsRow]): BreakdownResultsForType = { + val breakdownResults = breakdowns.filter(_.adapterNode == adapterName).map(row => BreakdownResult(row.dataKey,row.value,row.changeDate)) + + BreakdownResultsForType(breakdownType.name,breakdownType.i2b2Options.displayType,breakdownType.i2b2Options.description,breakdownResults) + } +} + +case class BreakdownResult(dataKey:String,value:Long,changeDate:Long) diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala b/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala index 218b1d63f..684eab78d 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/I2b2ResultEnvelope.scala @@ -1,137 +1,124 @@ package net.shrine.protocol -import scala.xml.NodeSeq -import I2b2ResultEnvelope.Column -import com.sun.org.apache.xalan.internal.xsltc.compiler.ValueOf -import scala.xml.XML -import net.shrine.serialization.XmlMarshaller -import net.shrine.serialization.XmlMarshaller -import net.shrine.serialization.I2b2Unmarshaller -import net.shrine.serialization.I2b2Unmarshaller -import net.shrine.util.XmlUtil -import net.shrine.serialization.I2b2Marshaller -import net.shrine.serialization.XmlUnmarshaller -import net.shrine.serialization.XmlMarshaller -import net.shrine.serialization.JsonMarshaller import net.liftweb.json.JsonDSL._ import net.liftweb.json._ -import net.shrine.serialization.JsonUnmarshaller -import scala.util.Try -import net.shrine.util.XmlDateHelper -import net.shrine.util.NodeSeqEnrichments -import scala.util.Success +import net.shrine.serialization.{I2b2Marshaller, JsonMarshaller, XmlMarshaller} +import net.shrine.util.{NodeSeqEnrichments, XmlUtil} + +import scala.util.{Success, Try} +import scala.xml.NodeSeq /** * @author clint * @since Aug 15, 2012 */ final case class I2b2ResultEnvelope(resultType: ResultOutputType, data: Map[String, Long]) extends I2b2Marshaller with XmlMarshaller with JsonMarshaller { import I2b2ResultEnvelope._ //Extra parameter list with dummy int value needed to disambiguate this constructor and the class-level one, which without //the extra param list have the same signature after erasure. :/ Making the dummy param implicit lets us omit the second //param list entirely when calling this constructor. def this(resultType: ResultOutputType, cols: (String, Long)*) = this(resultType, cols.toMap) def +(column: ColumnTuple): I2b2ResultEnvelope = { this.copy(data = data + column) } def ++(columns: Iterable[ColumnTuple]): I2b2ResultEnvelope = { columns.foldLeft(this)(_ + _) } def mapValues(f: Long => Long): I2b2ResultEnvelope = this.copy(data = data.mapValues(f)) def toMap: Map[String, Long] = data private def sortedData: Seq[(String, Long)] = data.toSeq.sortBy { case (columnName, _) => columnName } override def toI2b2: NodeSeq = XmlUtil.stripWhitespace { { //NB: SHRINE-863: Bill wants these sorted by name, server-side sortedData.map { case (name, value) => { value } } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { { resultType } { //NB: SHRINE-863: Bill wants these sorted by name, server-side sortedData.map { case (name, value) => { name } { value } } } } - override def toJson: JValue = (resultType.name -> data) + override def toJson: JValue = resultType.name -> data } object I2b2ResultEnvelope extends I2b2XmlUnmarshaller[I2b2ResultEnvelope] with ShrineXmlUnmarshaller[I2b2ResultEnvelope] { type ColumnTuple = (String, Long) private object Column { private def unmarshal(xml: NodeSeq, name: NodeSeq => String, value: NodeSeq => Long): Try[ColumnTuple] = { Try { (name(xml), value(xml)) } } private def from(attr: String): NodeSeq => String = xml => (xml \ attr).text def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[ColumnTuple] = unmarshal(xml, from("@column"), _.text.toLong) def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[ColumnTuple] = unmarshal(xml, from("name"), from("value") andThen (_.toLong)) def fromJson(json: JValue): Option[ColumnTuple] = json match { - case JObject(List(JField(name, JInt(value)))) => Some((name, value.toInt)) + case JObject(List(JField(name, JInt(value)))) => Some((name, value.toLong)) case _ => None } } def empty(resultType: ResultOutputType) = new I2b2ResultEnvelope(resultType) import NodeSeqEnrichments.Strictness._ override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[I2b2ResultEnvelope] = { unmarshal( Success(xml), _.withChild("resultType").map(_.text.trim).flatMap(ResultOutputType.tryValueOf(breakdownTypes)), _.withChild("column"), Column.fromXml(breakdownTypes)) } override def fromI2b2(breakdownTypes: Set[ResultOutputType])(i2b2Xml: NodeSeq): Try[I2b2ResultEnvelope] = { unmarshal( i2b2Xml.withChild("body").withChild("result"), _.attribute("name").flatMap(ResultOutputType.tryValueOf(breakdownTypes)), x => Try(x \ "data"), Column.fromI2b2(breakdownTypes)) } private def unmarshal(xmlAttempt: Try[NodeSeq], getResultType: NodeSeq => Try[ResultOutputType], columnXmls: NodeSeq => Try[NodeSeq], toColumn: NodeSeq => Try[ColumnTuple]): Try[I2b2ResultEnvelope] = { import net.shrine.util.Tries.sequence for { xml <- xmlAttempt resultType <- getResultType(xml) columnXml <- columnXmls(xml) columnAttempts = columnXml.map(toColumn) columns <- sequence(columnAttempts) } yield new I2b2ResultEnvelope(resultType, columns: _*) } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala index 68558aacc..6a22c3f65 100644 --- a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala @@ -1,608 +1,612 @@ package net.shrine.qep.querydb import java.sql.SQLException import java.util.concurrent.TimeoutException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources} import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest} import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator, TimeoutInDbIoActionException} import net.shrine.source.ConfigSource import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.util.control.NonFatal import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) try { blocking { Await.result(future, timeout) } } catch { case tx:TimeoutException => throw TimeoutInDbIoActionException(dataSource, timeout, tx) case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.result) } def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain( request.authn.username, request.authn.domain, None, Some(request.fetchSize)) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } def countPreviousQueriesByUserAndDomain(userName: UserName, domain: String):Int = { val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain) dbRun(q.size.result) } def selectQueryById(networkQueryId: NetworkQueryId): Option[QepQuery] = dbRun(mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result).lastOption def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, skip:Option[Int] = None, limit:Option[Int] = None):Seq[QepQuery] = { debug(s"start selectPreviousQueriesByUserAndDomain $userName $domain") val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain).sortBy(x => x.changeDate.desc) val qWithSkip = skip.fold(q)(q.drop) val qWithLimit = limit.fold(qWithSkip)(qWithSkip.take) val result = dbRun(qWithLimit.result) debug(s"finished selectPreviousQueriesByUserAndDomain with $result") result } def renamePreviousQuery(request:RenameQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis())) } yield queryResults ) } def markDeleted(request:DeleteQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis())) } yield queryResults ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def selectMostRecentQepQueryFlagFor(networkQueryId: NetworkQueryId): Option[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId === networkQueryId).result).lastOption def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } //todo only used in tests. Is that OK? def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentFullQueryResultsFor(networkId:NetworkQueryId): Seq[FullQueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result - breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result + breakdowns: Seq[QepQueryBreakdownResultsRow] <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) - val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2)) + + val breakdownTypeToResults: Map[ResultOutputType, Seq[QepQueryBreakdownResultsRow]] = breakdowns.groupBy(_.resultType) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) - queryResults.map(r => FullQueryResult(r, - resultIdsToI2b2ResultEnvelopes.get(r.resultId), + queryResults.map(r => FullQueryResult( + r, + breakdownTypeToResults, adapterNodesToProblemDigests.get(r.adapterNode) )) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val fullQueryResults = selectMostRecentFullQueryResultsFor(networkId) fullQueryResults.map(_.toQueryResult) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } def selectDistinctAdaptersWithResults:Seq[String] = { dbRun(allQueryResultRows.map(_.adapterNode).distinct.result).sorted } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val timeout = QepQuerySchema.config.getInt("timeout") seconds val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[Option[String]]("expression") def dateCreated = column[Time]("dateCreated") def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") def changeDate = column[Long]("changeDate") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists ) yield queries val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[Option[ResultOutputType]]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = ConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config) import net.shrine.config.ConfigExtensions val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty) val schema = QepQuerySchema(slickProfile,moreBreakdowns) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: Option[String], dateCreated: Time, deleted: Boolean, queryXml: String, changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.map(_.toString), dateCreated = System.currentTimeMillis(), deleted = false, queryXml = runQueryRequest.toXmlString, changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } //todo replace with a class per state case class FullQueryResult( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], count:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long, - breakdowns:Option[Map[ResultOutputType,I2b2ResultEnvelope]], + breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]], problemDigest:Option[ProblemDigest] ) { - def toQueryResult = QueryResult( - resultId = resultId, - instanceId = instanceId, - resultType = resultType, - setSize = count, - startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), - endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), - description = Some(adapterNode), - statusType = status, - statusMessage = statusMessage, - breakdowns = breakdowns.getOrElse(Map.empty), - problemDigest = problemDigest - ) + def toQueryResult = { + def resultEnvelopesFrom(breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]]): Map[ResultOutputType, I2b2ResultEnvelope] = { + def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { + val data = breakdowns.map(b => b.dataKey -> b.value).toMap + I2b2ResultEnvelope(resultType,data) + } + breakdownTypeToResults.map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) + } + + QueryResult( + resultId = resultId, + instanceId = instanceId, + resultType = resultType, + setSize = count, + startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), + endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), + description = Some(adapterNode), + statusType = status, + statusMessage = statusMessage, + breakdowns = resultEnvelopesFrom(breakdownTypeToResults), + problemDigest = problemDigest + ) + } } object FullQueryResult { def apply(row:QueryResultRow, - breakdowns:Option[Map[ResultOutputType,I2b2ResultEnvelope]], + breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]], problemDigest:Option[ProblemDigest]):FullQueryResult = { FullQueryResult(resultId = row.resultId, networkQueryId = row.networkQueryId, instanceId = row.instanceId, adapterNode = row.adapterNode, resultType = row.resultType, count = row.size, startDate = row.startDate, endDate = row.endDate, status = row.status, statusMessage = row.statusMessage, changeDate = row.changeDate, - breakdowns = breakdowns, + breakdownTypeToResults = breakdownTypeToResults, problemDigest = problemDigest ) } } case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."), resultType = result.resultType, size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } - def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = { - def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { - val data = breakdowns.map(b => b.dataKey -> b.value).toMap - I2b2ResultEnvelope(resultType,data) - } - - breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) - } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else
, //TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE 0 ) } } case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){ override val summary = "A problem encountered while using a database." override val throwable = Some(x) override val description = x.getMessage } \ No newline at end of file