diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/AdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/AdapterDao.scala index 2e8116bb5..50fd75c72 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/AdapterDao.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/AdapterDao.scala @@ -1,71 +1,73 @@ package net.shrine.adapter.dao import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.AuthenticationInfo import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.QueryResult import net.shrine.protocol.I2b2ResultEnvelope import net.shrine.protocol.ResultOutputType import net.shrine.adapter.dao.model.ShrineQuery import scala.xml.NodeSeq /** * @author clint * @since Oct 15, 2012 */ trait AdapterDao { /** * @return the id column of the inserted row */ def insertQuery(masterId: String, networkId: Long, authn: AuthenticationInfo, queryDefinition: QueryDefinition, isFlagged: Boolean, hasBeenRun: Boolean, flagMessage: Option[String]): Int //Returns a Map of output types to Seqs of inserted ids, since the ERROR output type can be used for multiple query_result rows, //Say for a run query operation that results in multiple error responses from the CRC. def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long): Unit def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): Unit def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] - + + def findQueriesByDomain(domain:String):Seq[ShrineQuery] + def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean def renameQuery(networkQueryId: Long, newName: String): Unit def deleteQuery(networkQueryId: Long): Unit def deleteQueryResultsFor(networkQueryId: Long): Unit def findRecentQueries(howMany: Int): Seq[ShrineQuery] def storeResults(authn: AuthenticationInfo, masterId: String, networkQueryId: Long, queryDefinition: QueryDefinition, rawQueryResults: Seq[QueryResult], obfuscatedQueryResults: Seq[QueryResult], failedBreakdownTypes: Seq[ResultOutputType], mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit def flagQuery(networkQueryId: Long, message: Option[String]): Unit def unFlagQuery(networkQueryId: Long): Unit def inTransaction[T](f: => T): T = f } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala index da82f770c..78d58fcb9 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala @@ -1,450 +1,464 @@ package net.shrine.adapter.dao.squeryl import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.{ObfuscatedPair, ShrineQuery, ShrineQueryResult} import net.shrine.adapter.dao.model.squeryl.{SquerylBreakdownResultRow, SquerylCountRow, SquerylPrivilegedUser, SquerylQueryResultRow, SquerylShrineError, SquerylShrineQuery} import net.shrine.adapter.dao.squeryl.tables.Tables import net.shrine.dao.DateHelpers import net.shrine.dao.squeryl.{SquerylEntryPoint, SquerylInitializer} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.{AuthenticationInfo, I2b2ResultEnvelope, QueryResult, ResultOutputType} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.XmlDateHelper import org.squeryl.Query import org.squeryl.dsl.GroupWithMeasures import scala.util.Try import scala.xml.NodeSeq /** * @author clint * @since May 22, 2013 */ final class SquerylAdapterDao(initializer: SquerylInitializer, tables: Tables)(implicit breakdownTypes: Set[ResultOutputType]) extends AdapterDao with Loggable { initializer.init() override def inTransaction[T](f: => T): T = SquerylEntryPoint.inTransaction { f } import SquerylEntryPoint._ override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = mutateFlagField(networkQueryId, newIsFlagged = true, flagMessage) override def unFlagQuery(networkQueryId: Long): Unit = mutateFlagField(networkQueryId, newIsFlagged = false, None) private def mutateFlagField(networkQueryId: Long, newIsFlagged: Boolean, newFlagMessage: Option[String]): Unit = { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.isFlagged := newIsFlagged, queryRow.flagMessage := newFlagMessage) } } } override def storeResults( authn: AuthenticationInfo, masterId: String, networkQueryId: Long, queryDefinition: QueryDefinition, rawQueryResults: Seq[QueryResult], obfuscatedQueryResults: Seq[QueryResult], failedBreakdownTypes: Seq[ResultOutputType], mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = { inTransaction { val insertedQueryId = insertQuery(masterId, networkQueryId, authn, queryDefinition, isFlagged = false, hasBeenRun = true, flagMessage = None) val insertedQueryResultIds = insertQueryResults(insertedQueryId, rawQueryResults) storeCountResults(rawQueryResults, obfuscatedQueryResults, insertedQueryResultIds) storeErrorResults(rawQueryResults, insertedQueryResultIds) storeBreakdownFailures(failedBreakdownTypes.toSet, insertedQueryResultIds) insertBreakdownResults(insertedQueryResultIds, mergedBreakdowns, obfuscatedBreakdowns) } } private[adapter] def storeCountResults(raw: Seq[QueryResult], obfuscated: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val notErrors = raw.filter(!_.isError) val obfuscatedNotErrors = obfuscated.filter(!_.isError) if(notErrors.size > 1) { warn(s"Got ${notErrors.size} raw (hopefully-)count results; more than 1 is unusual.") } if(obfuscatedNotErrors.size > 1) { warn(s"Got ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; more than 1 is unusual.") } if(notErrors.size != obfuscatedNotErrors.size) { warn(s"Got ${notErrors.size} raw and ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; that these numbers are different is unusual.") } import ResultOutputType.PATIENT_COUNT_XML def isCount(qr: QueryResult): Boolean = qr.resultType.contains(PATIENT_COUNT_XML) inTransaction { //NB: Take the count/setSize from the FIRST PATIENT_COUNT_XML QueryResult, //though the same count should be there for all of them, if there are more than one for { Seq(insertedCountQueryResultId) <- insertedIds.get(PATIENT_COUNT_XML) notError <- notErrors.find(isCount) //NB: Find a count result, just to be sure obfuscatedNotError <- obfuscatedNotErrors.find(isCount) //NB: Find a count result, just to be sure } { insertCountResult(insertedCountQueryResultId, notError.setSize, obfuscatedNotError.setSize) } } } private[adapter] def storeErrorResults(results: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val errors = results.filter(_.isError) val insertedErrorResultIds = insertedIds.getOrElse(ResultOutputType.ERROR,Nil) val insertedIdsToErrors = insertedErrorResultIds zip errors inTransaction { for { (insertedErrorResultId, errorQueryResult) <- insertedIdsToErrors } { val pd = errorQueryResult.problemDigest.get //it's an error so it will have a problem digest insertErrorResult( insertedErrorResultId, errorQueryResult.statusMessage.getOrElse("Unknown failure"), pd.codec, pd.stampText, pd.summary, pd.description, pd.detailsXml ) } } } private[adapter] def storeBreakdownFailures(failedBreakdownTypes: Set[ResultOutputType], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val insertedIdsForFailedBreakdownTypes = insertedIds.filterKeys(failedBreakdownTypes.contains) inTransaction { for { (failedBreakdownType, Seq(resultId)) <- insertedIdsForFailedBreakdownTypes } { //todo propagate backwards to the breakdown failure to create the corect problem object BreakdownFailure extends AbstractProblem(ProblemSources.Adapter) { override val summary: String = "Couldn't retrieve result breakdown" override val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'" } val pd = BreakdownFailure.toDigest insertErrorResult( resultId, s"Couldn't retrieve breakdown of type '$failedBreakdownType'", pd.codec, pd.stampText, pd.summary, pd.description, pd.detailsXml ) } } } override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForAllUsers.take(howMany).map(_.toShrineQuery).toSeq } } def findAllCounts():Seq[SquerylCountRow] = { inTransaction{ Queries.allCountResults.toSeq } } override def renameQuery(networkQueryId: Long, newName: String) { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.name := newName) } } } override def deleteQuery(networkQueryId: Long): Unit = { inTransaction { tables.shrineQueries.deleteWhere(_.networkId === networkQueryId) } } override def deleteQueryResultsFor(networkQueryId: Long): Unit = { inTransaction { val resultIdsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow.id). on(queryRow.id === resultRow.queryId) }.toSet tables.queryResults.deleteWhere(_.id in resultIdsForNetworkQueryId) } } override def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean = Try { inTransaction { val privilegedUserOption = Queries.privilegedUsers(authn.domain, authn.username).singleOption val threshold:Int = privilegedUserOption.flatMap(_.threshold).getOrElse(defaultThreshold.intValue) val thirtyDaysInThePast: XMLGregorianCalendar = DateHelpers.daysFromNow(-30) val overrideDate: XMLGregorianCalendar = privilegedUserOption.map(_.toPrivilegedUser).flatMap(_.overrideDate).getOrElse(thirtyDaysInThePast) //sorted instead of just finding max val counts: Seq[Long] = Queries.repeatedResults(authn.domain, authn.username, overrideDate).toSeq.sorted //and then grabbing the last, highest value in the sorted sequence val repeatedResultCount: Long = counts.lastOption.getOrElse(0L) val result = repeatedResultCount > threshold debug(s"User ${authn.domain}:${authn.username} locked out? $result") result } }.getOrElse(false) override def insertQuery(localMasterId: String, networkId: Long, authn: AuthenticationInfo, queryDefinition: QueryDefinition, isFlagged: Boolean, hasBeenRun: Boolean, flagMessage: Option[String]): Int = { inTransaction { val inserted = tables.shrineQueries.insert(new SquerylShrineQuery( 0, localMasterId, networkId, authn.username, authn.domain, XmlDateHelper.now, isFlagged, flagMessage, hasBeenRun, queryDefinition)) inserted.id } } /** * Insert rows into QueryResults, one for each QueryResult in the passed RunQueryResponse * Inserted rows are 'children' of the passed ShrineQuery (ie, they are the results of the query) */ override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = { def execTime(result: QueryResult): Option[Long] = { //TODO: How are locales handled here? Do we care? def toMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- result.startDate end <- result.endDate } yield toMillis(end) - toMillis(start) } val typeToIdTuples = inTransaction { for { result <- results resultType = result.resultType.getOrElse(ResultOutputType.ERROR) //TODO: under what circumstances can QueryResults NOT have start and end dates set? elapsed = execTime(result) } yield { val lastInsertedQueryResultRow = tables.queryResults.insert(new SquerylQueryResultRow(0, result.resultId, parentQueryId, resultType, result.statusType, elapsed, XmlDateHelper.now)) (resultType, lastInsertedQueryResultRow.id) } } typeToIdTuples.groupBy { case (resultType, _) => resultType }.mapValues(_.map { case (_, count) => count }) } override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.countResults.insert(new SquerylCountRow(0, resultId, originalCount, obfuscatedCount, XmlDateHelper.now)) } } override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) { def merge(original: I2b2ResultEnvelope, obfuscated: I2b2ResultEnvelope): Map[String, ObfuscatedPair] = { Map.empty ++ (for { (key, originalValue) <- original.data obfuscatedValue <- obfuscated.data.get(key) } yield (key, ObfuscatedPair(originalValue, obfuscatedValue))) } inTransaction { for { (resultType, Seq(resultId)) <- parentResultIds if resultType.isBreakdown originalBreakdown <- originalBreakdowns.get(resultType) obfuscatedBreakdown <- obfuscatedBreakdowns.get(resultType) (key, ObfuscatedPair(original, obfuscated)) <- merge(originalBreakdown, obfuscatedBreakdown) } { tables.breakdownResults.insert(SquerylBreakdownResultRow(0, resultId, key, original, obfuscated)) } } } override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.errorResults.insert(SquerylShrineError(0, parentResultId, errorMessage, codec, stampText, summary, digestDescription, detailsXml.toString())) } } override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = { inTransaction { Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) } } override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForUser(username, domain).take(howMany).toSeq.map(_.toShrineQuery) } } + override def findQueriesByDomain(domain: String): Seq[ShrineQuery] = { + inTransaction { + Queries.queriesForDomain(domain).toSeq.map(_.toShrineQuery) + } + } + override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = { inTransaction { val breakdownRowsByType = Queries.breakdownResults(networkQueryId).toSeq.groupBy { case (outputType, _) => outputType.toQueryResultRow.resultType }.mapValues(_.map { case (_, row) => row.toBreakdownResultRow }) val queryRowOption = Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) val countRowOption = Queries.countResults(networkQueryId).headOption.map(_.toCountRow) val queryResultRows = Queries.resultsForQuery(networkQueryId).toSeq.map(_.toQueryResultRow) val errorResultRows = Queries.errorResults(networkQueryId).toSeq.map(_.toShrineError) for { queryRow <- queryRowOption countRow <- countRowOption shrineQueryResult <- ShrineQueryResult.fromRows(queryRow, queryResultRows, countRow, breakdownRowsByType, errorResultRows) } yield { shrineQueryResult } } } /** * @author clint * @since Nov 19, 2012 */ object Queries { def privilegedUsers(domain: String, username: String): Query[SquerylPrivilegedUser] = { from(tables.privilegedUsers) { user => where(user.username === username and user.domain === domain).select(user) } } def repeatedResults(domain: String, username: String, overrideDate: XMLGregorianCalendar): Query[Long] = { val counts: Query[GroupWithMeasures[Long, Long]] = join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.username === username and queryRow.domain === domain and (countRow.originalValue <> 0L) and queryRow.dateCreated > DateHelpers.toTimestamp(overrideDate)). groupBy(countRow.originalValue). compute(count(countRow.originalValue)). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } //Filter for result counts > 0 from(counts) { cnt => where(cnt.measures gt 0).select(cnt.measures) } } val queriesForAllUsers: Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => select(queryRow).orderBy(queryRow.dateCreated.desc) } } //TODO: Find a way to parameterize on limit, to avoid building the query every time //TODO: limit def queriesForUser(username: String, domain: String): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain and queryRow.username === username). select(queryRow). orderBy(queryRow.dateCreated.desc) } } + def queriesForDomain(domain: String): Query[SquerylShrineQuery] = { + from(tables.shrineQueries) { queryRow => + where(queryRow.domain === domain). + select(queryRow). + orderBy(queryRow.dateCreated.desc) + } + } + val allCountResults: Query[SquerylCountRow] = { from(tables.countResults) { queryRow => select(queryRow) } } def queriesByNetworkId(networkQueryId: Long): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow) } } //TODO: Find out how to compose queries, to re-use queriesByNetworkId def queryNamesByNetworkId(networkQueryId: Long): Query[String] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow.name) } } def resultsForQuery(networkQueryId: Long): Query[SquerylQueryResultRow] = { val resultsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow). on(queryRow.id === resultRow.queryId) } from(resultsForNetworkQueryId)(select(_)) } def countResults(networkQueryId: Long): Query[SquerylCountRow] = { join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.networkId === networkQueryId). select(countRow). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } } def errorResults(networkQueryId: Long): Query[SquerylShrineError] = { join(tables.shrineQueries, tables.queryResults, tables.errorResults) { (queryRow, resultRow, errorRow) => where(queryRow.networkId === networkQueryId). select(errorRow). on(queryRow.id === resultRow.queryId, resultRow.id === errorRow.resultId) } } //NB: using groupBy here is too much of a pain; do it 'manually' later def breakdownResults(networkQueryId: Long): Query[(SquerylQueryResultRow, SquerylBreakdownResultRow)] = { join(tables.shrineQueries, tables.queryResults, tables.breakdownResults) { (queryRow, resultRow, breakdownRow) => where(queryRow.networkId === networkQueryId). select((resultRow, breakdownRow)). on(queryRow.id === resultRow.queryId, resultRow.id === breakdownRow.resultId) } } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/test/scala/net/shrine/adapter/dao/MockAdapterDao.scala b/adapter/adapter-service/src/test/scala/net/shrine/adapter/dao/MockAdapterDao.scala index 5b95ec172..6082dc874 100644 --- a/adapter/adapter-service/src/test/scala/net/shrine/adapter/dao/MockAdapterDao.scala +++ b/adapter/adapter-service/src/test/scala/net/shrine/adapter/dao/MockAdapterDao.scala @@ -1,59 +1,61 @@ package net.shrine.adapter.dao import net.shrine.adapter.dao.model.ShrineQuery import net.shrine.adapter.dao.model.ShrineQueryResult import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.I2b2ResultEnvelope import net.shrine.protocol.QueryResult import net.shrine.protocol.ResultOutputType import net.shrine.protocol.query.QueryDefinition import scala.xml.NodeSeq /** * @author clint * @since Oct 19, 2012 */ object MockAdapterDao extends MockAdapterDao trait MockAdapterDao extends AdapterDao { override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = () override def unFlagQuery(networkQueryId: Long): Unit = () override def insertQuery(localMasterId: String, networkId: Long, authn: AuthenticationInfo, query: QueryDefinition, isFlagged: Boolean, hasBeenRun: Boolean, flagMessage: Option[String]): Int = 0 override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = Map.empty override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long): Unit = () override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = () override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq) = () override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = None override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = Nil + override def findQueriesByDomain(domain: String): Seq[ShrineQuery] = Nil + override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = None override def isUserLockedOut(id: AuthenticationInfo, defaultThreshold: Int): Boolean = false override def renameQuery(networkQueryId: Long, newName: String): Unit = () override def deleteQuery(networkQueryId: Long): Unit = () override def deleteQueryResultsFor(networkQueryId: Long): Unit = () override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = Nil override def storeResults(authn: AuthenticationInfo, masterId: String, networkQueryId: Long, queryDefinition: QueryDefinition, rawQueryResults: Seq[QueryResult], obfuscatedQueryResults: Seq[QueryResult], failedBreakdownTypes: Seq[ResultOutputType], mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = () } \ No newline at end of file diff --git a/commons/data-commons/src/main/scala/net/shrine/dao/squeryl/DataSourceSquerylInitializer.scala b/commons/data-commons/src/main/scala/net/shrine/dao/squeryl/DataSourceSquerylInitializer.scala index ad9359d52..b524fb71c 100644 --- a/commons/data-commons/src/main/scala/net/shrine/dao/squeryl/DataSourceSquerylInitializer.scala +++ b/commons/data-commons/src/main/scala/net/shrine/dao/squeryl/DataSourceSquerylInitializer.scala @@ -1,18 +1,18 @@ package net.shrine.dao.squeryl import org.squeryl.internals.DatabaseAdapter import org.squeryl.SessionFactory import org.squeryl.Session import javax.sql.DataSource /** * @author clint - * @date May 21, 2013 + * @since May 21, 2013 */ final class DataSourceSquerylInitializer(dataSource: DataSource, adapter: DatabaseAdapter) extends SquerylInitializer { override lazy val init: Unit = { SessionFactory.concreteFactory = Some { () => Session.create(dataSource.getConnection, adapter) } } } \ No newline at end of file diff --git a/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala b/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala index 6984997fb..b5d1f42a1 100644 --- a/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala +++ b/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala @@ -1,60 +1,61 @@ package net.shrine.slick import java.io.PrintWriter import java.sql.{DriverManager, Connection} import java.util.logging.Logger import javax.naming.InitialContext import javax.sql.DataSource import com.typesafe.config.Config /** * @author david * @since 1/26/16 */ object TestableDataSourceCreator { def dataSource(config:Config):DataSource = { val dataSourceFrom = config.getString("dataSourceFrom") if(dataSourceFrom == "JNDI") { val jndiDataSourceName = config.getString("jndiDataSourceName") val initialContext:InitialContext = new InitialContext() initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] } else if (dataSourceFrom == "testDataSource") { val testDataSourceConfig = config.getConfig("testDataSource") val driverClassName = testDataSourceConfig.getString("driverClassName") val url = testDataSourceConfig.getString("url") //Creating an instance of the driver register it. (!) From a previous epoch, but it works. Class.forName(driverClassName).newInstance() object TestDataSource extends DataSource { + //todo this is the one used . probably needs to handle passwords override def getConnection: Connection = { DriverManager.getConnection(url) } override def getConnection(username: String, password: String): Connection = { DriverManager.getConnection(url, username, password) } //unused methods override def unwrap[T](iface: Class[T]): T = ??? override def isWrapperFor(iface: Class[_]): Boolean = ??? override def setLogWriter(out: PrintWriter): Unit = ??? override def getLoginTimeout: Int = ??? override def setLoginTimeout(seconds: Int): Unit = ??? override def getParentLogger: Logger = ??? override def getLogWriter: PrintWriter = ??? } TestDataSource } else throw new IllegalArgumentException(s"dataSourceFrom config value must be either JNDI or testDataSource, not $dataSourceFrom") } } diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryMaster.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryMaster.scala index af93915dc..2caea9b9a 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryMaster.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryMaster.scala @@ -1,24 +1,24 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar /** * @author Bill Simons * @since 4/2/12 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class QueryMaster ( queryMasterId: String, //Outside of tests, this is always the networkQueryId as a string networkQueryId: Long, name: String, userId: String, groupId: String, createDate: XMLGregorianCalendar, - held: Option[Boolean] = None, + held: Option[Boolean] = None, //todo field should be removed, along with supporting code and tests. It's never used. flagged: Option[Boolean] = None, flagMessage: Option[String] = None) \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index c7ecbb2ce..a257b6e8c 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,528 +1,518 @@ package net.shrine.qep.queries import java.sql.SQLException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.ProblemDigest import net.shrine.protocol.{ResultOutputTypes, DeleteQueryRequest, RenameQueryRequest, I2b2ResultEnvelope, QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.TestableDataSourceCreator import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.result) } //todo order def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain,request.fetchSize) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } //todo order def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, limit:Int):Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).take(limit).result) } def renamePreviousQuery(request:RenameQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis())) } yield queryResults ) } def markDeleted(request:DeleteQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis())) } yield queryResults ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2)) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) queryResults.map(r => r.toQueryResult( resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty), adapterNodesToProblemDigests.get(r.adapterNode) )) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val db = QepQueryDb(QepQuerySchema.schema,dataSource) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[String]("expression") def dateCreated = column[Time]("dateCreated") def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") def changeDate = column[Long]("changeDate") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists ) yield queries val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags - //todo there may be other custom breakdowns in the config. Use that as the source val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[ResultOutputType]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) import net.shrine.config.{ConfigExtensions, Keys} val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured(Keys.breakdownResultOutputTypes,ResultOutputTypes.fromConfig).getOrElse(Set.empty) val schema = QepQuerySchema(slickProfile,moreBreakdowns) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: String, dateCreated: Time, deleted: Boolean, queryXml: String, changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), - held = None, //todo if a query is held at the adapter, how will we know? do we care? Question out to Bill and leadership + held = None, //todo this field is never used. Remove it in 1.22 flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString, dateCreated = System.currentTimeMillis(), deleted = false, queryXml = runQueryRequest.toXmlString, changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } -/* - - //todo problemDigest in a separate table - problemDigest: Option[ProblemDigest] = None, - - //todo breakdowns in a separate table - breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty - */ - case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:ResultOutputType, size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult( resultId = resultId, instanceId = instanceId, resultType = Some(resultType), setSize = size, startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), description = Some(adapterNode), statusType = status, statusMessage = statusMessage, breakdowns = breakdowns, problemDigest = problemDigest ) } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,ResultOutputType,Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."), resultType = result.resultType.getOrElse(ResultOutputType.PATIENT_COUNT_XML), //todo how is this optional?? size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = { def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { val data = breakdowns.map(b => b.dataKey -> b.value).toMap I2b2ResultEnvelope(resultType,data) } breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else

) } } diff --git a/tools/adapter-queries-to-qep/pom.xml b/tools/adapter-queries-to-qep/pom.xml index 9bfdceb50..94d9d842e 100644 --- a/tools/adapter-queries-to-qep/pom.xml +++ b/tools/adapter-queries-to-qep/pom.xml @@ -1,54 +1,52 @@ 4.0.0 SHRINE Copy Adapter Queries to QEP adapter-queries-to-qep jar net.shrine shrine-tools 1.21.0-SNAPSHOT - + net.shrine - shrine-crypto + shrine-data-commons ${project.version} - - net.shrine - shrine-utility-commons - ${project.version} + + com.typesafe + config net.shrine - shrine-utility-commons + shrine-adapter-service ${project.version} - test-jar - test - com.typesafe - config + net.shrine + shrine-qep + ${project.version} src/main/scala src/test/scala net.alchim31.maven scala-maven-plugin maven-assembly-plugin adapter-queries-to-qep-${project.version} false diff --git a/tools/adapter-queries-to-qep/src/main/examples/adapter-queries-to-qep.conf b/tools/adapter-queries-to-qep/src/main/examples/adapter-queries-to-qep.conf index 1811ac162..1f9e44311 100644 --- a/tools/adapter-queries-to-qep/src/main/examples/adapter-queries-to-qep.conf +++ b/tools/adapter-queries-to-qep/src/main/examples/adapter-queries-to-qep.conf @@ -1,21 +1,19 @@ -batch { -// === URL of the Shrine node to scan === -// shrineUrl = "https://some-shrine-node.example.com:6060/shrine-cell/rest/shrine" +shrine { + adapter { + queries { -// === i2b2 project id to use when querying === -// projectId = "SHRINE" + database { + slickProfileClassName = "slick.driver.H2Driver$" + createTablesOnStart = true //for testing with H2 in memory, when not running unit tests. Set to false normally -// === i2b2 topic id to use when querying === -// topicId = "4" + dataSourceFrom = "testDataSource" //Can be JNDI or testDataSource . Use testDataSource for tests, JNDI everywhere else -// === Credentials to use when querying === -// credentials { -// domain = "ExampleDomain" -// username = "ExampleUser" -// password = "ExamplePassword" -// } - -// inputFile = "/path/to/my/queries.xml" - -// outputFile = "/path/to/desired/output.csv" -} + testDataSource { + driverClassName = "org.h2.Driver" + url = "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1" //H2 embedded in-memory for unit tests ;TRACE_LEVEL_SYSTEM_OUT=2 for H2's trace + } + } + } + } + //may need breakdownResultOutputTypes {} +} \ No newline at end of file diff --git a/tools/adapter-queries-to-qep/src/main/scala/net/shrine/utilities/adapterqueriestoqep/AdapterQueriesToQep.scala b/tools/adapter-queries-to-qep/src/main/scala/net/shrine/utilities/adapterqueriestoqep/AdapterQueriesToQep.scala index ced4ee22e..af948712f 100644 --- a/tools/adapter-queries-to-qep/src/main/scala/net/shrine/utilities/adapterqueriestoqep/AdapterQueriesToQep.scala +++ b/tools/adapter-queries-to-qep/src/main/scala/net/shrine/utilities/adapterqueriestoqep/AdapterQueriesToQep.scala @@ -1,25 +1,80 @@ package net.shrine.utilities.adapterqueriestoqep import java.io.File +import javax.sql.DataSource +import javax.xml.datatype.XMLGregorianCalendar -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} +import net.shrine.adapter.dao.AdapterDao +import net.shrine.adapter.dao.model.ShrineQuery +import net.shrine.adapter.dao.squeryl.SquerylAdapterDao +import net.shrine.dao.squeryl.{DataSourceSquerylInitializer, SquerylInitializer, SquerylDbAdapterSelecter} +import net.shrine.adapter.dao.squeryl.tables.{Tables => AdapterTables} +import net.shrine.protocol.ResultOutputTypes +import net.shrine.qep.queries.{QepQueryDb, QepQuery, QepQueryFlag} +import net.shrine.slick.TestableDataSourceCreator +import org.squeryl.internals.DatabaseAdapter +import net.shrine.config.ConfigExtensions /** * @author dwalend * @since 1.21 */ object AdapterQueriesToQep { def main(args: Array[String]): Unit = { - if(args.length < 2) throw new IllegalArgumentException("Requires at least two arguments, full paths to the adapter-queries-to-qep.conf file and the shrine.conf file.") + if(args.length < 3) throw new IllegalArgumentException("Requires three arguments: the domain to transfer, the full path to the adapter-queries-to-qep.conf file, and the full path to the shrine.conf file.") - val localConfig = args(0) - val shrineConfig = args(1) + val domain = args(0) + val localConfig = args(1) + val shrineConfig = args(2) - val config = ConfigFactory.parseFile(new File(localConfig)).withFallback(ConfigFactory.load(shrineConfig)) + val config: Config = ConfigFactory.parseFile(new File(localConfig)).withFallback(ConfigFactory.load(shrineConfig)) + val adapterDataSource: DataSource = TestableDataSourceCreator.dataSource(config.getConfig("shrine.adapter.query.database")) + val squerylAdapter: DatabaseAdapter = SquerylDbAdapterSelecter.determineAdapter(config.getString("shrine.shrineDatabaseType")) + val squerylInitializer: SquerylInitializer = new DataSourceSquerylInitializer(adapterDataSource, squerylAdapter) + val squerylAdapterTables: AdapterTables = new AdapterTables + val breakdownTypes = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty) + val adapterDao: AdapterDao = new SquerylAdapterDao(squerylInitializer, squerylAdapterTables)(breakdownTypes) + val adapterQueries: Seq[ShrineQuery] = adapterDao.findQueriesByDomain(domain) + + //turn each ShrineQuery into a QepQuery and store it + adapterQueries.map(shrineQueryToQepQuery).foreach(QepQueryDb.db.insertQepQuery) + + //make flags for each ShrineQuery and store that + adapterQueries.flatMap(shrineQueryToQepQueryFlag).foreach(QepQueryDb.db.insertQepQueryFlag) + } + + def shrineQueryToQepQuery(shrineQuery: ShrineQuery):QepQuery = { + val date:Long = toMillis(shrineQuery.dateCreated) + new QepQuery( + networkId = shrineQuery.networkId, + userName = shrineQuery.username, + userDomain = shrineQuery.domain, + queryName = shrineQuery.name, + expression = shrineQuery.queryDefinition.expr.fold("")(expression => expression.toXml.text), + dateCreated = date, + deleted = false, + queryXml = shrineQuery.queryDefinition.toXml.text, + changeDate = date + ) + } + + def shrineQueryToQepQueryFlag(shrineQuery: ShrineQuery):Option[QepQueryFlag] = { + + if(shrineQuery.isFlagged){ + Some(new QepQueryFlag( + networkQueryId = shrineQuery.networkId, + flagged = shrineQuery.isFlagged, + flagMessage = shrineQuery.flagMessage.getOrElse(""), + changeDate = toMillis(shrineQuery.dateCreated) + )) + } + else None } + private def toMillis(xmlGc: XMLGregorianCalendar): Long = xmlGc.toGregorianCalendar.getTimeInMillis } \ No newline at end of file diff --git a/tools/adapter-queries-to-qep/src/test/resources/adapter-queries-to-qep.conf b/tools/adapter-queries-to-qep/src/test/resources/adapter-queries-to-qep.conf index 2199c3a75..e69de29bb 100644 --- a/tools/adapter-queries-to-qep/src/test/resources/adapter-queries-to-qep.conf +++ b/tools/adapter-queries-to-qep/src/test/resources/adapter-queries-to-qep.conf @@ -1,24 +0,0 @@ -batch { - // === URL of the Shrine node to scan === - shrineUrl = "https://some-shrine-node.example.com:6060/shrine-cell/rest/" - - // === i2b2 project id to use when querying === - projectId = "SHRINE" - - // === i2b2 topic id to use when querying === - topicId = "some-topic-id" - - // === Credentials to use when querying === - credentials { - domain = "ExampleDomain" - username = "ExampleUser" - password = "ExamplePassword" - } - - inputFile = "/path/to/my/queries.xml" - - outputFile = "/path/to/desired/output.csv" - - //NB: If omitted, default is 3 - queriesPerTerm = 99 -} \ No newline at end of file diff --git a/tools/pom.xml b/tools/pom.xml index a01662d26..3ac848a62 100644 --- a/tools/pom.xml +++ b/tools/pom.xml @@ -1,80 +1,81 @@ 4.0.0 SHRINE Tools shrine-tools pom net.shrine shrine-base 1.21.0-SNAPSHOT 1.5 2.4 monitor utility-commons scanner batch-querier + adapter-queries-to-qep mapping-automation net.shrine shrine-protocol ${project.version} org.codehaus.gmaven gmaven-plugin ${gmaven-plugin-version} compile testCompile true true true maven-assembly-plugin ${assembly-plugin-version} assembly package single src/main/assembly/assembly.xml