diff --git a/commons/auth/src/main/scala/net/shrine/authentication/Authenticator.scala b/commons/auth/src/main/scala/net/shrine/authentication/Authenticator.scala index 71bf6673e..5483dd9b3 100644 --- a/commons/auth/src/main/scala/net/shrine/authentication/Authenticator.scala +++ b/commons/auth/src/main/scala/net/shrine/authentication/Authenticator.scala @@ -1,11 +1,11 @@ package net.shrine.authentication import net.shrine.protocol.AuthenticationInfo /** * @author clint - * @date Dec 12, 2013 + * @since Dec 12, 2013 */ trait Authenticator { def authenticate(authn: AuthenticationInfo): AuthenticationResult } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala index 3e3dae7ea..dd2e1e605 100644 --- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala +++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala @@ -1,236 +1,250 @@ package net.shrine.qep import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator} +import net.shrine.authentication.AuthenticationResult.Authenticated import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.log.Loggable import net.shrine.protocol.{QueryResult, AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, QueryInstance, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.audit.QepAuditDb import net.shrine.qep.dao.AuditDao import net.shrine.qep.queries.QepQueryDb import net.shrine.util.XmlDateHelper import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration /** * @author clint * @since Feb 19, 2014 */ trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") authenticateAndThen(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { - QepQueryDb.db.insertQepQueryFlag(request) - doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast) + authenticateAndThen(request) { authResult => + QepQueryDb.db.insertQepQueryFlag(request) + doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast,authResult) + } } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { - QepQueryDb.db.insertQepQueryFlag(request) - doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast) + authenticateAndThen(request) { authResult => + QepQueryDb.db.insertQepQueryFlag(request) + doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast,authResult) + } } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { - info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") + authenticateAndThen(request) { authResult => + info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") - //store the query in the qep's database + //store the query in the qep's database - doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast) + doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast,authResult) + } } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { - info(s"doReadQueryDefinition($request,$shouldBroadcast)") + authenticateAndThen(request) { authResult => + info(s"doReadQueryDefinition($request,$shouldBroadcast)") - doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast) + doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast,authResult) + } } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { - info(s"doReadInstanceResults($request,$shouldBroadcast)") - - val networkId = request.shrineNetworkQueryId + authenticateAndThen(request) { authResult => + info(s"doReadInstanceResults($request,$shouldBroadcast)") - //read from the QEP database code here. Only broadcast if some result is in some sketchy state - val resultsFromDb:Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId) + val networkId = request.shrineNetworkQueryId - //If any query result was pending - val response = if(resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) { - debug(s"Using qep cached results for query $networkId") - AggregatedReadInstanceResultsResponse(networkId,resultsFromDb).asInstanceOf[BaseResp] - } - else { - debug(s"Requesting results for $networkId from network") - val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast) + //read from the QEP database code here. Only broadcast if some result is in some sketchy state + val resultsFromDb: Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId) - //put the new results in the database if we got what we wanted - response match { - case arirr:AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId,r)) - case _ => //do nothing + //If any query result was pending + val response = if (resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) { + debug(s"Using qep cached results for query $networkId") + AggregatedReadInstanceResultsResponse(networkId, resultsFromDb).asInstanceOf[BaseResp] } + else { + debug(s"Requesting results for $networkId from network") + val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast,authResult) + + //put the new results in the database if we got what we wanted + response match { + case arirr: AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId, r)) + case _ => //do nothing + } + response + } response } - response } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { - info(s"doReadQueryInstances($request,$shouldBroadcast)") authenticateAndThen(request) { authResult => + info(s"doReadQueryInstances($request,$shouldBroadcast)") val now = XmlDateHelper.now val networkQueryId = request.networkQueryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = { - info(s"doReadPreviousQueries($request,$shouldBroadcast)") - - //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine) + authenticateAndThen(request){ authResult => + info(s"doReadPreviousQueries($request,$shouldBroadcast)") - //pull queries from the local database. - QepQueryDb.db.selectPreviousQueries(request) + //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine) + //pull queries from the local database. + QepQueryDb.db.selectPreviousQueries(request) + } } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { - info(s"doRenameQuery($request,$shouldBroadcast)") - QepQueryDb.db.renamePreviousQuery(request) - doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast) + authenticateAndThen(request) { authResult => + info(s"doRenameQuery($request,$shouldBroadcast)") + QepQueryDb.db.renamePreviousQuery(request) + doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast,authResult) + } } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { - info(s"doDeleteQuery($request,$shouldBroadcast)") - QepQueryDb.db.deleteQuery(request) - doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast) + authenticateAndThen(request) { authResult => + info(s"doDeleteQuery($request,$shouldBroadcast)") + QepQueryDb.db.markDeleted(request) + doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast,authResult) + } } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ => info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)") //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate - protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = { - - authenticateAndThen(request) { authResult => + protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean, authResult:Authenticated): BaseResp = { - debug(s"doBroadcastQuery($request) authResult is $authResult") - //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another - //When making BroadcastMessages - val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) + debug(s"doBroadcastQuery($request) authResult is $authResult") + //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another + //When making BroadcastMessages + val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) - //NB: Only audit RunQueryRequests - request match { - case runQueryRequest: RunQueryRequest => - // inject modified, authorized runQueryRequest + //NB: Only audit RunQueryRequests + request match { + case runQueryRequest: RunQueryRequest => + // inject modified, authorized runQueryRequest //although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code - auditAuthorizeAndThen(runQueryRequest) { authorizedRequest => - debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") + auditAuthorizeAndThen(runQueryRequest) { authorizedRequest => + debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") - // tuck the ACT audit metrics data into a database here - if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) - QepQueryDb.db.insertQepQuery(authorizedRequest) + // tuck the ACT audit metrics data into a database here + if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) + QepQueryDb.db.insertQepQuery(authorizedRequest) - val response: BaseResp = doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast) + val response: BaseResp = doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast) - response match { - //todo do in one transaction - case aggregated:AggregatedRunQueryResponse => aggregated.results.foreach(QepQueryDb.db.insertQueryResult(runQueryRequest.networkQueryId,_)) - case _ => debug(s"Unanticipated response type $response") - } - - response + response match { + //todo do in one transaction + case aggregated:AggregatedRunQueryResponse => aggregated.results.foreach(QepQueryDb.db.insertQueryResult(runQueryRequest.networkQueryId,_)) + case _ => debug(s"Unanticipated response type $response") } - case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) - } + + response + } + case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) } } private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = { info(s"doSynchronousQuery($request) started") val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] info(s"doSynchronousQuery($request) completed with response $response") response } private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = { auditTransactionally(request) { debug(s"auditAuthorizeAndThen($request) with $authorizationService") val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2)) } body(authorizedRequest) } } private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } import AuthenticationResult._ private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val AuthenticationInfo(domain, username, _) = request.authn val authResult = authenticator.authenticate(request.authn) authResult match { case a: Authenticated => f(a) + //todo this exception is never caught. Fix that with SHRINE-1322 case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason") } } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/QepService.scala b/qep/service/src/main/scala/net/shrine/qep/QepService.scala index 6f2015c84..16731113b 100644 --- a/qep/service/src/main/scala/net/shrine/qep/QepService.scala +++ b/qep/service/src/main/scala/net/shrine/qep/QepService.scala @@ -1,62 +1,64 @@ package net.shrine.qep import net.shrine.aggregation.{ReadQueryResultAggregator, ReadTranslatedQueryDefinitionAggregator} import net.shrine.authentication.Authenticator import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.protocol.{BaseShrineResponse, DeleteQueryRequest, FlagQueryRequest, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryResultRequest, ReadTranslatedQueryDefinitionRequest, RenameQueryRequest, ResultOutputType, RunQueryRequest, ShrineRequestHandler, UnFlagQueryRequest} import net.shrine.qep.dao.AuditDao import scala.concurrent.duration.Duration /** * @author Bill Simons * @since 3/23/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class QepService( commonName:String, auditDao: AuditDao, authenticator: Authenticator, authorizationService: QueryAuthorizationService, includeAggregateResult: Boolean, broadcastAndAggregationService: BroadcastAndAggregationService, queryTimeout: Duration, breakdownTypes: Set[ResultOutputType], collectQepAudit:Boolean) extends AbstractQepService[BaseShrineResponse] with ShrineRequestHandler { debug(s"ShrineService collectQepAudit is $collectQepAudit") override def flagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true) = doFlagQuery(request, shouldBroadcast) override def unFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true) = doUnFlagQuery(request, shouldBroadcast) - override def readTranslatedQueryDefinition(request: ReadTranslatedQueryDefinitionRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = { - doBroadcastQuery(request, new ReadTranslatedQueryDefinitionAggregator, shouldBroadcast) + override def readTranslatedQueryDefinition(request: ReadTranslatedQueryDefinitionRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = authenticateAndThen(request) { authResult => { + doBroadcastQuery(request, new ReadTranslatedQueryDefinitionAggregator, shouldBroadcast,authResult) + } } override def runQuery(request: RunQueryRequest, shouldBroadcast: Boolean) = doRunQuery(request, shouldBroadcast) override def readQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean) = doReadQueryDefinition(request, shouldBroadcast) override def readInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean) = doReadInstanceResults(request, shouldBroadcast) override def readQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean) = doReadQueryInstances(request, shouldBroadcast) override def readPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean) = doReadPreviousQueries(request, shouldBroadcast) override def renameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean) = doRenameQuery(request, shouldBroadcast) override def deleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean) = doDeleteQuery(request, shouldBroadcast) override def readApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean) = doReadApprovedQueryTopics(request, shouldBroadcast) override def readQueryResult(request: ReadQueryResultRequest, shouldBroadcast: Boolean) = { - doBroadcastQuery(request, new ReadQueryResultAggregator(request.queryId, includeAggregateResult), shouldBroadcast) + authenticateAndThen(request) { authResult => doBroadcastQuery(request, new ReadQueryResultAggregator(request.queryId, includeAggregateResult), shouldBroadcast, authResult) + } } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index 1484a053f..d7372e826 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,528 +1,528 @@ package net.shrine.qep.queries import java.sql.SQLException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.ProblemDigest import net.shrine.protocol.{ResultOutputTypes, DeleteQueryRequest, RenameQueryRequest, I2b2ResultEnvelope, QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.TestableDataSourceCreator import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.result) } //todo order def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } //todo order def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).result) } def renamePreviousQuery(request:RenameQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis())) } yield queryResults ) } - def deleteQuery(request:DeleteQueryRequest):Unit = { + def markDeleted(request:DeleteQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis())) } yield queryResults ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2)) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) queryResults.map(r => r.toQueryResult( resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty), adapterNodesToProblemDigests.get(r.adapterNode) )) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val db = QepQueryDb(QepQuerySchema.schema,dataSource) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[String]("expression") def dateCreated = column[Time]("dateCreated") def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") def changeDate = column[Long]("changeDate") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists ) yield queries val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags //todo there may be other custom breakdowns in the config. Use that as the source val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[ResultOutputType]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) import net.shrine.config.{ConfigExtensions, Keys} val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured(Keys.breakdownResultOutputTypes,ResultOutputTypes.fromConfig).getOrElse(Set.empty) val schema = QepQuerySchema(slickProfile,moreBreakdowns) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: String, dateCreated: Time, deleted: Boolean, queryXml: String, changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), held = None, //todo if a query is held at the adapter, how will we know? do we care? Question out to Bill and leadership flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString, dateCreated = System.currentTimeMillis(), deleted = false, queryXml = runQueryRequest.toXmlString, changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } /* //todo problemDigest in a separate table problemDigest: Option[ProblemDigest] = None, //todo breakdowns in a separate table breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty */ case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:ResultOutputType, size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult( resultId = resultId, instanceId = instanceId, resultType = Some(resultType), setSize = size, startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), description = Some(adapterNode), statusType = status, statusMessage = statusMessage, breakdowns = breakdowns, problemDigest = problemDigest ) } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,ResultOutputType,Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."), resultType = result.resultType.getOrElse(ResultOutputType.PATIENT_COUNT_XML), //todo how is this optional?? size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = { def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { val data = breakdowns.map(b => b.dataKey -> b.value).toMap I2b2ResultEnvelope(resultType,data) } breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else

) } }