diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index c5ebc214f..e3c38b2fe 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,367 +1,367 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
-import net.shrine.problem.{ProblemNotYetEncoded, Problem, ProblemDigest}
+import net.shrine.problem.{Problem, ProblemDigest}
import net.shrine.protocol.QueryResult.StatusType
import scala.xml.NodeSeq
import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, SEnum, XmlDateHelper, OptionEnrichments}
import net.shrine.serialization.{ I2b2Marshaller, XmlMarshaller }
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult(
resultId: Long,
instanceId: Long,
resultType: Option[ResultOutputType],
setSize: Long,
startDate: Option[XMLGregorianCalendar],
endDate: Option[XMLGregorianCalendar],
description: Option[String],
statusType: StatusType,
statusMessage: Option[String],
problemDigest: Option[ProblemDigest] = None,
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
) extends XmlMarshaller with I2b2Marshaller {
//only used in tests
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
None, //description
statusType,
None) //statusMessage
}
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
description: String,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
Option(description),
statusType,
None) //statusMessage
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType match {
case Some(rt) if !rt.isError => //noinspection RedundantBlock
{
if (rt.isBreakdown) { rt.toI2b2NameOnly() }
else { rt.toI2b2 }
}
case _ => ResultOutputType.ERROR.toI2b2NameOnly("")
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
{ problemDigest.map(_.toXml).getOrElse("") }
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
final case class StatusType(
name: String,
isDone: Boolean,
i2b2Id: Option[Int] = Some(-1),
private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value {
def isError = this == StatusType.Error
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
}
object StatusType extends SEnum[StatusType] {
private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
}
{ i2b2Id }{ queryResult.statusType.name }
}
val noMessage:NodeSeq = null
val Error = StatusType("ERROR", isDone = true, None, { queryResult =>
(queryResult.statusMessage, queryResult.problemDigest) match {
case (Some(msg),Some(pd)) => { msg } ++ pd.toXml
case (Some(msg),None) => { msg }
case (None,Some(pd)) => pd.toXml
case (None, None) => noMessage
}
})
/*
msg =>
net.shrine.something.is.Broken
Something is borked
{ msg }
Herein is a stack trace, multiple lines
))
*/
val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
val Processing = StatusType("PROCESSING", isDone = false, Some(2))
val Queued = StatusType("QUEUED", isDone = false, Some(2))
val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
val Held = StatusType("HELD", isDone = false)
val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = {
val subXml = xml \ "problem"
if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml))
else None
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
//noinspection ScalaUnnecessaryParentheses
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
resultId = asLong("resultId"),
instanceId = asLong("instanceId"),
resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
setSize = asLong("setSize"),
startDate = extractDate("startDate"),
endDate = extractDate("endDate"),
description = extract("description"),
statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = extract("statusMessage"),
problemDigest = extractProblemDigest(xml),
breakdowns = extractBreakdowns("resultEnvelope")
)
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
QueryResult(
resultId = asLong("result_instance_id"),
instanceId = asLong("query_instance_id"),
resultType = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2),
setSize = asLong("set_size"),
startDate = asXmlGcOption("start_date"),
endDate = asXmlGcOption("end_date"),
description = asTextOption("description"),
statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = asTextOption("query_status_type", "description"),
problemDigest = extractProblemDigest(xml \ "query_status_type"))
}
def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = {
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = {
val problemDigest = problem.toDigest
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
/**
* For reconstituting errorResults from a database
*/
//todo remove and replace with real Problems
def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = {
val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml)
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
index bf4c5467a..2f14e3ebd 100644
--- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
@@ -1,215 +1,221 @@
package net.shrine.qep
import net.shrine.log.Loggable
import net.shrine.qep.audit.QepAuditDb
import net.shrine.qep.dao.AuditDao
import net.shrine.authentication.Authenticator
import net.shrine.authorization.QueryAuthorizationService
import net.shrine.broadcaster.BroadcastAndAggregationService
import net.shrine.qep.queries.QepQueryDb
import scala.concurrent.duration.Duration
import net.shrine.util.XmlDateHelper
import scala.concurrent.Future
import scala.concurrent.Await
import net.shrine.protocol.{ReadPreviousQueriesResponse, RunQueryRequest, BaseShrineRequest, AuthenticationInfo, Credential, BaseShrineResponse, ReadQueryInstancesRequest, QueryInstance, ReadQueryInstancesResponse, ReadQueryDefinitionRequest, DeleteQueryRequest, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, RenameQueryRequest, ReadPdoRequest, FlagQueryRequest, UnFlagQueryRequest, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, ResultOutputType}
import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized}
import net.shrine.authentication.AuthenticationResult
import net.shrine.authentication.NotAuthenticatedException
import net.shrine.aggregation.RunQueryAggregator
import net.shrine.aggregation.Aggregators
import net.shrine.aggregation.Aggregator
import net.shrine.aggregation.ReadQueryDefinitionAggregator
import net.shrine.aggregation.DeleteQueryAggregator
import net.shrine.aggregation.ReadPdoResponseAggregator
import net.shrine.aggregation.RenameQueryAggregator
import net.shrine.aggregation.ReadInstanceResultsAggregator
import net.shrine.aggregation.FlagQueryAggregator
import net.shrine.aggregation.UnFlagQueryAggregator
/**
* @author clint
* @since Feb 19, 2014
*/
trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable {
val commonName:String
val auditDao: AuditDao
val authenticator: Authenticator
val authorizationService: QueryAuthorizationService
val includeAggregateResult: Boolean
val broadcastAndAggregationService: BroadcastAndAggregationService
val queryTimeout: Duration
val breakdownTypes: Set[ResultOutputType]
val collectQepAudit:Boolean
protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = {
info(s"doReadResultOutputTypes($request)")
authenticateAndThen(request) { authResult =>
val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes
//TODO: XXX: HACK: Would like to remove the cast
ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp]
}
}
protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast)
}
protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast)
}
protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor")
//store the query in the qep's database
doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast)
}
protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadQueryDefinition($request,$shouldBroadcast)")
doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast)
}
protected def doReadPdo(request: ReadPdoRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadPdo($request,$shouldBroadcast)")
doBroadcastQuery(request, new ReadPdoResponseAggregator, shouldBroadcast)
}
protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadInstanceResults($request,$shouldBroadcast)")
+
+ //todo try reading directly from the QEP database code here
+
doBroadcastQuery(request, new ReadInstanceResultsAggregator(request.shrineNetworkQueryId, false), shouldBroadcast)
}
protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadQueryInstances($request,$shouldBroadcast)")
authenticateAndThen(request) { authResult =>
val now = XmlDateHelper.now
val networkQueryId = request.queryId
val username = request.authn.username
val groupId = request.projectId
//NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like
//to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine
//can understand, while meeting the conversational requirements of the legacy web client.
val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now)
//TODO: XXX: HACK: Would like to remove the cast
//NB: Munge in username from authentication result
ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp]
}
}
protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = {
info(s"doReadPreviousQueries($request,$shouldBroadcast)")
- //pull results from the local database.
+
+ //check results. If any results are in one of many pending states, go ahead and request them. (Maybe go async)
+
+ //pull queries from the local database.
QepQueryDb.db.selectPreviousQueries(request)
}
protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doRenameQuery($request,$shouldBroadcast)")
doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast)
}
protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doDeleteQuery($request,$shouldBroadcast)")
doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast)
}
protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ =>
info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)")
//TODO: XXX: HACK: Would like to remove the cast
authorizationService.readApprovedEntries(request) match {
case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp]
case Right(validResponse) => validResponse.asInstanceOf[BaseResp]
}
}
import broadcastAndAggregationService.sendAndAggregate
protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
debug(s"doBroadcastQuery($request) authResult is $authResult")
//NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another
//When making BroadcastMessages
val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false))
//NB: Only audit RunQueryRequests
request match {
case runQueryRequest: RunQueryRequest =>
// inject modified, authorized runQueryRequest
auditAuthorizeAndThen(runQueryRequest) { authorizedRequest =>
debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest")
// tuck the ACT audit metrics data into a database here
if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName)
QepQueryDb.db.insertQepQuery(authorizedRequest)
doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast)
}
case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast)
}
}
}
private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = {
info(s"doSynchronousQuery($request) started")
val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp]
info(s"doSynchronousQuery($request) completed with response $response")
response
}
private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult)
protected def waitFor[R](futureResponse: Future[R]): R = {
XmlDateHelper.time("Waiting for aggregated results")(debug(_)) {
Await.result(futureResponse, queryTimeout)
}
}
private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = {
auditTransactionally(request) {
debug(s"auditAuthorizeAndThen($request) with $authorizationService")
val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match {
case na: NotAuthorized => throw na.toException
case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2))
}
body(authorizedRequest)
}
}
private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = {
try { body } finally {
auditDao.addAuditEntry(
request.projectId,
request.authn.domain,
request.authn.username,
request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still?
request.topicId)
}
}
import AuthenticationResult._
private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = {
val AuthenticationInfo(domain, username, _) = request.authn
val authResult = authenticator.authenticate(request.authn)
authResult match {
case a: Authenticated => f(a)
case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason")
}
}
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
index 7d04a171d..fe0593626 100644
--- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
@@ -1,264 +1,302 @@
package net.shrine.qep.queries
import java.sql.SQLException
import java.util.GregorianCalendar
import javax.sql.DataSource
import javax.xml.datatype.DatatypeFactory
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.protocol.{UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest}
import net.shrine.qep.QepConfigSource
import net.shrine.slick.TestableDataSourceCreator
import slick.driver.JdbcProfile
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
blocking {
Await.result(future, 10 seconds)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
dbRun(allQepQueryQuery.result)
}
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain)
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = {
dbRun(allQepQueryQuery.filter(_.userName === userName).filter(_.userDomain === domain).result)
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
debug(s"insertQepQueryFlag $qepQueryFlag")
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val db = QepQueryDb(QepQuerySchema.schema,dataSource)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
- /**
- * The adapter's query table looks like this:
- *
- mysql> describe SHRINE_QUERY;
-+------------------+--------------+------+-----+-------------------+----------------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------+--------------+------+-----+-------------------+----------------+
-| id | int(11) | NO | PRI | NULL | auto_increment |
-| local_id | varchar(255) | NO | MUL | NULL | |
-| network_id | bigint(20) | NO | MUL | NULL | |
-| username | varchar(255) | NO | MUL | NULL | |
-| domain | varchar(255) | NO | | NULL | |
-| query_name | varchar(255) | NO | | NULL | |
-| query_expression | text | YES | | NULL | |
-| date_created | timestamp | NO | | CURRENT_TIMESTAMP | |
-| has_been_run | tinyint(1) | NO | | 0 | |
-| flagged | tinyint(1) | NO | | 0 | |
-| flag_message | varchar(255) | YES | | NULL | |
-| query_xml | text | YES | | NULL | |
-+------------------+--------------+------+-----+-------------------+----------------+
- */
-
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[String]("expression")
def dateCreated = column[Time]("dateCreated")
def queryXml = column[String]("queryXml")
def * = (networkId,userName,userDomain,queryName,expression,dateCreated,queryXml) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
+
+ /**
+ * The adapter's QUERY_RESULTS table looks like this:
+ *
+ * mysql> describe QUERY_RESULT;
++--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+-------------------+----------------+
+| Field | Type | Null | Key | Default | Extra |
++--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+-------------------+----------------+
+| id | int(11) | NO | PRI | NULL | auto_increment |
+| local_id | varchar(255) | NO | | NULL | |
+| query_id | int(11) | NO | MUL | NULL | |
+| type | enum('PATIENTSET','PATIENT_COUNT_XML','PATIENT_AGE_COUNT_XML','PATIENT_RACE_COUNT_XML','PATIENT_VITALSTATUS_COUNT_XML','PATIENT_GENDER_COUNT_XML','ERROR') | NO | | NULL | |
+| status | enum('FINISHED','ERROR','PROCESSING','QUEUED') | NO | | NULL | |
+| time_elapsed | int(11) | YES | | NULL | |
+| last_updated | timestamp | NO | | CURRENT_TIMESTAMP | |
++--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+-------------------+----------------+
+
+ with some other aux tables to hold specifics:
+
+ mysql> describe COUNT_RESULT;
++------------------+-----------+------+-----+-------------------+----------------+
+| Field | Type | Null | Key | Default | Extra |
++------------------+-----------+------+-----+-------------------+----------------+
+| id | int(11) | NO | PRI | NULL | auto_increment |
+| result_id | int(11) | NO | MUL | NULL | |
+| original_count | int(11) | NO | | NULL | |
+| obfuscated_count | int(11) | NO | | NULL | |
+| date_created | timestamp | NO | | CURRENT_TIMESTAMP | |
++------------------+-----------+------+-----+-------------------+----------------+
+
+ mysql> describe BREAKDOWN_RESULT;
++------------------+--------------+------+-----+---------+----------------+
+| Field | Type | Null | Key | Default | Extra |
++------------------+--------------+------+-----+---------+----------------+
+| id | int(11) | NO | PRI | NULL | auto_increment |
+| result_id | int(11) | NO | MUL | NULL | |
+| data_key | varchar(255) | NO | | NULL | |
+| original_value | int(11) | NO | | NULL | |
+| obfuscated_value | int(11) | NO | | NULL | |
++------------------+--------------+------+-----+---------+----------------+
+
+ mysql> describe ERROR_RESULT;
++---------------------+--------------+------+-----+--------------------------+----------------+
+| Field | Type | Null | Key | Default | Extra |
++---------------------+--------------+------+-----+--------------------------+----------------+
+| id | int(11) | NO | PRI | NULL | auto_increment |
+| result_id | int(11) | NO | MUL | NULL | |
+| message | varchar(255) | NO | | NULL | |
+| CODEC | varchar(256) | NO | | Pre-1.20 Error | |
+| SUMMARY | text | NO | | NULL | |
+| DESCRIPTION | text | NO | | NULL | |
+| PROBLEM_DESCRIPTION | text | NO | | NULL | |
+| DETAILS | text | NO | | NULL | |
+| STAMP | varchar(256) | NO | | Unknown time and machine | |
++---------------------+--------------+------+-----+--------------------------+----------------+
+
+ */
+
+
+
}
object QepQuerySchema {
val allConfig:Config = QepConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfileClassName = config.getString("slickProfileClassName")
val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName)
val schema = QepQuerySchema(slickProfile)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: String,
dateCreated: Time,
queryXml:String
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
val gregorianCalendar = new GregorianCalendar()
gregorianCalendar.setTimeInMillis(dateCreated)
val xmlGregorianCalendar = DatatypeFactory.newInstance().newXMLGregorianCalendar(gregorianCalendar)
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = xmlGregorianCalendar,
held = None, //todo if a query is held at the adapter, how will we know? do we care? Question out to Bill and leadership
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,String) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString,
dateCreated = System.currentTimeMillis(),
queryXml = runQueryRequest.toXmlString
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}