diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index 5d845bfd4..4aa2d7dc0 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,206 +1,236 @@ package net.shrine.qep.queries import java.sql.SQLException import java.util.GregorianCalendar import javax.sql.DataSource import javax.xml.datatype.DatatypeFactory import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.protocol.{QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.TestableDataSourceCreator import slick.driver.JdbcProfile import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(allQepQueryQuery.result) } def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain) + val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) + val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) - ReadPreviousQueriesResponse(previousQueries.map(_.toQueryMaster)) + ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = { dbRun(allQepQueryQuery.filter(_.userName === userName).filter(_.userDomain === domain).result) } + + def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { + debug(s"insertQepQueryFlag $qepQueryFlag") + dbRun(allQepQueryFlags += qepQueryFlag) + } + + def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { + val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) + + flags.map(x => x.networkQueryId -> x).toMap + } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val db = QepQueryDb(QepQuerySchema.schema,dataSource) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ - def ddlForAllTables = { - allQepQueryQuery.schema + def ddlForAllTables: jdbcProfile.DDL = { + allQepQueryQuery.schema ++ allQepQueryFlags.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } /** mysql> describe SHRINE_QUERY; +------------------+--------------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------------+--------------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | local_id | varchar(255) | NO | MUL | NULL | | | network_id | bigint(20) | NO | MUL | NULL | | | username | varchar(255) | NO | MUL | NULL | | | domain | varchar(255) | NO | | NULL | | | query_name | varchar(255) | NO | | NULL | | | query_expression | text | YES | | NULL | | | date_created | timestamp | NO | | CURRENT_TIMESTAMP | | | has_been_run | tinyint(1) | NO | | 0 | | | flagged | tinyint(1) | NO | | 0 | | | flag_message | varchar(255) | YES | | NULL | | | query_xml | text | YES | | NULL | | +------------------+--------------+------+-----+-------------------+----------------+ */ class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[String]("expression") def dateCreated = column[Time]("dateCreated") def hasBeenRun = column[Boolean]("hasBeenRun") - def flagged = column[Boolean]("flagged") - def flagMessage = column[String]("flagMessage") def queryXml = column[String]("queryXml") - def * = (networkId,userName,userDomain,queryName,expression,dateCreated,hasBeenRun,flagged,flagMessage,queryXml) <> (QepQuery.tupled,QepQuery.unapply) + def * = (networkId,userName,userDomain,queryName,expression,dateCreated,hasBeenRun,queryXml) <> (QepQuery.tupled,QepQuery.unapply) } + val allQepQueryQuery = TableQuery[QepQueries] + + class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { + def networkId = column[NetworkQueryId]("networkId") + def flagged = column[Boolean]("flagged") + def flagMessage = column[String]("flagMessage") + def changeDate = column[Long]("changeDate") + + def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) + } + + val allQepQueryFlags = TableQuery[QepQueryFlags] + val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( + queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists + ) yield queryFlags } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) val schema = QepQuerySchema(slickProfile) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: String, dateCreated: Time, hasBeenRun: Boolean, - flagged: Boolean, - flagMessage: String, queryXml:String ){ - def toQueryMaster:QueryMaster = { + def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { val gregorianCalendar = new GregorianCalendar() gregorianCalendar.setTimeInMillis(dateCreated) val xmlGregorianCalendar = DatatypeFactory.newInstance().newXMLGregorianCalendar(gregorianCalendar) QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = xmlGregorianCalendar, held = None, //todo if a query is held at the adapter, how will we know? do we care? - flagged = Some(flagged), //todo flagged is boolean, this is tri-state. When is it None vs false - flagMessage = Some(flagMessage) //todo this always has a flaggedMessage (empty). When should it be None? + flagged = qepQueryFlag.map(_.flagged), + flagMessage = qepQueryFlag.map(_.flagMessage) ) } } -object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,Boolean,String,String) => QepQuery) { +object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,String) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString, dateCreated = System.currentTimeMillis(), hasBeenRun = false, //todo ?? - flagged = false, //todo flagged?? - flagMessage = "", //todo flagMessage queryXml = runQueryRequest.toXmlString ) } } + +case class QepQueryFlag( + networkQueryId: NetworkQueryId, + flagged:Boolean, + flagMessage:String, + changeDate:Long + ) + diff --git a/qep/service/src/main/sql/mysql.ddl b/qep/service/src/main/sql/mysql.ddl index ae7932a8d..2fc4fff3c 100644 --- a/qep/service/src/main/sql/mysql.ddl +++ b/qep/service/src/main/sql/mysql.ddl @@ -1,2 +1,3 @@ create table `queriesSent` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`queryTopicId` TEXT,`queryTopicName` TEXT,`timeQuerySent` BIGINT NOT NULL); create table `previousQueries` (`networkId` BIGINT NOT NULL,`userName` TEXT NOT NULL,`domain` TEXT NOT NULL,`queryName` TEXT NOT NULL,`expression` TEXT NOT NULL,`dateCreated` BIGINT NOT NULL,`hasBeenRun` BOOLEAN NOT NULL,`flagged` BOOLEAN NOT NULL,`flagMessage` TEXT NOT NULL,`queryXml` TEXT NOT NULL); +create table `queryFlags` (`networkId` BIGINT NOT NULL,`flagged` BOOLEAN NOT NULL,`flagMessage` TEXT NOT NULL,`changeDate` BIGINT NOT NULL); \ No newline at end of file diff --git a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala index 639e69c2b..5669dc292 100644 --- a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala +++ b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala @@ -1,71 +1,64 @@ package net.shrine.qep.queries -import net.shrine.qep.QepConfigSource -import net.shrine.qep.QepConfigSource -import net.shrine.util.ShouldMatchersForJUnit import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author david * @since 1/20/16 */ class QepQueryDbTest extends ShouldMatchersForJUnit {// with TestWithDatabase { val qepQuery = QepQuery( networkId = 1L, userName = "ben", userDomain = "testDomain", queryName = "testQuery", expression = "testExpression", dateCreated = System.currentTimeMillis(), hasBeenRun = false, - flagged = false, - flagMessage = "", queryXml = "testXML" ) val secondQepQuery = QepQuery( networkId = 2L, userName = "dave", userDomain = "testDomain", queryName = "testQuery", expression = "testExpression", dateCreated = System.currentTimeMillis(), hasBeenRun = false, - flagged = false, - flagMessage = "", queryXml = "testXML" ) @Test def testInsert() { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepQuery(secondQepQuery) val results = QepQueryDb.db.selectAllQepQueries results should equal(Seq(qepQuery,secondQepQuery)) } @Test def testSelectForUser() { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepQuery(secondQepQuery) val results = QepQueryDb.db.selectPreviousQueriesByUserAndDomain("ben","testDomain") results should equal(Seq(qepQuery)) } @Before def beforeEach() = { QepQueryDb.db.createTables() } @After def afterEach() = { QepQueryDb.db.dropTables() } }