diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala index 234c5c11c..51bea5458 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/audit/AdapterAuditDb.scala @@ -1,367 +1,366 @@ package net.shrine.adapter.audit import java.io.PrintWriter import java.sql.{DriverManager, Connection, SQLException} import java.util.logging.Logger import javax.naming.InitialContext import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.adapter.service.AdapterConfigSource import net.shrine.crypto.KeyStoreCertCollection import net.shrine.log.Loggable import net.shrine.audit.{QueryTopicName, QueryTopicId, Time, QueryName, NetworkQueryId, UserName, ShrineNodeId} import net.shrine.protocol.{BroadcastMessage, RunQueryRequest, RunQueryResponse, ShrineResponse} import slick.driver.JdbcProfile import scala.concurrent.{Future, Await} import scala.concurrent.duration.{Duration,DurationInt} -import scala.language.postfixOps - import scala.concurrent.ExecutionContext.Implicits.global - import scala.concurrent.blocking +import scala.language.postfixOps /** * DB code for the Adapter audit metrics. * * @author david * @since 8/25/15 */ case class AdapterAuditDb(schemaDef:AdapterAuditSchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQueryReceived(broadcastMessage: BroadcastMessage):Unit = { debug(s"insertQueryReceived $broadcastMessage") QueryReceived.fromBroadcastMessage(broadcastMessage).foreach(insertQueryReceived) } def insertQueryReceived(queryReceived:QueryReceived):Unit = { dbRun(allQueriesReceived += queryReceived) } def selectAllQueriesReceived:Seq[QueryReceived] = { dbRun(allQueriesReceived.result) } def insertExecutionStarted(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertExecutionStarted $runQueryRequest") insertExecutionStarted(ExecutionStarted.fromRequest(runQueryRequest)) } def insertExecutionStarted(executionStart:ExecutionStarted):Unit = { - dbRun(allExecutionStarts += executionStart) + dbRun(allExecutionsStarted += executionStart) } def selectAllExecutionStarts:Seq[ExecutionStarted] = { - dbRun(allExecutionStarts.result) + dbRun(allExecutionsStarted.result) } def insertExecutionCompletedShrineResponse(request: RunQueryRequest,shrineResponse: ShrineResponse) = { debug(s"insertExecutionCompleted $shrineResponse for $request") ExecutionCompleted.fromRequestResponse(request,shrineResponse).foreach(insertExecutionCompleted) } def insertExecutionCompleted(executionCompleted:ExecutionCompleted):Unit = { - dbRun(allExecutionCompletes += executionCompleted) + dbRun(allExecutionsCompleted += executionCompleted) } def selectAllExecutionCompletes:Seq[ExecutionCompleted] = { - dbRun(allExecutionCompletes.result) + dbRun(allExecutionsCompleted.result) } def insertResultSent(networkQueryId: NetworkQueryId,shrineResponse:ShrineResponse):Unit = { debug(s"insertResultSent $shrineResponse for $networkQueryId") ResultSent.fromResponse(networkQueryId,shrineResponse).foreach(insertResultSent) } def insertResultSent(resultSent: ResultSent):Unit = { dbRun(allResultsSent += resultSent) } def selectAllResultsSent:Seq[ResultSent] = { dbRun(allResultsSent.result) } } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class AdapterAuditSchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables = { allQueriesReceived.schema ++ - allExecutionStarts.schema ++ - allExecutionCompletes.schema ++ + allExecutionsStarted.schema ++ + allExecutionsCompleted.schema ++ allResultsSent.schema } //to get the schema, use the REPL //println(AdapterAuditSchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } - class QueriesReceivedAuditTable(tag:Tag) extends Table[QueryReceived](tag,"queryReceived") { + class QueriesReceivedAuditTable(tag:Tag) extends Table[QueryReceived](tag,"queriesReceived") { def shrineNodeId = column[ShrineNodeId]("shrineNodeId") def userName = column[UserName]("userName") def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") - def timeQuerySent = column[Time]("timeSent") def queryTopicId = column[Option[QueryTopicId]]("topicId") def queryTopicName = column[Option[QueryTopicName]]("topicName") + def timeQuerySent = column[Time]("timeSent") def timeQueryReceived = column[Time]("timeReceived") - def * = (shrineNodeId,userName,networkQueryId,queryName,timeQuerySent,queryTopicId,queryTopicName,timeQueryReceived) <> (QueryReceived.tupled,QueryReceived.unapply) + def * = (shrineNodeId,userName,networkQueryId,queryName,queryTopicId,queryTopicName,timeQuerySent,timeQueryReceived) <> (QueryReceived.tupled,QueryReceived.unapply) } val allQueriesReceived = TableQuery[QueriesReceivedAuditTable] - class ExecutionStartsTable(tag:Tag) extends Table[ExecutionStarted](tag,"executionStarts") { + class ExecutionsStartedTable(tag:Tag) extends Table[ExecutionStarted](tag,"executionsStarted") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") - def timeExecutionStarts = column[Time]("timeExecutionStarts") + def timeExecutionStarts = column[Time]("timeExecutionStarted") def * = (networkQueryId,queryName,timeExecutionStarts) <> (ExecutionStarted.tupled,ExecutionStarted.unapply) } - val allExecutionStarts = TableQuery[ExecutionStartsTable] + val allExecutionsStarted = TableQuery[ExecutionsStartedTable] - class ExecutionCompletesTable(tag:Tag) extends Table[ExecutionCompleted](tag,"executionCompletes") { + class ExecutionsCompletedTable(tag:Tag) extends Table[ExecutionCompleted](tag,"executionsCompleted") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def replyId = column[Long]("replyId") def queryName = column[QueryName]("queryName") - def timeExecutionCompletes = column[Time]("timeExecutionCompletes") + def timeExecutionCompletes = column[Time]("timeExecutionCompleted") def * = (networkQueryId,replyId,queryName,timeExecutionCompletes) <> (ExecutionCompleted.tupled,ExecutionCompleted.unapply) } - val allExecutionCompletes = TableQuery[ExecutionCompletesTable] + val allExecutionsCompleted = TableQuery[ExecutionsCompletedTable] class ResultsSentTable(tag:Tag) extends Table[ResultSent](tag,"resultsSent") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def replyId = column[Long]("replyId") def queryName = column[QueryName]("queryName") def timeResultsSent = column[Time]("timeResultsSent") def * = (networkQueryId,replyId,queryName,timeResultsSent) <> (ResultSent.tupled,ResultSent.unapply) } val allResultsSent = TableQuery[ResultsSentTable] } object AdapterAuditSchema { val allConfig:Config = AdapterConfigSource.config //todo rename adapter2 to adapter val config:Config = allConfig.getConfig("shrine.adapter2.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = AdapterConfigSource.objectForName(slickProfileClassName) val schema = AdapterAuditSchema(slickProfile) } object AdapterAuditDb { val dataSource:DataSource = { val dataSourceFrom = AdapterAuditSchema.config.getString("dataSourceFrom") if(dataSourceFrom == "JNDI") { val jndiDataSourceName = AdapterAuditSchema.config.getString("jndiDataSourceName") val initialContext:InitialContext = new InitialContext() initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] } else if (dataSourceFrom == "testDataSource") { val testDataSourceConfig = AdapterAuditSchema.config.getConfig("testDataSource") val driverClassName = testDataSourceConfig.getString("driverClassName") val url = testDataSourceConfig.getString("url") //Creating an instance of the driver register it. (!) From a previous epoch, but it works. Class.forName(driverClassName).newInstance() object TestDataSource extends DataSource { override def getConnection: Connection = { DriverManager.getConnection(url) } override def getConnection(username: String, password: String): Connection = { DriverManager.getConnection(url, username, password) } //unused methods override def unwrap[T](iface: Class[T]): T = ??? override def isWrapperFor(iface: Class[_]): Boolean = ??? override def setLogWriter(out: PrintWriter): Unit = ??? override def getLoginTimeout: Int = ??? override def setLoginTimeout(seconds: Int): Unit = ??? override def getParentLogger: Logger = ??? override def getLogWriter: PrintWriter = ??? } TestDataSource } else throw new IllegalArgumentException(s"shrine.steward.database.dataSourceFrom must be either JNDI or testDataSource, not $dataSourceFrom") } val db = AdapterAuditDb(AdapterAuditSchema.schema,dataSource) val createTablesOnStart = AdapterAuditSchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) AdapterAuditDb.db.createTables() } -case class ResultSent( - networkQueryId:NetworkQueryId, - responseId:Long, - queryName:QueryName, - timeQueryResponse:Time - ) +case class QueryReceived( + shrineNodeId:ShrineNodeId, + userName:UserName, + networkQueryId:NetworkQueryId, + queryName:QueryName, + queryTopicId:Option[QueryTopicId], + queryTopicName:Option[QueryTopicName], + timeQuerySent:Time, + timeQueryReceived:Time + ) -object ResultSent extends (( +object QueryReceived extends (( + ShrineNodeId, + UserName, NetworkQueryId, - Long, QueryName, + Option[QueryTopicId], + Option[QueryTopicName], + Time, Time - ) => ResultSent){ - def fromResponse(networkQueryId:NetworkQueryId,shrineResponse:ShrineResponse) = { + ) => QueryReceived) with Loggable { + + def fromBroadcastMessage(message:BroadcastMessage):Option[QueryReceived] = { + message.request match { + case rqr:RunQueryRequest => + + val timestampAndShrineNodeCn:(Time,ShrineNodeId) = message.signature.fold{ + warn(s"No signature on message ${message.requestId}") + (-1L,"No Cert For Message")}{signature => + val timesamp = signature.timestamp.toGregorianCalendar.getTimeInMillis + val shrineNodeId:ShrineNodeId = signature.signingCert.fold("Signing Cert Not Available")(x => KeyStoreCertCollection.extractCommonName(x.toCertificate).getOrElse("Common name not in cert")) + (timesamp,shrineNodeId) + } + + Some(QueryReceived(timestampAndShrineNodeCn._2, + message.networkAuthn.username, + rqr.networkQueryId, + rqr.queryDefinition.name, + rqr.topicId, + rqr.topicName, + timestampAndShrineNodeCn._1, + System.currentTimeMillis() + )) - shrineResponse match { - case rqr:RunQueryResponse => Some(ResultSent( - networkQueryId, - rqr.queryId, - rqr.queryName, - System.currentTimeMillis())) case _ => None } } } case class ExecutionStarted( networkQueryId:NetworkQueryId, queryName:QueryName, - timeQueryResponse:Time + timeExecutionStarted:Time ) object ExecutionStarted extends (( NetworkQueryId, QueryName, Time ) => ExecutionStarted){ def fromRequest(rqr:RunQueryRequest) = { ExecutionStarted(rqr.networkQueryId, rqr.queryDefinition.name, System.currentTimeMillis()) } } case class ExecutionCompleted( networkQueryId:NetworkQueryId, replyId:Long, queryName:QueryName, - timeQueryResponse:Time + timeExecutionCompleted:Time ) object ExecutionCompleted extends (( NetworkQueryId, Long, QueryName, Time ) => ExecutionCompleted){ def fromRequestResponse(request: RunQueryRequest,shrineResponse:ShrineResponse) = { shrineResponse match { case rqr:RunQueryResponse => Some(ExecutionCompleted( request.networkQueryId, rqr.queryId, rqr.queryName, System.currentTimeMillis())) case _ => None } } } -case class QueryReceived( - shrineNodeId:ShrineNodeId, - userName:UserName, - networkQueryId:NetworkQueryId, - queryName:QueryName, - timeQuerySent:Time, - queryTopicId:Option[QueryTopicId], - queryTopicName:Option[QueryTopicName], - timeQueryReceived:Time - ) +case class ResultSent( + networkQueryId:NetworkQueryId, + responseId:Long, + queryName:QueryName, + timeResultSent:Time + ) -object QueryReceived extends (( - ShrineNodeId, - UserName, - NetworkQueryId, +object ResultSent extends (( + NetworkQueryId, + Long, QueryName, - Time, - Option[QueryTopicId], - Option[QueryTopicName], Time - ) => QueryReceived) with Loggable { - - def fromBroadcastMessage(message:BroadcastMessage):Option[QueryReceived] = { - message.request match { - case rqr:RunQueryRequest => - - val timestampAndShrineNodeCn:(Time,ShrineNodeId) = message.signature.fold{ - warn(s"No signature on message ${message.requestId}") - (-1L,"No Cert For Message")}{signature => - val timesamp = signature.timestamp.toGregorianCalendar.getTimeInMillis - val shrineNodeId:ShrineNodeId = signature.signingCert.fold("Signing Cert Not Available")(x => KeyStoreCertCollection.extractCommonName(x.toCertificate).getOrElse("Common name not in cert")) - (timesamp,shrineNodeId) - } - - Some(QueryReceived(timestampAndShrineNodeCn._2, - message.networkAuthn.username, - rqr.networkQueryId, - rqr.queryDefinition.name, - timestampAndShrineNodeCn._1, - rqr.topicId, - rqr.topicName, - System.currentTimeMillis() - )) + ) => ResultSent){ + def fromResponse(networkQueryId:NetworkQueryId,shrineResponse:ShrineResponse) = { + shrineResponse match { + case rqr:RunQueryResponse => Some(ResultSent( + networkQueryId, + rqr.queryId, + rqr.queryName, + System.currentTimeMillis())) case _ => None } } -} \ No newline at end of file +} + diff --git a/adapter/adapter-service/src/main/sql/mysql.ddl b/adapter/adapter-service/src/main/sql/mysql.ddl index b5981b50c..868451be8 100644 --- a/adapter/adapter-service/src/main/sql/mysql.ddl +++ b/adapter/adapter-service/src/main/sql/mysql.ddl @@ -1,4 +1,4 @@ -create table `resultsSent` (`networkQueryId` BIGINT NOT NULL,`replyId` BIGINT NOT NULL, `queryName` TEXT NOT NULL,`timeResultsSent` BIGINT NOT NULL); -create table `executionStarts` (`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeExecutionStarts` BIGINT NOT NULL); -create table `executionCompletes` (`networkQueryId` BIGINT NOT NULL,`replyId` BIGINT NOT NULL, `queryName` TEXT NOT NULL,`timeExecutionCompletes` BIGINT NOT NULL); -create table `queryReceived` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeSent` BIGINT NOT NULL,`topicId` TEXT,`topicName` TEXT,`timeReceived` BIGINT NOT NULL) +create table `queriesReceived` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`topicId` TEXT,`topicName` TEXT,`timeSent` BIGINT NOT NULL,`timeReceived` BIGINT NOT NULL); +create table `executionsStarted` (`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeExecutionStarted` BIGINT NOT NULL); +create table `executionsCompleted` (`networkQueryId` BIGINT NOT NULL,`replyId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeExecutionCompleted` BIGINT NOT NULL); +create table `resultsSent` (`networkQueryId` BIGINT NOT NULL,`replyId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeResultsSent` BIGINT NOT NULL); \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/audit/AdapterAuditDbTest.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/audit/AdapterAuditDbTest.scala index 5cda47b38..e38150755 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/audit/AdapterAuditDbTest.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/audit/AdapterAuditDbTest.scala @@ -1,76 +1,76 @@ package net.shrine.adapter.audit import net.shrine.adapter.service.AdapterConfigSource import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author david * @since 8/25/15 */ class AdapterAuditDbTest extends ShouldMatchersForJUnit {// with TestWithDatabase { val resultsSent = ResultSent(-1,-1,"ben's query",System.currentTimeMillis()) val executionStarted = ExecutionStarted(-1,"ben's query",System.currentTimeMillis()) val executionCompleted = ExecutionCompleted(-1,-1,"ben's query",System.currentTimeMillis()) - val queryReceived = QueryReceived("example.com","ben",-1,"ben's query",System.currentTimeMillis(),Some("1"),Some("test topic"),System.currentTimeMillis()) + val queryReceived = QueryReceived("example.com","ben",-1,"ben's query",Some("1"),Some("test topic"),System.currentTimeMillis(),System.currentTimeMillis()) @Test def testInsertResultsSent() { AdapterConfigSource.configForBlock("shrine.adapter2.audit.useQepAudit","true",this.getClass.getSimpleName){ AdapterAuditDb.db.insertResultSent(resultsSent) val results = AdapterAuditDb.db.selectAllResultsSent results should equal(Seq(resultsSent)) } } @Test def testInsertExecutionStarted() { AdapterConfigSource.configForBlock("shrine.adapter2.audit.useQepAudit","true",this.getClass.getSimpleName){ AdapterAuditDb.db.insertExecutionStarted(executionStarted) val results = AdapterAuditDb.db.selectAllExecutionStarts results should equal(Seq(executionStarted)) } } @Test def testInsertExecutionCompleted() { AdapterConfigSource.configForBlock("shrine.adapter2.audit.useQepAudit","true",this.getClass.getSimpleName){ AdapterAuditDb.db.insertExecutionCompleted(executionCompleted) val results = AdapterAuditDb.db.selectAllExecutionCompletes results should equal(Seq(executionCompleted)) } } @Test def testInsertQueryReceived() { AdapterConfigSource.configForBlock("shrine.adapter2.audit.useQepAudit","true",this.getClass.getSimpleName){ AdapterAuditDb.db.insertQueryReceived(queryReceived) val results = AdapterAuditDb.db.selectAllQueriesReceived results should equal(Seq(queryReceived)) } } @Before def beforeEach() = { AdapterAuditDb.db.createTables() } @After def afterEach() = { AdapterAuditDb.db.dropTables() } } diff --git a/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala b/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala index ad8c76886..fa9b9b75a 100644 --- a/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala +++ b/qep/service/src/main/scala/net/shrine/service/audit/QepAuditDb.scala @@ -1,171 +1,227 @@ package net.shrine.service.audit import java.io.PrintWriter import java.sql.{DriverManager, Connection, SQLException} import java.util.logging.Logger import javax.naming.InitialContext import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.log.Loggable import net.shrine.protocol.RunQueryRequest import net.shrine.service.QepConfigSource import net.shrine.audit.{QueryTopicName, QueryTopicId, Time, QueryName, NetworkQueryId, UserName, ShrineNodeId} import slick.driver.JdbcProfile import scala.concurrent.{Future, Await} import scala.concurrent.duration.{Duration,DurationInt} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.blocking /** * DB code for the QEP audit metrics. * * @author david * @since 8/18/15 */ case class QepAuditDb(schemaDef:QepAuditSchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest:RunQueryRequest,commonName:String):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQueryAuditData.fromRunQueryRequest(runQueryRequest,commonName)) } def insertQepQuery(qepQueryAuditData: QepQueryAuditData):Unit = { dbRun(allQepQueryQuery += qepQueryAuditData) } def selectAllQepQueries:Seq[QepQueryAuditData] = { dbRun(allQepQueryQuery.result) } } +object QepAuditDb { + + val dataSource:DataSource = { + + val dataSourceFrom = QepAuditSchema.config.getString("dataSourceFrom") + if(dataSourceFrom == "JNDI") { + val jndiDataSourceName = QepAuditSchema.config.getString("jndiDataSourceName") + val initialContext:InitialContext = new InitialContext() + + initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] + + } + else if (dataSourceFrom == "testDataSource") { + + val testDataSourceConfig = QepAuditSchema.config.getConfig("testDataSource") + val driverClassName = testDataSourceConfig.getString("driverClassName") + val url = testDataSourceConfig.getString("url") + + //Creating an instance of the driver register it. (!) From a previous epoch, but it works. + Class.forName(driverClassName).newInstance() + + object TestDataSource extends DataSource { + override def getConnection: Connection = { + DriverManager.getConnection(url) + } + + override def getConnection(username: String, password: String): Connection = { + DriverManager.getConnection(url, username, password) + } + + //unused methods + override def unwrap[T](iface: Class[T]): T = ??? + override def isWrapperFor(iface: Class[_]): Boolean = ??? + override def setLogWriter(out: PrintWriter): Unit = ??? + override def getLoginTimeout: Int = ??? + override def setLoginTimeout(seconds: Int): Unit = ??? + override def getParentLogger: Logger = ??? + override def getLogWriter: PrintWriter = ??? + } + + TestDataSource + } + else throw new IllegalArgumentException(s"shrine.steward.database.dataSourceFrom must be either JNDI or testDataSource, not $dataSourceFrom") + } + + val db = QepAuditDb(QepAuditSchema.schema,dataSource) + + val createTablesOnStart = QepAuditSchema.config.getBoolean("createTablesOnStart") + if(createTablesOnStart) QepAuditDb.db.createTables() + +} + /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepAuditSchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables = { allQepQueryQuery.schema } //to get the schema, use the REPL //println(QepAuditSchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } - class QepQueryAuditTable(tag:Tag) extends Table[QepQueryAuditData](tag,"qepQueries") { + class QueriesSent(tag:Tag) extends Table[QepQueryAuditData](tag,"queriesSent") { def shrineNodeId = column[ShrineNodeId]("shrineNodeId") def userName = column[UserName]("userName") def networkQueryId = column[NetworkQueryId]("networkQueryId") def queryName = column[QueryName]("queryName") - def timeQuerySent = column[Time]("timeQuerySent") def queryTopicId = column[Option[QueryTopicId]]("queryTopicId") def queryTopicName = column[Option[QueryTopicName]]("queryTopicName") + def timeQuerySent = column[Time]("timeQuerySent") - def * = (shrineNodeId,userName,networkQueryId,queryName,timeQuerySent,queryTopicId,queryTopicName) <> (QepQueryAuditData.tupled,QepQueryAuditData.unapply) + def * = (shrineNodeId,userName,networkQueryId,queryName,queryTopicId,queryTopicName,timeQuerySent) <> (QepQueryAuditData.tupled,QepQueryAuditData.unapply) } - val allQepQueryQuery = TableQuery[QepQueryAuditTable] + val allQepQueryQuery = TableQuery[QueriesSent] } object QepAuditSchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.qep.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) val schema = QepAuditSchema(slickProfile) } -object QepAuditDb { - - val dataSource:DataSource = { - - val dataSourceFrom = QepAuditSchema.config.getString("dataSourceFrom") - if(dataSourceFrom == "JNDI") { - val jndiDataSourceName = QepAuditSchema.config.getString("jndiDataSourceName") - val initialContext:InitialContext = new InitialContext() - - initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] - } - else if (dataSourceFrom == "testDataSource") { - - val testDataSourceConfig = QepAuditSchema.config.getConfig("testDataSource") - val driverClassName = testDataSourceConfig.getString("driverClassName") - val url = testDataSourceConfig.getString("url") - - //Creating an instance of the driver register it. (!) From a previous epoch, but it works. - Class.forName(driverClassName).newInstance() - - object TestDataSource extends DataSource { - override def getConnection: Connection = { - DriverManager.getConnection(url) - } - - override def getConnection(username: String, password: String): Connection = { - DriverManager.getConnection(url, username, password) - } - - //unused methods - override def unwrap[T](iface: Class[T]): T = ??? - override def isWrapperFor(iface: Class[_]): Boolean = ??? - override def setLogWriter(out: PrintWriter): Unit = ??? - override def getLoginTimeout: Int = ??? - override def setLoginTimeout(seconds: Int): Unit = ??? - override def getParentLogger: Logger = ??? - override def getLogWriter: PrintWriter = ??? - } - - TestDataSource - } - else throw new IllegalArgumentException(s"shrine.steward.database.dataSourceFrom must be either JNDI or testDataSource, not $dataSourceFrom") +/** + * Container for QEP audit data for ACT metrics + * + * @author david + * @since 8/17/15 + */ +case class QepQueryAuditData( + shrineNodeId:ShrineNodeId, + userName:UserName, + networkQueryId:NetworkQueryId, + queryName:QueryName, + queryTopicId:Option[QueryTopicId], + queryTopicName:Option[QueryTopicName], + timeQuerySent:Time + ) {} + +object QepQueryAuditData extends (( + ShrineNodeId, + UserName, + NetworkQueryId, + QueryName, + Option[QueryTopicId], + Option[QueryTopicName], + Time + ) => QepQueryAuditData) { + + def apply( + shrineNodeId:String, + userName:String, + networkQueryId:Long, + queryName:String, + queryTopicId:Option[String], + queryTopicName: Option[QueryTopicName] + ):QepQueryAuditData = QepQueryAuditData( + shrineNodeId, + userName, + networkQueryId, + queryName, + queryTopicId, + queryTopicName, + System.currentTimeMillis() + ) + + def fromRunQueryRequest(request:RunQueryRequest,commonName:String):QepQueryAuditData = { + QepQueryAuditData( + commonName, + request.authn.username, + request.networkQueryId, + request.queryDefinition.name, + request.topicId, + request.topicName + ) } - val db = QepAuditDb(QepAuditSchema.schema,dataSource) - - val createTablesOnStart = QepAuditSchema.config.getBoolean("createTablesOnStart") - if(createTablesOnStart) QepAuditDb.db.createTables() - -} - +} \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala b/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala deleted file mode 100644 index 4b103d206..000000000 --- a/qep/service/src/main/scala/net/shrine/service/audit/QepQueryAuditData.scala +++ /dev/null @@ -1,59 +0,0 @@ -package net.shrine.service.audit - -import net.shrine.audit.{QueryTopicName, QueryTopicId, Time, QueryName, NetworkQueryId, UserName, ShrineNodeId} -import net.shrine.protocol.RunQueryRequest - -/** - * Container for QEP audit data for ACT metrics - * - * @author david - * @since 8/17/15 - */ -case class QepQueryAuditData(shrineNodeId:ShrineNodeId, - userName:UserName, - networkQueryId:NetworkQueryId, - queryName:QueryName, - timeQuerySent:Time, - queryTopicId:Option[QueryTopicId], - queryTopicName:Option[QueryTopicName] - ) {} - -object QepQueryAuditData extends (( - ShrineNodeId, - UserName, - NetworkQueryId, - QueryName, - Time, - Option[QueryTopicId], - Option[QueryTopicName] - ) => QepQueryAuditData) { - - def apply( - shrineNodeId:String, - userName:String, - networkQueryId:Long, - queryName:String, - queryTopicId:Option[String], - queryTopicName: Option[QueryTopicName] - ):QepQueryAuditData = QepQueryAuditData( - shrineNodeId, - userName, - networkQueryId, - queryName, - System.currentTimeMillis(), - queryTopicId, - queryTopicName - ) - - def fromRunQueryRequest(request:RunQueryRequest,commonName:String):QepQueryAuditData = { - QepQueryAuditData( - commonName, - request.authn.username, - request.networkQueryId, - request.queryDefinition.name, - request.topicId, - request.topicName - ) - } - -} \ No newline at end of file diff --git a/qep/service/src/main/sql/mysql.ddl b/qep/service/src/main/sql/mysql.ddl index b5f4490d6..fa69cc90a 100644 --- a/qep/service/src/main/sql/mysql.ddl +++ b/qep/service/src/main/sql/mysql.ddl @@ -1 +1 @@ -create table `qepQueries` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`timeQuerySent` BIGINT NOT NULL,`queryTopicId` TEXT,`queryTopicName` TEXT) \ No newline at end of file +create table `queriesSent` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`queryTopicId` TEXT,`queryTopicName` TEXT,`timeQuerySent` BIGINT NOT NULL); \ No newline at end of file