diff --git a/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala b/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala index 30454f4ef..c70cbece4 100644 --- a/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala +++ b/qep/service/src/main/scala/net/shrine/service/AbstractShrineService.scala @@ -1,230 +1,212 @@ package net.shrine.service import net.shrine.log.Loggable import net.shrine.service.audit.QepAuditDb import net.shrine.service.dao.AuditDao import net.shrine.authentication.Authenticator import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.service.queries.QepQueryDb import scala.concurrent.duration.Duration import net.shrine.util.XmlDateHelper import scala.concurrent.Future import scala.concurrent.Await -import net.shrine.protocol.RunQueryRequest +import net.shrine.protocol.{ReadPreviousQueriesResponse, RunQueryRequest, BaseShrineRequest, AuthenticationInfo, Credential, BaseShrineResponse, ReadQueryInstancesRequest, QueryInstance, ReadQueryInstancesResponse, ReadQueryDefinitionRequest, DeleteQueryRequest, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, RenameQueryRequest, ReadPdoRequest, FlagQueryRequest, UnFlagQueryRequest, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, ResultOutputType} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} -import net.shrine.protocol.BaseShrineRequest -import net.shrine.protocol.AuthenticationInfo -import net.shrine.protocol.Credential import net.shrine.authentication.AuthenticationResult import net.shrine.authentication.NotAuthenticatedException import net.shrine.aggregation.RunQueryAggregator import net.shrine.aggregation.Aggregators -import net.shrine.protocol.BaseShrineResponse import net.shrine.aggregation.Aggregator -import net.shrine.protocol.ReadQueryInstancesRequest -import net.shrine.protocol.QueryInstance -import net.shrine.protocol.ReadQueryInstancesResponse -import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.aggregation.ReadQueryDefinitionAggregator -import net.shrine.protocol.DeleteQueryRequest -import net.shrine.aggregation.ReadPreviousQueriesAggregator import net.shrine.aggregation.DeleteQueryAggregator import net.shrine.aggregation.ReadPdoResponseAggregator import net.shrine.aggregation.RenameQueryAggregator import net.shrine.aggregation.ReadInstanceResultsAggregator -import net.shrine.protocol.ReadApprovedQueryTopicsRequest -import net.shrine.protocol.ReadInstanceResultsRequest -import net.shrine.protocol.ReadPreviousQueriesRequest -import net.shrine.protocol.RenameQueryRequest -import net.shrine.protocol.ReadPdoRequest -import net.shrine.protocol.FlagQueryRequest import net.shrine.aggregation.FlagQueryAggregator -import net.shrine.protocol.UnFlagQueryRequest import net.shrine.aggregation.UnFlagQueryAggregator -import net.shrine.protocol.ReadResultOutputTypesRequest -import net.shrine.protocol.ReadResultOutputTypesResponse -import net.shrine.protocol.ResultOutputType /** * @author clint * @since Feb 19, 2014 */ //todo rename to QEP? This is the heart of the QEP. trait AbstractShrineService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") authenticateAndThen(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast) } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast) } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") //store the query in the qep's database doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast) } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast) } protected def doReadPdo(request: ReadPdoRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadPdoResponseAggregator, shouldBroadcast) } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new ReadInstanceResultsAggregator(request.shrineNetworkQueryId, false), shouldBroadcast) } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadQueryInstances($request)") authenticateAndThen(request) { authResult => val now = XmlDateHelper.now val networkQueryId = request.queryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } - protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): BaseResp = { - //todo instead pull results from the local database. + protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = { + //pull results from the local database. - doBroadcastQuery(request, new ReadPreviousQueriesAggregator, shouldBroadcast) + //todo delete at end of local/local effort + // doBroadcastQuery(request, new ReadPreviousQueriesAggregator, shouldBroadcast).asInstanceOf[ReadPreviousQueriesResponse] + QepQueryDb.db.selectPreviousQueries(request) } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast) } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast) } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ => info(s"doReadApprovedQueryTopics($request)") //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => debug(s"doBroadcastQuery($request) authResult is $authResult") //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another //When making BroadcastMessages val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) //NB: Only audit RunQueryRequests request match { case runQueryRequest: RunQueryRequest => // inject modified, authorized runQueryRequest auditAuthorizeAndThen(runQueryRequest) { authorizedRequest => debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") // tuck the ACT audit metrics data into a database here if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) QepQueryDb.db.insertQepQuery(authorizedRequest) doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast) } case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) } } } private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = { info(s"doSynchronousQuery($request) started") val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] info(s"doSynchronousQuery($request) completed with response $response") response } private[service] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[service] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = { auditTransactionally(request) { debug(s"auditAuthorizeAndThen($request) with $authorizationService") val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case authorized: Authorized => { request.copy(topicName = authorized.topicIdAndName.map(x => x._2)) } } body(authorizedRequest) } } private[service] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } import AuthenticationResult._ private[service] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val AuthenticationInfo(domain, username, _) = request.authn val authResult = authenticator.authenticate(request.authn) authResult match { case a: Authenticated => f(a) case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason") } } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/service/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/service/queries/QepQueryDb.scala index 40558d8eb..a07c46f5c 100644 --- a/qep/service/src/main/scala/net/shrine/service/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/service/queries/QepQueryDb.scala @@ -1,223 +1,253 @@ package net.shrine.service.queries import java.io.PrintWriter import java.sql.{DriverManager, Connection, SQLException} +import java.util.GregorianCalendar import java.util.logging.Logger import javax.naming.InitialContext import javax.sql.DataSource +import javax.xml.datatype.{DatatypeFactory} import com.typesafe.config.Config import net.shrine.log.Loggable -import net.shrine.protocol.RunQueryRequest +import net.shrine.protocol.{QueryMaster, ReadPreviousQueriesResponse, ReadPreviousQueriesRequest, RunQueryRequest} import net.shrine.audit.{Time, QueryName, NetworkQueryId, UserName} import net.shrine.service.QepConfigSource import slick.driver.JdbcProfile import scala.concurrent.{Future, Await} import scala.concurrent.duration.{Duration,DurationInt} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(allQepQueryQuery.result) } + def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { + val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain) + + ReadPreviousQueriesResponse(previousQueries.map(_.toQueryMaster)) + } + + def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = { + dbRun(allQepQueryQuery.filter(_.userName === userName).filter(_.userDomain === domain).result) + } } object QepQueryDb extends Loggable { val dataSource:DataSource = { val dataSourceFrom = QepQuerySchema.config.getString("dataSourceFrom") if(dataSourceFrom == "JNDI") { val jndiDataSourceName = QepQuerySchema.config.getString("jndiDataSourceName") val initialContext:InitialContext = new InitialContext() initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] } else if (dataSourceFrom == "testDataSource") { val testDataSourceConfig = QepQuerySchema.config.getConfig("testDataSource") val driverClassName = testDataSourceConfig.getString("driverClassName") val url = testDataSourceConfig.getString("url") //Creating an instance of the driver register it. (!) From a previous epoch, but it works. Class.forName(driverClassName).newInstance() object TestDataSource extends DataSource { override def getConnection: Connection = { DriverManager.getConnection(url) } override def getConnection(username: String, password: String): Connection = { DriverManager.getConnection(url, username, password) } //unused methods override def unwrap[T](iface: Class[T]): T = ??? override def isWrapperFor(iface: Class[_]): Boolean = ??? override def setLogWriter(out: PrintWriter): Unit = ??? override def getLoginTimeout: Int = ??? override def setLoginTimeout(seconds: Int): Unit = ??? override def getParentLogger: Logger = ??? override def getLogWriter: PrintWriter = ??? } TestDataSource } else throw new IllegalArgumentException(s"shrine.steward.database.dataSourceFrom must be either JNDI or testDataSource, not $dataSourceFrom") } val db = QepQueryDb(QepQuerySchema.schema,dataSource) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables = { allQepQueryQuery.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } /** mysql> describe SHRINE_QUERY; +------------------+--------------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------------+--------------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | local_id | varchar(255) | NO | MUL | NULL | | | network_id | bigint(20) | NO | MUL | NULL | | | username | varchar(255) | NO | MUL | NULL | | | domain | varchar(255) | NO | | NULL | | | query_name | varchar(255) | NO | | NULL | | | query_expression | text | YES | | NULL | | | date_created | timestamp | NO | | CURRENT_TIMESTAMP | | | has_been_run | tinyint(1) | NO | | 0 | | | flagged | tinyint(1) | NO | | 0 | | | flag_message | varchar(255) | YES | | NULL | | | query_xml | text | YES | | NULL | | +------------------+--------------+------+-----+-------------------+----------------+ */ class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[String]("expression") def dateCreated = column[Time]("dateCreated") def hasBeenRun = column[Boolean]("hasBeenRun") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def queryXml = column[String]("queryXml") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,hasBeenRun,flagged,flagMessage,queryXml) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) val schema = QepQuerySchema(slickProfile) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: String, dateCreated: Time, hasBeenRun: Boolean, flagged: Boolean, flagMessage: String, queryXml:String - ) + ){ + + def toQueryMaster:QueryMaster = { + + val gregorianCalendar = new GregorianCalendar() + gregorianCalendar.setTimeInMillis(dateCreated) + val xmlGregorianCalendar = DatatypeFactory.newInstance().newXMLGregorianCalendar(gregorianCalendar) + QueryMaster( + queryMasterId = "?", //todo what is this from? + networkQueryId = networkId, + name = queryName, + userId = userName, + groupId = userDomain, + createDate = xmlGregorianCalendar, + held = None, //todo if a query is held at the adapter, how will we know? do we care? + flagged = Some(flagged), //todo flagged is boolean, this is tri-state. When is it None vs false + flagMessage = Some(flagMessage) //todo this always has a flaggedMessage (empty). When should it be None? + ) + } +} object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,Boolean,String,String) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( runQueryRequest.networkQueryId, runQueryRequest.authn.username, runQueryRequest.authn.domain, runQueryRequest.queryDefinition.name, runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString, System.currentTimeMillis(), false, false, //todo flagged?? "", //todo flagMessage runQueryRequest.toXmlString ) } }