diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala index ff07d83ad..322e1fd0c 100644 --- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala +++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala @@ -1,214 +1,213 @@ package net.shrine.qep import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator} import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.log.Loggable import net.shrine.protocol.{AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, QueryInstance, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.audit.QepAuditDb import net.shrine.qep.dao.AuditDao import net.shrine.qep.queries.QepQueryDb import net.shrine.util.XmlDateHelper import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration /** * @author clint * @since Feb 19, 2014 */ trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") authenticateAndThen(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast) } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast) } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") //store the query in the qep's database doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast) } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadQueryDefinition($request,$shouldBroadcast)") doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast) } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadInstanceResults($request,$shouldBroadcast)") //todo try reading directly from the QEP database code here doBroadcastQuery(request, new ReadInstanceResultsAggregator(request.shrineNetworkQueryId, false), shouldBroadcast) } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadQueryInstances($request,$shouldBroadcast)") authenticateAndThen(request) { authResult => val now = XmlDateHelper.now val networkQueryId = request.queryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = { info(s"doReadPreviousQueries($request,$shouldBroadcast)") //check results. If any results are in one of many pending states, go ahead and request them. (Maybe go async) //pull queries from the local database. QepQueryDb.db.selectPreviousQueries(request) } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doRenameQuery($request,$shouldBroadcast)") doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast) } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doDeleteQuery($request,$shouldBroadcast)") doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast) } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ => info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)") //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => debug(s"doBroadcastQuery($request) authResult is $authResult") //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another //When making BroadcastMessages val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) //NB: Only audit RunQueryRequests request match { case runQueryRequest: RunQueryRequest => // inject modified, authorized runQueryRequest //although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code auditAuthorizeAndThen(runQueryRequest) { authorizedRequest => debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") // tuck the ACT audit metrics data into a database here if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) QepQueryDb.db.insertQepQuery(authorizedRequest) val response: BaseResp = doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast) response match { - case aggregated:AggregatedRunQueryResponse => { - //todo start here aggregated.results.map(r => QepAuditDb.db.insert) - } + case aggregated:AggregatedRunQueryResponse => aggregated.results.map(QepQueryDb.db.insertQueryResult) + case _ => debug(s"Unanticipated response type $response") } response } case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) } } } private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = { info(s"doSynchronousQuery($request) started") val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] info(s"doSynchronousQuery($request) completed with response $response") response } private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = { auditTransactionally(request) { debug(s"auditAuthorizeAndThen($request) with $authorizationService") val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2)) } body(authorizedRequest) } } private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } import AuthenticationResult._ private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val AuthenticationInfo(domain, username, _) = request.authn val authResult = authenticator.authenticate(request.authn) authResult match { case a: Authenticated => f(a) case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason") } } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index 622fc63aa..7a3b93793 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,406 +1,404 @@ package net.shrine.qep.queries import java.sql.SQLException import java.util.GregorianCalendar import javax.sql.DataSource import javax.xml.datatype.DatatypeFactory import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.protocol.{QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.TestableDataSourceCreator import slick.driver.JdbcProfile import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(allQepQueryQuery.result) } //todo order def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } //todo order def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = { dbRun(allQepQueryQuery.filter(_.userName === userName).filter(_.userDomain === domain).result) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(result:QueryResult) = { -//todo insertQepResultRow(QueryResultRow(result)) + insertQepResultRow(QueryResultRow(result)) } def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val db = QepQueryDb(QepQuerySchema.schema,dataSource) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[String]("expression") def dateCreated = column[Time]("dateCreated") def queryXml = column[String]("queryXml") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,queryXml) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags /** * The adapter's QUERY_RESULTS table looks like this: * * mysql> describe QUERY_RESULT; +--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | local_id | varchar(255) | NO | | NULL | | | query_id | int(11) | NO | MUL | NULL | | | type | enum('PATIENTSET','PATIENT_COUNT_XML','PATIENT_AGE_COUNT_XML','PATIENT_RACE_COUNT_XML','PATIENT_VITALSTATUS_COUNT_XML','PATIENT_GENDER_COUNT_XML','ERROR') | NO | | NULL | | | status | enum('FINISHED','ERROR','PROCESSING','QUEUED') | NO | | NULL | | | time_elapsed | int(11) | YES | | NULL | | | last_updated | timestamp | NO | | CURRENT_TIMESTAMP | | +--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----+-------------------+----------------+ */ val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) //todo what of these actually get used? class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") - def localId = column[String]("localId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultType = column[ResultOutputType]("resultType") - def setSize = column[Long]("size") + def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def description = column[Option[String]]("description") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") - def * = (resultId,localId,networkQueryId,adapterNode,resultType,setSize,startDate,endDate,description,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) + def * = (resultId,networkQueryId,adapterNode,resultType,size,startDate,endDate,description,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows /* with some other aux tables to hold specifics: mysql> describe COUNT_RESULT; +------------------+-----------+------+-----+-------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------------+-----------+------+-----+-------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | result_id | int(11) | NO | MUL | NULL | | | original_count | int(11) | NO | | NULL | | | obfuscated_count | int(11) | NO | | NULL | | | date_created | timestamp | NO | | CURRENT_TIMESTAMP | | +------------------+-----------+------+-----+-------------------+----------------+ mysql> describe BREAKDOWN_RESULT; +------------------+--------------+------+-----+---------+----------------+ | Field | Type | Null | Key | Default | Extra | +------------------+--------------+------+-----+---------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | result_id | int(11) | NO | MUL | NULL | | | data_key | varchar(255) | NO | | NULL | | | original_value | int(11) | NO | | NULL | | | obfuscated_value | int(11) | NO | | NULL | | +------------------+--------------+------+-----+---------+----------------+ mysql> describe ERROR_RESULT; +---------------------+--------------+------+-----+--------------------------+----------------+ | Field | Type | Null | Key | Default | Extra | +---------------------+--------------+------+-----+--------------------------+----------------+ | id | int(11) | NO | PRI | NULL | auto_increment | | result_id | int(11) | NO | MUL | NULL | | | message | varchar(255) | NO | | NULL | | | CODEC | varchar(256) | NO | | Pre-1.20 Error | | | SUMMARY | text | NO | | NULL | | | DESCRIPTION | text | NO | | NULL | | | PROBLEM_DESCRIPTION | text | NO | | NULL | | | DETAILS | text | NO | | NULL | | | STAMP | varchar(256) | NO | | Unknown time and machine | | +---------------------+--------------+------+-----+--------------------------+----------------+ */ } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) val schema = QepQuerySchema(slickProfile) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: String, dateCreated: Time, queryXml:String ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { val gregorianCalendar = new GregorianCalendar() gregorianCalendar.setTimeInMillis(dateCreated) val xmlGregorianCalendar = DatatypeFactory.newInstance().newXMLGregorianCalendar(gregorianCalendar) QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = xmlGregorianCalendar, held = None, //todo if a query is held at the adapter, how will we know? do we care? Question out to Bill and leadership flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,String) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString, dateCreated = System.currentTimeMillis(), queryXml = runQueryRequest.toXmlString ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } /* - resultId: Long, - instanceId: Long, - resultType: Option[ResultOutputType], - setSize: Long, - startDate: Option[XMLGregorianCalendar], - endDate: Option[XMLGregorianCalendar], - description: Option[String], - statusType: StatusType, - statusMessage: Option[String], //todo problemDigest in a separate table problemDigest: Option[ProblemDigest] = None, //todo breakdowns in a separate table breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty */ case class QueryResultRow( - resultId:Long, - localId:String, - networkQueryId:NetworkQueryId, //the query's instanceId //todo verify - adapterNode:String, - resultType:ResultOutputType, - setSize:Long, - startDate:Option[Long], - endDate:Option[Long], - description:Option[String], - status:QueryResult.StatusType, - statusMessage:Option[String], - changeDate:Long + resultId:Long, + networkQueryId:NetworkQueryId, //the query's instanceId //todo verify + adapterNode:String, + resultType:ResultOutputType, + size:Long, + startDate:Option[Long], + endDate:Option[Long], + description:Option[String], + status:QueryResult.StatusType, + statusMessage:Option[String], + changeDate:Long ) { } -/* -object QueryResultRow { +object QueryResultRow extends ((Long,NetworkQueryId,String,ResultOutputType,Long,Option[Long],Option[Long],Option[String],QueryResult.StatusType,Option[String],Long) => QueryResultRow) +{ def apply(result:QueryResult):QueryResultRow = { - QueryResultRow( - + new QueryResultRow( + resultId = result.resultId, + networkQueryId = result.instanceId, + adapterNode = "todo.com", //todo find this somewhere + resultType = result.resultType.getOrElse(ResultOutputType.PATIENT_COUNT_XML), //todo how is this optional?? + size = result.setSize, + startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), + endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), + description = result.description, + status = result.statusType, + statusMessage = result.statusMessage, + changeDate = System.currentTimeMillis() ) } } -*/ \ No newline at end of file diff --git a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala index 96c241c28..06f529406 100644 --- a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala +++ b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala @@ -1,165 +1,147 @@ package net.shrine.qep.queries import net.shrine.protocol.{QueryResult, ResultOutputType} import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author david * @since 1/20/16 */ class QepQueryDbTest extends ShouldMatchersForJUnit { val qepQuery = QepQuery( networkId = 1L, userName = "ben", userDomain = "testDomain", queryName = "testQuery", expression = "testExpression", dateCreated = System.currentTimeMillis(), queryXml = "testXML" ) val secondQepQuery = QepQuery( networkId = 2L, userName = "dave", userDomain = "testDomain", queryName = "testQuery", expression = "testExpression", dateCreated = System.currentTimeMillis(), queryXml = "testXML" ) val flag = QepQueryFlag( networkQueryId = 1L, flagged = true, flagMessage = "This query is flagged", changeDate = System.currentTimeMillis() ) @Test def testInsertQepQuery() { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepQuery(secondQepQuery) val results = QepQueryDb.db.selectAllQepQueries results should equal(Seq(qepQuery,secondQepQuery)) } @Test def testSelectQepQueriesForUser() { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepQuery(secondQepQuery) val results = QepQueryDb.db.selectPreviousQueriesByUserAndDomain("ben","testDomain") results should equal(Seq(qepQuery)) } @Test def testSelectQueryFlags() { val results1 = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(Set(1L,2L)) results1 should equal(Map.empty) QepQueryDb.db.insertQepQueryFlag(flag) val results2 = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(Set(1L,2L)) results2 should equal(Map(1L -> flag)) } - /* - resultId:Long, - localId:String, - networkQueryId:NetworkQueryId, //the query's instanceId //todo verify - adapterNode:String, - resultType:ResultOutputType, - setSize:Long, - startDate:Option[Long], - endDate:Option[Long], - description:Option[String], - status:QueryResult.StatusType, - statusMessage:Option[String], - changeDate:Long - - */ val qepResultRowFromExampleCom = QueryResultRow( resultId = 10L, - localId = "Don't care", networkQueryId = 1L, adapterNode = "example.com", resultType = ResultOutputType.PATIENT_COUNT_XML, - setSize = 30L, + size = 30L, startDate = Some(System.currentTimeMillis() - 60), endDate = Some(System.currentTimeMillis() - 30), description = None, status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = System.currentTimeMillis() ) @Test def testInsertQueryResultRow() { QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleCom) val results = QepQueryDb.db.selectMostRecentQepResultRowsFor(1L) results should equal(Seq(qepResultRowFromExampleCom)) } val qepResultRowFromExampleComInThePast = QueryResultRow( resultId = 8L, - localId = "Don't care", networkQueryId = 1L, adapterNode = "example.com", resultType = ResultOutputType.PATIENT_COUNT_XML, - setSize = 0L, + size = 0L, startDate = qepResultRowFromExampleCom.startDate, endDate = None, description = None, status = QueryResult.StatusType.Processing, statusMessage = None, changeDate = qepResultRowFromExampleCom.changeDate - 40 ) val qepResultRowFromGeneralHospital = QueryResultRow( resultId = 100L, - localId = "Don't care", networkQueryId = 1L, adapterNode = "generalhospital.org", resultType = ResultOutputType.PATIENT_COUNT_XML, - setSize = 100L, + size = 100L, startDate = Some(System.currentTimeMillis() - 60), endDate = Some(System.currentTimeMillis() - 30), description = None, status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = System.currentTimeMillis() ) @Test def testGetMostRecentResultRows() { QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleComInThePast) QepQueryDb.db.insertQepResultRow(qepResultRowFromGeneralHospital) QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleCom) val results = QepQueryDb.db.selectMostRecentQepResultRowsFor(1L) results.to[Set] should equal(Set(qepResultRowFromExampleCom,qepResultRowFromGeneralHospital)) } @Before def beforeEach() = { QepQueryDb.db.createTables() } @After def afterEach() = { QepQueryDb.db.dropTables() } }