diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala index 069458450..c38b59897 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/RunQueryAdapter.scala @@ -1,238 +1,238 @@ package net.shrine.adapter import net.shrine.adapter.audit.AdapterAuditDb import scala.util.Failure import scala.util.Success import scala.util.Try import scala.xml.NodeSeq import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, Credential, I2b2ResultEnvelope, QueryResult, RawCrcRunQueryResponse, ReadResultRequest, ReadResultResponse, ResultOutputType, RunQueryRequest, RunQueryResponse, ErrorResponse, ShrineResponse} import net.shrine.client.Poster import scala.util.control.NonFatal import net.shrine.util.XmlDateHelper import scala.xml.XML /** * @author Bill Simons * @author clint * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class RunQueryAdapter( poster: Poster, dao: AdapterDao, override val hiveCredentials: HiveCredentials, conceptTranslator: QueryDefinitionTranslator, adapterLockoutAttemptsThreshold: Int, doObfuscation: Boolean, runQueriesImmediately: Boolean, breakdownTypes: Set[ResultOutputType], collectAdapterAudit:Boolean ) extends CrcAdapter[RunQueryRequest, RunQueryResponse](poster, hiveCredentials) { logStartup() import RunQueryAdapter._ override protected[adapter] def parseShrineResponse(xml: NodeSeq) = RawCrcRunQueryResponse.fromI2b2(breakdownTypes)(xml).get //TODO: Avoid .get call override protected[adapter] def translateNetworkToLocal(request: RunQueryRequest): RunQueryRequest = { try { request.mapQueryDefinition(conceptTranslator.translate) } catch { case NonFatal(e) => throw new AdapterMappingException(s"Error mapping query terms from network to local forms. request: ${request.elideAuthenticationInfo}", e) } } override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message) if (isLockedOut(message.networkAuthn)) { throw new AdapterLockoutException(message.networkAuthn) } val runQueryReq = message.request.asInstanceOf[RunQueryRequest] //We need to use the network identity from the BroadcastMessage, since that will have the network username //(ie, ecommons) of the querying user. Using the AuthenticationInfo from the incoming request breaks the fetching //of previous queries on deployed systems where the credentials in the identity param to this method and the authn //field of the incoming request are different, like the HMS Shrine deployment. //NB: Credential field is wiped out to preserve old behavior -Clint 14 Nov, 2013 val authnToUse = message.networkAuthn.copy(credential = Credential("", false)) if (!runQueriesImmediately) { debug(s"Queueing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") storeQuery(authnToUse, message, runQueryReq) } else { debug(s"Performing query from user ${message.networkAuthn.domain}:${message.networkAuthn.username}") val result: ShrineResponse = runQuery(authnToUse, message.copy(request = runQueryReq.withAuthn(authnToUse)), runQueryReq.withAuthn(authnToUse)) if (collectAdapterAudit) AdapterAuditDb.db.insertResultSent(result) result } } private def storeQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): RunQueryResponse = { //Use dummy ids for what we would have received from the CRC val masterId: Long = -1L val queryInstanceId: Long = -1L val resultId: Long = -1L //TODO: is this right?? Or maybe it's project id? val groupId = authnToUse.domain val invalidSetSize = -1L val now = XmlDateHelper.now val queryResult = QueryResult(resultId, queryInstanceId, Some(ResultOutputType.PATIENT_COUNT_XML), invalidSetSize, Some(now), Some(now), Some("Query enqueued for later processing"), QueryResult.StatusType.Held, Some("Query enqueued for later processing")) dao.inTransaction { val insertedQueryId = dao.insertQuery(masterId.toString, request.networkQueryId, authnToUse, request.queryDefinition, isFlagged = false, hasBeenRun = false, flagMessage = None) val insertedQueryResultIds = dao.insertQueryResults(insertedQueryId, Seq(queryResult)) //NB: We need to insert dummy QueryResult and Count records so that calls to StoredQueries.retrieve() in //AbstractReadQueryResultAdapter, called when retrieving results for previously-queued-or-incomplete //queries, will work. val countQueryResultId = insertedQueryResultIds(ResultOutputType.PATIENT_COUNT_XML).head dao.insertCountResult(countQueryResultId, -1L, -1L) } RunQueryResponse(masterId, XmlDateHelper.now, authnToUse.username, groupId, request.queryDefinition, queryInstanceId, queryResult) } private def runQuery(authnToUse: AuthenticationInfo, message: BroadcastMessage, request: RunQueryRequest): ShrineResponse = { - if (collectAdapterAudit) AdapterAuditDb.db.insertQueryReceived(message) + if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionStarted(request) //NB: Pass through ErrorResponses received from the CRC. //See: https://open.med.harvard.edu/jira/browse/SHRINE-794 val result = super.processRequest(message) match { case e: ErrorResponse => e case rawRunQueryResponse: RawCrcRunQueryResponse => { processRawCrcRunQueryResponse(authnToUse, request, rawRunQueryResponse) } } if (collectAdapterAudit) AdapterAuditDb.db.insertExecutionCompletedShrineResponse(result) result } private[adapter] def processRawCrcRunQueryResponse(authnToUse: AuthenticationInfo, request: RunQueryRequest, rawRunQueryResponse: RawCrcRunQueryResponse): RunQueryResponse = { def isBreakdown(result: QueryResult) = result.resultType.map(_.isBreakdown).getOrElse(false) val originalResults = rawRunQueryResponse.results val (originalBreakdownResults, originalNonBreakDownResults) = originalResults.partition(isBreakdown) val originalBreakdownCountAttempts = attemptToRetrieveBreakdowns(request, originalBreakdownResults) val (successfulBreakdownCountAttempts, failedBreakdownCountAttempts) = originalBreakdownCountAttempts.partition { case (_, t) => t.isSuccess } logBreakdownFailures(rawRunQueryResponse, failedBreakdownCountAttempts) val originalMergedBreakdowns = { val withBreakdownCounts = successfulBreakdownCountAttempts.collect { case (_, Success(queryResultWithBreakdowns)) => queryResultWithBreakdowns } withBreakdownCounts.map(_.breakdowns).fold(Map.empty)(_ ++ _) } val obfuscatedQueryResults = originalResults.map(Obfuscator.obfuscate) val obfuscatedNonBreakdownQueryResults = obfuscatedQueryResults.filterNot(isBreakdown) val obfuscatedMergedBreakdowns = obfuscateBreakdowns(originalMergedBreakdowns) val failedBreakdownTypes = failedBreakdownCountAttempts.flatMap { case (queryResult, _) => queryResult.resultType } dao.storeResults( authn = authnToUse, masterId = rawRunQueryResponse.queryId.toString, networkQueryId = request.networkQueryId, queryDefinition = request.queryDefinition, rawQueryResults = originalResults, obfuscatedQueryResults = obfuscatedQueryResults, failedBreakdownTypes = failedBreakdownTypes, mergedBreakdowns = originalMergedBreakdowns, obfuscatedBreakdowns = obfuscatedMergedBreakdowns) val queryResults: Seq[QueryResult] = if (doObfuscation) obfuscatedNonBreakdownQueryResults else originalNonBreakDownResults val breakdownsToReturn: Map[ResultOutputType, I2b2ResultEnvelope] = if (doObfuscation) obfuscatedMergedBreakdowns else originalMergedBreakdowns //TODO: Will fail in the case of NO non-breakdown QueryResults. Can this ever happen, and is it worth protecting against here? val resultWithBreakdowns = queryResults.head.withBreakdowns(breakdownsToReturn) if(debugEnabled) { def justBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = breakdowns.mapValues(_.data) val obfuscationMessage = s"obfuscation is ${if(doObfuscation) "ON" else "OFF"}" debug(s"Returning QueryResult with count ${resultWithBreakdowns.setSize} (original count: ${originalNonBreakDownResults.headOption.map(_.setSize)} ; $obfuscationMessage)") debug(s"Returning QueryResult with breakdowns ${justBreakdowns(resultWithBreakdowns.breakdowns)} (original breakdowns: ${justBreakdowns(originalMergedBreakdowns)} ; $obfuscationMessage)") debug(s"Full QueryResult: $resultWithBreakdowns") } rawRunQueryResponse.toRunQueryResponse.withResult(resultWithBreakdowns) } private def getResultFromCrc(parentRequest: RunQueryRequest, networkResultId: Long): Try[ReadResultResponse] = { def readResultRequest(runQueryReq: RunQueryRequest, networkResultId: Long) = ReadResultRequest(hiveCredentials.projectId, runQueryReq.waitTime, hiveCredentials.toAuthenticationInfo, networkResultId.toString) Try(XML.loadString(callCrc(readResultRequest(parentRequest, networkResultId)))).flatMap(ReadResultResponse.fromI2b2(breakdownTypes)) } private[adapter] def attemptToRetrieveCount(runQueryReq: RunQueryRequest, originalCountQueryResult: QueryResult): (QueryResult, Try[QueryResult]) = { originalCountQueryResult -> (for { countData <- getResultFromCrc(runQueryReq, originalCountQueryResult.resultId) } yield originalCountQueryResult.withSetSize(countData.metadata.setSize)) } private[adapter] def attemptToRetrieveBreakdowns(runQueryReq: RunQueryRequest, breakdownResults: Seq[QueryResult]): Seq[(QueryResult, Try[QueryResult])] = { breakdownResults.map { origBreakdownResult => origBreakdownResult -> (for { breakdownData <- getResultFromCrc(runQueryReq, origBreakdownResult.resultId).map(_.data) } yield origBreakdownResult.withBreakdown(breakdownData)) } } private[adapter] def logBreakdownFailures(response: RawCrcRunQueryResponse, failures: Seq[(QueryResult, Try[QueryResult])]) { for { (origQueryResult, Failure(e)) <- failures } { error(s"Couldn't load breakdown for QueryResult with masterId: ${response.queryId}, instanceId: ${origQueryResult.instanceId}, resultId: ${origQueryResult.resultId}. Asked for result type: ${origQueryResult.resultType}", e) } } private def isLockedOut(authn: AuthenticationInfo): Boolean = { adapterLockoutAttemptsThreshold match { case 0 => false case _ => dao.isUserLockedOut(authn, adapterLockoutAttemptsThreshold) } } private def logStartup(): Unit = { val message = { if (runQueriesImmediately) { s"${getClass.getSimpleName} will run queries immediately" } else { s"${getClass.getSimpleName} will queue queries for later execution" } } info(message) } } object RunQueryAdapter { private[adapter] def obfuscateBreakdowns(breakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Map[ResultOutputType, I2b2ResultEnvelope] = { breakdowns.mapValues(_.mapValues(Obfuscator.obfuscate)) } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala index 9cb688c47..1d0e2af2b 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala @@ -1,347 +1,347 @@ package net.shrine.adapter.audit import java.io.PrintWriter import java.sql.{DriverManager, Connection, SQLException} import java.util.logging.Logger import javax.naming.InitialContext import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.adapter.service.AdapterConfigSource import net.shrine.crypto.KeyStoreCertCollection import net.shrine.log.Loggable import net.shrine.audit.{QueryTopicId, Time, QueryName, NetworkQueryId, UserName, ShrineNodeId} import net.shrine.protocol.{BroadcastMessage, RunQueryRequest, RunQueryResponse, ShrineResponse} import slick.driver.JdbcProfile import scala.concurrent.{Future, Await} import scala.concurrent.duration.{Duration,DurationInt} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking /** * DB code for the Adapter audit metrics. * * @author david * @since 8/25/15 */ case class AdapterAuditDb(schemaDef:AdapterAuditSchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQueryReceived(broadcastMessage: BroadcastMessage):Unit = { QueryReceived.fromBroadcastMessage(broadcastMessage).foreach(insertQueryReceived) } def insertQueryReceived(queryReceived:QueryReceived):Unit = { dbRun(allQueriesReceived += queryReceived) } def selectAllQueriesReceived:Seq[QueryReceived] = { dbRun(allQueriesReceived.result) } def insertExecutionStarted(runQueryRequest: RunQueryRequest):Unit = { insertExecutionStarted(ExecutionStarted.fromRequest(runQueryRequest)) } def insertExecutionStarted(executionStart:ExecutionStarted):Unit = { dbRun(allExecutionStarts += executionStart) } def selectAllExecutionStarts:Seq[ExecutionStarted] = { dbRun(allExecutionStarts.result) } def insertExecutionCompletedShrineResponse(shrineResponse: ShrineResponse) = { ExecutionCompleted.fromResponse(shrineResponse).foreach(insertExecutionCompleted) } def insertExecutionCompleted(executionCompleted:ExecutionCompleted):Unit = { dbRun(allExecutionCompletes += executionCompleted) } def selectAllExecutionCompletes:Seq[ExecutionCompleted] = { dbRun(allExecutionCompletes.result) } def insertResultSent(shrineResponse:ShrineResponse):Unit = { ResultSent.fromResponse(shrineResponse).foreach(insertResultSent) } def insertResultSent(resultSent: ResultSent):Unit = { dbRun(allResultsSent += resultSent) } def selectAllResultsSent:Seq[ResultSent] = { dbRun(allResultsSent.result) } } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class AdapterAuditSchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables = { allResultsSent.schema ++ allExecutionStarts.schema ++ allExecutionCompletes.schema ++ allQueriesReceived.schema } //to get the schema, use the REPL //println(AdapterAuditSchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class ResultsSentTable(tag:Tag) extends Table[ResultSent](tag,"resultsSent") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") - def timeQuerySent = column[Time]("timeQuerySent") + def timeResultsSent = column[Time]("timeResultsSent") - def * = (networkQueryId,queryName,timeQuerySent) <> (ResultSent.tupled,ResultSent.unapply) + def * = (networkQueryId,queryName,timeResultsSent) <> (ResultSent.tupled,ResultSent.unapply) } val allResultsSent = TableQuery[ResultsSentTable] class ExecutionStartsTable(tag:Tag) extends Table[ExecutionStarted](tag,"executionStarts") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") - def timeQuerySent = column[Time]("timeQuerySent") + def timeExecutionStarts = column[Time]("timeExecutionStarts") - def * = (networkQueryId,queryName,timeQuerySent) <> (ExecutionStarted.tupled,ExecutionStarted.unapply) + def * = (networkQueryId,queryName,timeExecutionStarts) <> (ExecutionStarted.tupled,ExecutionStarted.unapply) } val allExecutionStarts = TableQuery[ExecutionStartsTable] class ExecutionCompletesTable(tag:Tag) extends Table[ExecutionCompleted](tag,"executionCompletes") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") - def timeQuerySent = column[Time]("timeQuerySent") + def timeExecutionCompletes = column[Time]("timeExecutionCompletes") - def * = (networkQueryId,queryName,timeQuerySent) <> (ExecutionCompleted.tupled,ExecutionCompleted.unapply) + def * = (networkQueryId,queryName,timeExecutionCompletes) <> (ExecutionCompleted.tupled,ExecutionCompleted.unapply) } val allExecutionCompletes = TableQuery[ExecutionCompletesTable] class QueriesReceivedAuditTable(tag:Tag) extends Table[QueryReceived](tag,"queryReceived") { def shrineNodeId = column[ShrineNodeId]("shrineNodeId") def userName = column[UserName]("userName") def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") def timeQuerySent = column[Time]("timeSent") def queryTopicId = column[Option[QueryTopicId]]("topicId") def timeQueryReceived = column[Time]("timeReceived") def * = (shrineNodeId,userName,networkQueryId,queryName,timeQuerySent,queryTopicId,timeQueryReceived) <> (QueryReceived.tupled,QueryReceived.unapply) } val allQueriesReceived = TableQuery[QueriesReceivedAuditTable] } object AdapterAuditSchema { val allConfig:Config = AdapterConfigSource.config //todo rename adapter2 to adapter val config:Config = allConfig.getConfig("shrine.adapter2.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = AdapterConfigSource.objectForName(slickProfileClassName) val schema = AdapterAuditSchema(slickProfile) } object AdapterAuditDb { val dataSource:DataSource = { val dataSourceFrom = AdapterAuditSchema.config.getString("dataSourceFrom") if(dataSourceFrom == "JNDI") { val jndiDataSourceName = AdapterAuditSchema.config.getString("jndiDataSourceName") val initialContext:InitialContext = new InitialContext() initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] } else if (dataSourceFrom == "testDataSource") { val testDataSourceConfig = AdapterAuditSchema.config.getConfig("testDataSource") val driverClassName = testDataSourceConfig.getString("driverClassName") val url = testDataSourceConfig.getString("url") //Creating an instance of the driver register it. (!) From a previous epoch, but it works. Class.forName(driverClassName).newInstance() object TestDataSource extends DataSource { override def getConnection: Connection = { DriverManager.getConnection(url) } override def getConnection(username: String, password: String): Connection = { DriverManager.getConnection(url, username, password) } //unused methods override def unwrap[T](iface: Class[T]): T = ??? override def isWrapperFor(iface: Class[_]): Boolean = ??? override def setLogWriter(out: PrintWriter): Unit = ??? override def getLoginTimeout: Int = ??? override def setLoginTimeout(seconds: Int): Unit = ??? override def getParentLogger: Logger = ??? override def getLogWriter: PrintWriter = ??? } TestDataSource } else throw new IllegalArgumentException(s"shrine.steward.database.dataSourceFrom must be either JNDI or testDataSource, not $dataSourceFrom") } val db = AdapterAuditDb(AdapterAuditSchema.schema,dataSource) val createTablesOnStart = AdapterAuditSchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) AdapterAuditDb.db.createTables() } case class ResultSent( networkQueryId:NetworkQueryId, queryName:QueryName, timeQueryResponse:Time ) object ResultSent extends (( NetworkQueryId, QueryName, Time ) => ResultSent){ def fromResponse(shrineResponse:ShrineResponse) = { shrineResponse match { case rqr:RunQueryResponse => Some(ResultSent(rqr.queryId, rqr.queryName, System.currentTimeMillis())) case _ => None } } } case class ExecutionStarted( networkQueryId:NetworkQueryId, queryName:QueryName, timeQueryResponse:Time ) object ExecutionStarted extends (( NetworkQueryId, QueryName, Time ) => ExecutionStarted){ def fromRequest(rqr:RunQueryRequest) = { ExecutionStarted(rqr.networkQueryId, rqr.queryDefinition.name, System.currentTimeMillis()) } } case class ExecutionCompleted( networkQueryId:NetworkQueryId, queryName:QueryName, timeQueryResponse:Time ) object ExecutionCompleted extends (( NetworkQueryId, QueryName, Time ) => ExecutionCompleted){ def fromResponse(shrineResponse:ShrineResponse) = { shrineResponse match { case rqr:RunQueryResponse => Some(ExecutionCompleted(rqr.queryId, rqr.queryName, System.currentTimeMillis())) case _ => None } } } case class QueryReceived( shrineNodeId:ShrineNodeId, userName:UserName, networkQueryId:NetworkQueryId, queryName:QueryName, timeQuerySent:Time, queryTopicId:Option[QueryTopicId], timeQueryReceived:Time ) object QueryReceived extends (( ShrineNodeId, UserName, NetworkQueryId, QueryName, Time, Option[QueryTopicId], Time ) => QueryReceived) with Loggable { def fromBroadcastMessage(message:BroadcastMessage):Option[QueryReceived] = { message.request match { case rqr:RunQueryRequest => val timestampAndShrineNodeCn:(Time,ShrineNodeId) = message.signature.fold{ warn(s"No signature on message ${message.requestId}") (-1L,"No Cert For Message")}{signature => val timesamp = signature.timestamp.toGregorianCalendar.getTimeInMillis val shrineNodeId:ShrineNodeId = signature.signingCert.fold("Signing Cert Not Available")(x => KeyStoreCertCollection.extractCommonName(x.toCertificate).getOrElse("Common name not in cert")) (timesamp,shrineNodeId) } Some(QueryReceived(timestampAndShrineNodeCn._2, message.networkAuthn.username, rqr.networkQueryId, rqr.queryDefinition.name, timestampAndShrineNodeCn._1, rqr.topicId, System.currentTimeMillis() )) case _ => None } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/sql/mysql.ddl b/adapter/adapter-service/src/main/sql/mysql.ddl index 60e0abff4..3b5311447 100644 --- a/adapter/adapter-service/src/main/sql/mysql.ddl +++ b/adapter/adapter-service/src/main/sql/mysql.ddl @@ -1,4 +1,4 @@ -create table `resultsSent` (`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeQuerySent` BIGINT NOT NULL); -create table `executionStarts` (`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeQuerySent` BIGINT NOT NULL); -create table `executionCompletes` (`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeQuerySent` BIGINT NOT NULL); -create table `queryReceived` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeSent` BIGINT NOT NULL,`topicId` TEXT,`timeReceived` BIGINT NOT NULL) \ No newline at end of file +create table "resultsSent" ("networkQueryId" BIGINT NOT NULL,"queryName" VARCHAR NOT NULL,"timeResultsSent" BIGINT NOT NULL); +create table "executionStarts" ("networkQueryId" BIGINT NOT NULL,"queryName" VARCHAR NOT NULL,"timeExecutionStarts" BIGINT NOT NULL); +create table "executionCompletes" ("networkQueryId" BIGINT NOT NULL,"queryName" VARCHAR NOT NULL,"timeExecutionCompletes" BIGINT NOT NULL); +create table "queryReceived" ("shrineNodeId" VARCHAR NOT NULL,"userName" VARCHAR NOT NULL,"networkQueryId" BIGINT NOT NULL,"queryName" VARCHAR NOT NULL,"timeSent" BIGINT NOT NULL,"topicId" VARCHAR,"timeReceived" BIGINT NOT NULL) \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala b/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala index 8111345a4..0aa6e4ad8 100644 --- a/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala +++ b/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala @@ -1,227 +1,221 @@ package net.shrine.service import net.shrine.log.Loggable import net.shrine.service.audit.{QepAuditDb, QepQueryAuditData} import net.shrine.service.dao.AuditDao import net.shrine.authentication.Authenticator import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import scala.concurrent.duration.Duration import net.shrine.util.XmlDateHelper import scala.concurrent.Future import scala.concurrent.Await import net.shrine.protocol.RunQueryRequest import net.shrine.authorization.AuthorizationResult.NotAuthorized import net.shrine.protocol.BaseShrineRequest import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.authentication.AuthenticationResult import net.shrine.authentication.NotAuthenticatedException import net.shrine.aggregation.RunQueryAggregator import net.shrine.aggregation.Aggregators import net.shrine.protocol.BaseShrineResponse import net.shrine.aggregation.Aggregator import net.shrine.protocol.ReadQueryInstancesRequest import net.shrine.protocol.QueryInstance import net.shrine.protocol.ReadQueryInstancesResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.aggregation.ReadQueryDefinitionAggregator import net.shrine.protocol.DeleteQueryRequest import net.shrine.aggregation.ReadPreviousQueriesAggregator import net.shrine.aggregation.DeleteQueryAggregator import net.shrine.aggregation.ReadPdoResponseAggregator import net.shrine.aggregation.RenameQueryAggregator import net.shrine.aggregation.ReadInstanceResultsAggregator import net.shrine.protocol.ReadApprovedQueryTopicsRequest import net.shrine.protocol.ReadInstanceResultsRequest import net.shrine.protocol.ReadPreviousQueriesRequest import net.shrine.protocol.RenameQueryRequest import net.shrine.protocol.ReadPdoRequest import net.shrine.protocol.FlagQueryRequest import net.shrine.aggregation.FlagQueryAggregator import net.shrine.protocol.UnFlagQueryRequest import net.shrine.aggregation.UnFlagQueryAggregator import net.shrine.protocol.ReadResultOutputTypesRequest import net.shrine.protocol.ReadResultOutputTypesResponse import net.shrine.protocol.ResultOutputType /** * @author clint * @since Feb 19, 2014 */ //todo rename? This is the heart of the QEP. trait AbstractShrineService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") afterAuthenticating(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast) } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast) } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") val result = doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast) - // tuck the ACT audit data into a database here - if(collectQepAudit) { - val qepQueryAuditData = QepQueryAuditData( - commonName, - request.authn.username, - request.networkQueryId, - request.queryDefinition.name, - request.topicId) + debug(s"collectQepAudit is $collectQepAudit") + + // tuck the ACT audit metrics data into a database here + //todo network id is -1 ! + if (collectQepAudit) QepAuditDb.db.insertQepQuery(request,commonName) - //todo network id is -1 ! - QepAuditDb.db.insertQepQuery(qepQueryAuditData) - } result } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast) } protected def doReadPdo(request: ReadPdoRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadPdoResponseAggregator, shouldBroadcast) } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadInstanceResultsAggregator(request.shrineNetworkQueryId, false), shouldBroadcast) } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadQueryInstances($request)") afterAuthenticating(request) { authResult => val now = XmlDateHelper.now val networkQueryId = request.queryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadPreviousQueriesAggregator, shouldBroadcast) } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast) } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast) } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = afterAuthenticating(request) { _ => info(s"doReadApprovedQueryTopics($request)") //TODO: Is authenticating necessary? //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = { info(s"doBroadcastQuery($request)") //TODO: XXX: HACK: Would like to remove the cast def doSynchronousQuery(networkAuthn: AuthenticationInfo) = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] afterAuthenticating(request) { authResult => debug(s"doBroadcastQuery($request) authResult is $authResult") //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another //When making BroadcastMessages val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) //NB: Only audit RunQueryRequests request match { case runQueryRequest: RunQueryRequest => afterAuditingAndAuthorizing(runQueryRequest) (doSynchronousQuery(networkAuthn)) case _ => doSynchronousQuery(networkAuthn) } } } private[service] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[service] def afterAuditingAndAuthorizing[T](request: RunQueryRequest)(body: => T): T = { auditTransactionally(request) { debug(s"afterAuditingAndAuthorizing($request) with $authorizationService") authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case _ => () } body } } private[service] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } import AuthenticationResult._ private[service] def afterAuthenticating[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val AuthenticationInfo(domain, username, _) = request.authn val authResult = authenticator.authenticate(request.authn) authResult match { case a: Authenticated => f(a) case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason") } } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala b/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala index 43f0ab1dd..fe1d4e449 100644 --- a/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala +++ b/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala @@ -1,163 +1,168 @@ package net.shrine.service.audit import java.io.PrintWriter import java.sql.{DriverManager, Connection, SQLException} import java.util.logging.Logger import javax.naming.InitialContext import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.log.Loggable +import net.shrine.protocol.RunQueryRequest import net.shrine.service.QepConfigSource import net.shrine.audit.{QueryTopicId, Time, QueryName, NetworkQueryId, UserName, ShrineNodeId} import slick.driver.JdbcProfile import scala.concurrent.{Future, Await} import scala.concurrent.duration.{Duration,DurationInt} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking /** * DB code for the QEP audit metrics. * * @author david * @since 8/18/15 */ case class QepAuditDb(schemaDef:QepAuditSchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } + def insertQepQuery(runQueryRequest:RunQueryRequest,commonName:String):Unit = { + insertQepQuery(QepQueryAuditData.fromRunQueryRequest(runQueryRequest,commonName)) + } + def insertQepQuery(qepQueryAuditData: QepQueryAuditData):Unit = { dbRun(allQepQueryQuery += qepQueryAuditData) } def selectAllQepQueries:Seq[QepQueryAuditData] = { dbRun(allQepQueryQuery.result) } } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepAuditSchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables = { allQepQueryQuery.schema } //to get the schema, use the REPL //println(QepAuditSchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueryAuditTable(tag:Tag) extends Table[QepQueryAuditData](tag,"qepQueries") { def shrineNodeId = column[ShrineNodeId]("shrineNodeId") def userName = column[UserName]("userName") def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") def timeQuerySent = column[Time]("timeQuerySent") def queryTopicId = column[Option[QueryTopicId]]("queryTopicId") def * = (shrineNodeId,userName,networkQueryId,queryName,timeQuerySent,queryTopicId) <> (QepQueryAuditData.tupled,QepQueryAuditData.unapply) } val allQepQueryQuery = TableQuery[QepQueryAuditTable] } object QepAuditSchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.qep.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) val schema = QepAuditSchema(slickProfile) } object QepAuditDb { val dataSource:DataSource = { val dataSourceFrom = QepAuditSchema.config.getString("dataSourceFrom") if(dataSourceFrom == "JNDI") { val jndiDataSourceName = QepAuditSchema.config.getString("jndiDataSourceName") val initialContext:InitialContext = new InitialContext() initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] } else if (dataSourceFrom == "testDataSource") { val testDataSourceConfig = QepAuditSchema.config.getConfig("testDataSource") val driverClassName = testDataSourceConfig.getString("driverClassName") val url = testDataSourceConfig.getString("url") //Creating an instance of the driver register it. (!) From a previous epoch, but it works. Class.forName(driverClassName).newInstance() object TestDataSource extends DataSource { override def getConnection: Connection = { DriverManager.getConnection(url) } override def getConnection(username: String, password: String): Connection = { DriverManager.getConnection(url, username, password) } //unused methods override def unwrap[T](iface: Class[T]): T = ??? override def isWrapperFor(iface: Class[_]): Boolean = ??? override def setLogWriter(out: PrintWriter): Unit = ??? override def getLoginTimeout: Int = ??? override def setLoginTimeout(seconds: Int): Unit = ??? override def getParentLogger: Logger = ??? override def getLogWriter: PrintWriter = ??? } TestDataSource } else throw new IllegalArgumentException(s"shrine.steward.database.dataSourceFrom must be either JNDI or testDataSource, not $dataSourceFrom") } val db = QepAuditDb(QepAuditSchema.schema,dataSource) val createTablesOnStart = QepAuditSchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepAuditDb.db.createTables() } diff --git a/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala b/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala index cf90b96d8..3bce60158 100644 --- a/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala +++ b/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala @@ -1,36 +1,45 @@ package net.shrine.service.audit import net.shrine.audit.{QueryTopicId, Time, QueryName, NetworkQueryId, UserName, ShrineNodeId} +import net.shrine.protocol.RunQueryRequest /** * Container for QEP audit data for ACT metrics * * @author david * @since 8/17/15 */ case class QepQueryAuditData(shrineNodeId:ShrineNodeId, userName:UserName, networkQueryId:NetworkQueryId, queryName:QueryName, timeQuerySent:Time, queryTopicId:Option[QueryTopicId]) {} object QepQueryAuditData extends ((ShrineNodeId,UserName,NetworkQueryId,QueryName,Time,Option[QueryTopicId]) => QepQueryAuditData) { -//todo should be able to access the actId from the KeyStore.myCn, if you can find a way to get at it. - def apply( shrineNodeId:String, userName:String, networkQueryId:Long, queryName:String, queryTopicId:Option[String] ):QepQueryAuditData = QepQueryAuditData( shrineNodeId, userName, networkQueryId, queryName, System.currentTimeMillis(), queryTopicId ) + + def fromRunQueryRequest(request:RunQueryRequest,commonName:String):QepQueryAuditData = { + QepQueryAuditData( + commonName, + request.authn.username, + request.networkQueryId, + request.queryDefinition.name, + request.topicId) + } + } \ No newline at end of file