diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala index 2ef3dbe15..a7c6d0777 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala @@ -1,88 +1,88 @@ package net.shrine.adapter import java.sql.SQLException import net.shrine.log.Loggable import net.shrine.problem._ import net.shrine.protocol._ /** * @author Bill Simons * @since 4/8/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class Adapter extends Loggable { final def perform(message: BroadcastMessage): BaseShrineResponse = { def problemToErrorResponse(problem:Problem):ErrorResponse = { LoggingProblemHandler.handleProblem(problem) ErrorResponse(problem) } val shrineResponse = try { processRequest(message) } catch { case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e)) case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e)) case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e)) case e: SQLException => problemToErrorResponse(AdapterDatabaseProblem(e)) //noinspection RedundantBlock case e: Exception => { val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage" else s"Unexpected exception in Adapter" problemToErrorResponse(ProblemNotYetEncoded(summary,e)) } } shrineResponse } protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse //NOOP, may be overridden by subclasses def shutdown(): Unit = () } case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Adapter) { - override val throwable = Some(x) - override val summary: String = s"User '${authn.domain}:${authn.username}' locked out." - override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ." + override lazy val throwable = Some(x) + override lazy val summary: String = s"User '${authn.domain}:${authn.username}' locked out." + override lazy val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ." } case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Adapter) { - override val throwable = Some(x) - override val summary: String = s"Error communicating with I2B2 CRC." - override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}." - override val detailsXml =

+ override lazy val throwable = Some(x) + override lazy val summary: String = s"Error communicating with I2B2 CRC." + override lazy val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}." + override lazy val detailsXml =
Request is {request} {throwableDetail.getOrElse("")}
} case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Adapter) { - override val throwable = Some(x) - override val summary: String = "Could not map query term(s)." - override val description = s"The Shrine Adapter on ${stamp.host.getHostName} cannot map this query to its local terms." - override val detailsXml =
+ override lazy val throwable = Some(x) + override lazy val summary: String = "Could not map query term(s)." + override lazy val description = s"The Shrine Adapter on ${stamp.host.getHostName} cannot map this query to its local terms." + override lazy val detailsXml =
Query Defitiontion is {x.runQueryRequest.queryDefinition} RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo} {throwableDetail.getOrElse("")}
} case class AdapterDatabaseProblem(x:SQLException) extends AbstractProblem(ProblemSources.Adapter) { - override val throwable = Some(x) - override val summary: String = "Problem using the Adapter database." - override val description = "The Shrine Adapter encountered a problem using a database." + override lazy val throwable = Some(x) + override lazy val summary: String = "Problem using the Adapter database." + override lazy val description = "The Shrine Adapter encountered a problem using a database." } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala index df6ef54f2..e23c911b3 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala @@ -1,108 +1,108 @@ package net.shrine.adapter import org.xml.sax.SAXParseException import scala.xml.NodeSeq import scala.xml.XML import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, Credential, ErrorResponse, HiveCredentials, ShrineRequest, ShrineResponse, TranslatableRequest} import net.shrine.util.XmlDateHelper import net.shrine.client.Poster import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.util.Try import scala.util.control.NonFatal /** * @author Bill Simons * @since 4/11/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class CrcAdapter[T <: ShrineRequest, V <: ShrineResponse]( poster: Poster, override protected val hiveCredentials: HiveCredentials) extends WithHiveCredentialsAdapter(hiveCredentials) { protected def parseShrineResponse(nodeSeq: NodeSeq): ShrineResponse private[adapter] def parseShrineErrorResponseWithFallback(xmlResponseFromCrc: String): ShrineResponse = { //NB: See https://open.med.harvard.edu/jira/browse/SHRINE-534 //NB: https://open.med.harvard.edu/jira/browse/SHRINE-745 val shrineResponseAttempt = for { crcXml <- Try(XML.loadString(xmlResponseFromCrc)) shrineResponse <- Try(parseShrineResponse(crcXml)).recover { case NonFatal(e) => info(s"Exception while parsing $crcXml",e) ErrorResponse.fromI2b2(crcXml) } //todo pass the exception to build a proper error response, and log the exception } yield shrineResponse shrineResponseAttempt.recover { case saxx:SAXParseException => ErrorResponse(CannotParseXmlFromCrc(saxx,xmlResponseFromCrc)) case NonFatal(e) => error(s"Error parsing response from CRC: ", e) ErrorResponse(ExceptionWhileLoadingCrcResponse(e,xmlResponseFromCrc)) }.get } //NB: default is a noop; only RunQueryAdapter needs this for now protected[adapter] def translateNetworkToLocal(request: T): T = request protected[adapter] override def processRequest(message: BroadcastMessage): BaseShrineResponse = { val i2b2Response = callCrc(translateRequest(message.request)) parseShrineErrorResponseWithFallback(i2b2Response) } protected def callCrc(request: ShrineRequest): String = { debug(s"Sending Shrine-formatted request to the CRC at '${poster.url}': $request") val crcRequest = request.toI2b2String val crcResponse = XmlDateHelper.time(s"Calling the CRC at '${poster.url}'")(debug(_)) { //Wrap exceptions in a more descriptive form, to enable sending better error messages back to the legacy web client try { poster.post(crcRequest) } catch { case NonFatal(e) => throw CrcInvocationException(poster.url, request, e) } } crcResponse.body } private[adapter] def translateRequest(request: BaseShrineRequest): ShrineRequest = request match { case transReq: TranslatableRequest[T] => //noinspection RedundantBlock { val HiveCredentials(domain, username, password, project) = hiveCredentials val authInfo = AuthenticationInfo(domain, username, Credential(password, isToken = false)) translateNetworkToLocal(transReq.withAuthn(authInfo).withProject(project).asRequest) } case req: ShrineRequest => req case _ => throw new IllegalArgumentException(s"Unexpected request: $request") } } case class CannotParseXmlFromCrc(saxx:SAXParseException,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { - override val throwable = Some(saxx) - override val summary: String = "Could not parse response from CRC." - override val description:String = s"${saxx.getMessage} while parsing the response from the CRC." - override val detailsXml =

+ override lazy val throwable = Some(saxx) + override lazy val summary: String = "Could not parse response from CRC." + override lazy val description:String = s"${saxx.getMessage} while parsing the response from the CRC." + override lazy val detailsXml =
{throwableDetail.getOrElse("")} Response is {xmlResponseFromCrc}
} case class ExceptionWhileLoadingCrcResponse(t:Throwable,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { - override val throwable = Some(t) - override val summary: String = "Unanticipated exception with response from CRC." - override val description:String = s"${t.getMessage} while parsing the response from the CRC." - override val detailsXml =
+ override lazy val throwable = Some(t) + override lazy val summary: String = "Unanticipated exception with response from CRC." + override lazy val description:String = s"${t.getMessage} while parsing the response from the CRC." + override lazy val detailsXml =
{throwableDetail.getOrElse("")} Response is {xmlResponseFromCrc}
} \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala index 9e3069947..6f947393f 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala @@ -1,39 +1,39 @@ package net.shrine.adapter.components import net.shrine.adapter.dao.AdapterDao import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.ShrineResponse import net.shrine.protocol.ReadQueryDefinitionRequest import net.shrine.protocol.ReadQueryDefinitionResponse import net.shrine.protocol.ErrorResponse import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.AbstractReadQueryDefinitionRequest /** * @author clint * @since Apr 4, 2013 * * NB: Tested by ReadQueryDefinitionAdapterTest */ final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) { def get(request: Req): ShrineResponse = { val resultOption = for { shrineQuery <- dao.findQueryByNetworkId(request.queryId) } yield { ReadQueryDefinitionResponse( shrineQuery.networkId, shrineQuery.name, shrineQuery.username, shrineQuery.dateCreated, //TODO: I2b2 or Shrine format? shrineQuery.queryDefinition.toI2b2String) } resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request))) } } case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) { - override val summary: String = s"Couldn't find query definition." - override val description:String = s"The query definition with network id: ${request.queryId} does not exist at this site." + override lazy val summary: String = s"Couldn't find query definition." + override lazy val description:String = s"The query definition with network id: ${request.queryId} does not exist at this site." } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala index 64777b48c..a40d46778 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala @@ -1,464 +1,464 @@ package net.shrine.adapter.dao.squeryl import javax.xml.datatype.XMLGregorianCalendar import net.shrine.adapter.dao.AdapterDao import net.shrine.adapter.dao.model.{ObfuscatedPair, ShrineQuery, ShrineQueryResult} import net.shrine.adapter.dao.model.squeryl.{SquerylBreakdownResultRow, SquerylCountRow, SquerylPrivilegedUser, SquerylQueryResultRow, SquerylShrineError, SquerylShrineQuery} import net.shrine.adapter.dao.squeryl.tables.Tables import net.shrine.dao.DateHelpers import net.shrine.dao.squeryl.{SquerylEntryPoint, SquerylInitializer} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.{AuthenticationInfo, I2b2ResultEnvelope, QueryResult, ResultOutputType} import net.shrine.protocol.query.QueryDefinition import net.shrine.util.XmlDateHelper import org.squeryl.Query import org.squeryl.dsl.GroupWithMeasures import scala.util.Try import scala.xml.NodeSeq /** * @author clint * @since May 22, 2013 */ final class SquerylAdapterDao(initializer: SquerylInitializer, tables: Tables)(implicit breakdownTypes: Set[ResultOutputType]) extends AdapterDao with Loggable { initializer.init() override def inTransaction[T](f: => T): T = SquerylEntryPoint.inTransaction { f } import SquerylEntryPoint._ override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = mutateFlagField(networkQueryId, newIsFlagged = true, flagMessage) override def unFlagQuery(networkQueryId: Long): Unit = mutateFlagField(networkQueryId, newIsFlagged = false, None) private def mutateFlagField(networkQueryId: Long, newIsFlagged: Boolean, newFlagMessage: Option[String]): Unit = { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.isFlagged := newIsFlagged, queryRow.flagMessage := newFlagMessage) } } } override def storeResults( authn: AuthenticationInfo, masterId: String, networkQueryId: Long, queryDefinition: QueryDefinition, rawQueryResults: Seq[QueryResult], obfuscatedQueryResults: Seq[QueryResult], failedBreakdownTypes: Seq[ResultOutputType], mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = { inTransaction { val insertedQueryId = insertQuery(masterId, networkQueryId, authn, queryDefinition, isFlagged = false, hasBeenRun = true, flagMessage = None) val insertedQueryResultIds = insertQueryResults(insertedQueryId, rawQueryResults) storeCountResults(rawQueryResults, obfuscatedQueryResults, insertedQueryResultIds) storeErrorResults(rawQueryResults, insertedQueryResultIds) storeBreakdownFailures(failedBreakdownTypes.toSet, insertedQueryResultIds) insertBreakdownResults(insertedQueryResultIds, mergedBreakdowns, obfuscatedBreakdowns) } } private[adapter] def storeCountResults(raw: Seq[QueryResult], obfuscated: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val notErrors = raw.filter(!_.isError) val obfuscatedNotErrors = obfuscated.filter(!_.isError) if(notErrors.size > 1) { warn(s"Got ${notErrors.size} raw (hopefully-)count results; more than 1 is unusual.") } if(obfuscatedNotErrors.size > 1) { warn(s"Got ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; more than 1 is unusual.") } if(notErrors.size != obfuscatedNotErrors.size) { warn(s"Got ${notErrors.size} raw and ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; that these numbers are different is unusual.") } import ResultOutputType.PATIENT_COUNT_XML def isCount(qr: QueryResult): Boolean = qr.resultType.contains(PATIENT_COUNT_XML) inTransaction { //NB: Take the count/setSize from the FIRST PATIENT_COUNT_XML QueryResult, //though the same count should be there for all of them, if there are more than one for { Seq(insertedCountQueryResultId) <- insertedIds.get(PATIENT_COUNT_XML) notError <- notErrors.find(isCount) //NB: Find a count result, just to be sure obfuscatedNotError <- obfuscatedNotErrors.find(isCount) //NB: Find a count result, just to be sure } { insertCountResult(insertedCountQueryResultId, notError.setSize, obfuscatedNotError.setSize) } } } private[adapter] def storeErrorResults(results: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val errors = results.filter(_.isError) val insertedErrorResultIds = insertedIds.getOrElse(ResultOutputType.ERROR,Nil) val insertedIdsToErrors = insertedErrorResultIds zip errors inTransaction { for { (insertedErrorResultId, errorQueryResult) <- insertedIdsToErrors } { val pd = errorQueryResult.problemDigest.get //it's an error so it will have a problem digest insertErrorResult( insertedErrorResultId, errorQueryResult.statusMessage.getOrElse("Unknown failure"), pd.codec, pd.stampText, pd.summary, pd.description, pd.detailsXml ) } } } private[adapter] def storeBreakdownFailures(failedBreakdownTypes: Set[ResultOutputType], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = { val insertedIdsForFailedBreakdownTypes = insertedIds.filterKeys(failedBreakdownTypes.contains) inTransaction { for { (failedBreakdownType, Seq(resultId)) <- insertedIdsForFailedBreakdownTypes } { //todo propagate backwards to the breakdown failure to create the corect problem object BreakdownFailure extends AbstractProblem(ProblemSources.Adapter) { - override val summary: String = "Couldn't retrieve result breakdown" - override val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'" + override lazy val summary: String = "Couldn't retrieve result breakdown" + override lazy val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'" } val pd = BreakdownFailure.toDigest insertErrorResult( resultId, s"Couldn't retrieve breakdown of type '$failedBreakdownType'", pd.codec, pd.stampText, pd.summary, pd.description, pd.detailsXml ) } } } override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForAllUsers.take(howMany).map(_.toShrineQuery).toSeq } } def findAllCounts():Seq[SquerylCountRow] = { inTransaction{ Queries.allCountResults.toSeq } } override def renameQuery(networkQueryId: Long, newName: String) { inTransaction { update(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId). set(queryRow.name := newName) } } } override def deleteQuery(networkQueryId: Long): Unit = { inTransaction { tables.shrineQueries.deleteWhere(_.networkId === networkQueryId) } } override def deleteQueryResultsFor(networkQueryId: Long): Unit = { inTransaction { val resultIdsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow.id). on(queryRow.id === resultRow.queryId) }.toSet tables.queryResults.deleteWhere(_.id in resultIdsForNetworkQueryId) } } override def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean = Try { inTransaction { val privilegedUserOption = Queries.privilegedUsers(authn.domain, authn.username).singleOption val threshold:Int = privilegedUserOption.flatMap(_.threshold).getOrElse(defaultThreshold.intValue) val thirtyDaysInThePast: XMLGregorianCalendar = DateHelpers.daysFromNow(-30) val overrideDate: XMLGregorianCalendar = privilegedUserOption.map(_.toPrivilegedUser).flatMap(_.overrideDate).getOrElse(thirtyDaysInThePast) //sorted instead of just finding max val counts: Seq[Long] = Queries.repeatedResults(authn.domain, authn.username, overrideDate).toSeq.sorted //and then grabbing the last, highest value in the sorted sequence val repeatedResultCount: Long = counts.lastOption.getOrElse(0L) val result = repeatedResultCount > threshold debug(s"User ${authn.domain}:${authn.username} locked out? $result") result } }.getOrElse(false) override def insertQuery(localMasterId: String, networkId: Long, authn: AuthenticationInfo, queryDefinition: QueryDefinition, isFlagged: Boolean, hasBeenRun: Boolean, flagMessage: Option[String]): Int = { inTransaction { val inserted = tables.shrineQueries.insert(new SquerylShrineQuery( 0, localMasterId, networkId, authn.username, authn.domain, XmlDateHelper.now, isFlagged, flagMessage, hasBeenRun, queryDefinition)) inserted.id } } /** * Insert rows into QueryResults, one for each QueryResult in the passed RunQueryResponse * Inserted rows are 'children' of the passed ShrineQuery (ie, they are the results of the query) */ override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = { def execTime(result: QueryResult): Option[Long] = { //TODO: How are locales handled here? Do we care? def toMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- result.startDate end <- result.endDate } yield toMillis(end) - toMillis(start) } val typeToIdTuples = inTransaction { for { result <- results resultType = result.resultType.getOrElse(ResultOutputType.ERROR) //TODO: under what circumstances can QueryResults NOT have start and end dates set? elapsed = execTime(result) } yield { val lastInsertedQueryResultRow = tables.queryResults.insert(new SquerylQueryResultRow(0, result.resultId, parentQueryId, resultType, result.statusType, elapsed, XmlDateHelper.now)) (resultType, lastInsertedQueryResultRow.id) } } typeToIdTuples.groupBy { case (resultType, _) => resultType }.mapValues(_.map { case (_, count) => count }) } override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.countResults.insert(new SquerylCountRow(0, resultId, originalCount, obfuscatedCount, XmlDateHelper.now)) } } override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) { def merge(original: I2b2ResultEnvelope, obfuscated: I2b2ResultEnvelope): Map[String, ObfuscatedPair] = { Map.empty ++ (for { (key, originalValue) <- original.data obfuscatedValue <- obfuscated.data.get(key) } yield (key, ObfuscatedPair(originalValue, obfuscatedValue))) } inTransaction { for { (resultType, Seq(resultId)) <- parentResultIds if resultType.isBreakdown originalBreakdown <- originalBreakdowns.get(resultType) obfuscatedBreakdown <- obfuscatedBreakdowns.get(resultType) (key, ObfuscatedPair(original, obfuscated)) <- merge(originalBreakdown, obfuscatedBreakdown) } { tables.breakdownResults.insert(SquerylBreakdownResultRow(0, resultId, key, original, obfuscated)) } } } override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq) { //NB: Squeryl steers us toward inserting with dummy ids :( inTransaction { tables.errorResults.insert(SquerylShrineError(0, parentResultId, errorMessage, codec, stampText, summary, digestDescription, detailsXml.toString())) } } override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = { inTransaction { Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) } } override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = { inTransaction { Queries.queriesForUser(username, domain).take(howMany).toSeq.map(_.toShrineQuery) } } override def findQueriesByDomain(domain: String): Seq[ShrineQuery] = { inTransaction { Queries.queriesForDomain(domain).toList.map(_.toShrineQuery) } } override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = { inTransaction { val breakdownRowsByType = Queries.breakdownResults(networkQueryId).toSeq.groupBy { case (outputType, _) => outputType.toQueryResultRow.resultType }.mapValues(_.map { case (_, row) => row.toBreakdownResultRow }) val queryRowOption = Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery) val countRowOption = Queries.countResults(networkQueryId).headOption.map(_.toCountRow) val queryResultRows = Queries.resultsForQuery(networkQueryId).toSeq.map(_.toQueryResultRow) val errorResultRows = Queries.errorResults(networkQueryId).toSeq.map(_.toShrineError) for { queryRow <- queryRowOption countRow <- countRowOption shrineQueryResult <- ShrineQueryResult.fromRows(queryRow, queryResultRows, countRow, breakdownRowsByType, errorResultRows) } yield { shrineQueryResult } } } /** * @author clint * @since Nov 19, 2012 */ object Queries { def privilegedUsers(domain: String, username: String): Query[SquerylPrivilegedUser] = { from(tables.privilegedUsers) { user => where(user.username === username and user.domain === domain).select(user) } } def repeatedResults(domain: String, username: String, overrideDate: XMLGregorianCalendar): Query[Long] = { val counts: Query[GroupWithMeasures[Long, Long]] = join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.username === username and queryRow.domain === domain and (countRow.originalValue <> 0L) and queryRow.dateCreated > DateHelpers.toTimestamp(overrideDate)). groupBy(countRow.originalValue). compute(count(countRow.originalValue)). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } //Filter for result counts > 0 from(counts) { cnt => where(cnt.measures gt 0).select(cnt.measures) } } val queriesForAllUsers: Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => select(queryRow).orderBy(queryRow.dateCreated.desc) } } //TODO: Find a way to parameterize on limit, to avoid building the query every time //TODO: limit def queriesForUser(username: String, domain: String): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain and queryRow.username === username). select(queryRow). orderBy(queryRow.dateCreated.desc) } } def queriesForDomain(domain: String): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.domain === domain). select(queryRow). orderBy(queryRow.dateCreated.desc) } } val allCountResults: Query[SquerylCountRow] = { from(tables.countResults) { queryRow => select(queryRow) } } def queriesByNetworkId(networkQueryId: Long): Query[SquerylShrineQuery] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow) } } //TODO: Find out how to compose queries, to re-use queriesByNetworkId def queryNamesByNetworkId(networkQueryId: Long): Query[String] = { from(tables.shrineQueries) { queryRow => where(queryRow.networkId === networkQueryId).select(queryRow.name) } } def resultsForQuery(networkQueryId: Long): Query[SquerylQueryResultRow] = { val resultsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) => where(queryRow.networkId === networkQueryId). select(resultRow). on(queryRow.id === resultRow.queryId) } from(resultsForNetworkQueryId)(select(_)) } def countResults(networkQueryId: Long): Query[SquerylCountRow] = { join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) => where(queryRow.networkId === networkQueryId). select(countRow). on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId) } } def errorResults(networkQueryId: Long): Query[SquerylShrineError] = { join(tables.shrineQueries, tables.queryResults, tables.errorResults) { (queryRow, resultRow, errorRow) => where(queryRow.networkId === networkQueryId). select(errorRow). on(queryRow.id === resultRow.queryId, resultRow.id === errorRow.resultId) } } //NB: using groupBy here is too much of a pain; do it 'manually' later def breakdownResults(networkQueryId: Long): Query[(SquerylQueryResultRow, SquerylBreakdownResultRow)] = { join(tables.shrineQueries, tables.queryResults, tables.breakdownResults) { (queryRow, resultRow, breakdownRow) => where(queryRow.networkId === networkQueryId). select((resultRow, breakdownRow)). on(queryRow.id === resultRow.queryId, resultRow.id === breakdownRow.resultId) } } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala index 4156d09c9..e785eb11d 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala @@ -1,103 +1,103 @@ package net.shrine.adapter.service import net.shrine.log.Loggable import net.shrine.protocol.{BaseShrineResponse, BroadcastMessage, ErrorResponse, NodeId, RequestType, Result, Signature} import net.shrine.adapter.AdapterMap import net.shrine.crypto.Verifier import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.concurrent.duration.Duration import scala.concurrent.duration._ /** * Heart of the adapter. * * @author clint * @since Nov 14, 2013 */ final class AdapterService( nodeId: NodeId, signatureVerifier: Verifier, maxSignatureAge: Duration, adapterMap: AdapterMap) extends AdapterRequestHandler with Loggable { import AdapterService._ logStartup(adapterMap) override def handleRequest(message: BroadcastMessage): Result = { handleInvalidSignature(message).orElse { for { adapter <- adapterMap.adapterFor(message.request.requestType) } yield time(nodeId) { adapter.perform(message) } }.getOrElse { Result(nodeId, 0.milliseconds, ErrorResponse(UnknownRequestType(message.request.requestType))) } } /** * @return None if the signature is fine, Some(result with an ErrorResponse) if not */ private def handleInvalidSignature(message: BroadcastMessage): Option[Result] = { val (sigIsValid, elapsed) = time(signatureVerifier.verifySig(message, maxSignatureAge)) if(sigIsValid) { None } else { info(s"Incoming message had invalid signature: $message") Some(Result(nodeId, elapsed.milliseconds, ErrorResponse(CouldNotVerifySignature(message)))) } } } object AdapterService extends Loggable { private def logStartup(adapterMap: AdapterMap) { info("Adapter service initialized, will respond to the following queries: ") val sortedByReqType = adapterMap.requestsToAdapters.toSeq.sortBy { case (k, _) => k } sortedByReqType.foreach { case (requestType, adapter) => info(s" $requestType:\t(${adapter.getClass.getSimpleName})") } } private[service] def time[T](f: => T): (T, Long) = { val start = System.currentTimeMillis val result = f val elapsed = System.currentTimeMillis - start (result, elapsed) } private[service] def time(nodeId: NodeId)(f: => BaseShrineResponse): Result = { val (response, elapsed) = time(f) Result(nodeId, elapsed.milliseconds, response) } } case class CouldNotVerifySignature(message: BroadcastMessage) extends AbstractProblem(ProblemSources.Adapter){ val signature: Option[Signature] = message.signature - override val summary: String = signature.fold("A message was not signed")(sig => s"The trust relationship with ${sig.signedBy} is not properly configured.") - override val description: String = signature.fold(s"The Adapter at ${stamp.host.getHostName} could not properly validate a request because it had no signature.")(sig => s"The Adapter at ${stamp.host.getHostName} could not properly validate the request from ${sig.signedBy}. An incoming message from the hub had an invalid signature.") - override val detailsXml = signature.fold( + override lazy val summary: String = signature.fold("A message was not signed")(sig => s"The trust relationship with ${sig.signedBy} is not properly configured.") + override lazy val description: String = signature.fold(s"The Adapter at ${stamp.host.getHostName} could not properly validate a request because it had no signature.")(sig => s"The Adapter at ${stamp.host.getHostName} could not properly validate the request from ${sig.signedBy}. An incoming message from the hub had an invalid signature.") + override lazy val detailsXml = signature.fold(
)( sig =>
Signature is {sig}
) } case class UnknownRequestType(requestType: RequestType) extends AbstractProblem(ProblemSources.Adapter){ - override val summary: String = s"Unknown request type $requestType" - override val description: String = s"The Adapter at ${stamp.host.getHostName} received a request of type $requestType that it cannot process." + override lazy val summary: String = s"Unknown request type $requestType" + override lazy val description: String = s"The Adapter at ${stamp.host.getHostName} received a request of type $requestType that it cannot process." } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala index d981a9125..3b8952fa9 100644 --- a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala +++ b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala @@ -1,36 +1,36 @@ package net.shrine.authentication import net.shrine.authentication.AuthenticationResult.NotAuthenticated import net.shrine.problem.{AbstractProblem, ProblemSources} import scala.xml.NodeSeq /** * @author clint * @since Dec 13, 2013 */ final case class NotAuthenticatedException(domain: String, username: String,message: String, cause: Throwable) extends RuntimeException(message, cause) { def problem = NotAuthenticatedProblem(this) } object NotAuthenticatedException { def apply(na:NotAuthenticated):NotAuthenticatedException = NotAuthenticatedException(na.domain,na.username,na.message,na.cause.getOrElse(null)) } case class NotAuthenticatedProblem(nax:NotAuthenticatedException) extends AbstractProblem(ProblemSources.Qep){ - override val summary = s"Can not authenticate ${nax.domain}:${nax.username}." + override lazy val summary = s"Can not authenticate ${nax.domain}:${nax.username}." - override val throwable = Some(nax) + override lazy val throwable = Some(nax) - override val description = s"Can not authenticate ${nax.domain}:${nax.username}. ${nax.getLocalizedMessage}" + override lazy val description = s"Can not authenticate ${nax.domain}:${nax.username}. ${nax.getLocalizedMessage}" - override val detailsXml: NodeSeq = NodeSeq.fromSeq( + override lazy val detailsXml: NodeSeq = NodeSeq.fromSeq(
{throwableDetail.getOrElse("")}
) } \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala index f1df2acca..0ca15c035 100644 --- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala +++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala @@ -1,112 +1,112 @@ package net.shrine.authorization import net.shrine.log.Loggable import scala.util.{Failure, Success, Try} import net.shrine.client.HttpResponse import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest import net.shrine.i2b2.protocol.pm.User import net.shrine.problem._ import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.ErrorResponse import scala.util.control.NonFatal /** * @author clint * @since Apr 5, 2013 */ trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable => import PmAuthorizerComponent._ //noinspection RedundantBlock object Pm { def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = { User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith { case NonFatal(e) => { debug(s"Couldn't extract a User from '$httpResponse'") Try(Left(ErrorResponse.fromI2b2(httpResponse.body))) } }.recover { case NonFatal(e) => { val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e) LoggingProblemHandler.handleProblem(problem) Left(ErrorResponse(problem)) } } } def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = { val request = GetUserConfigurationRequest(authn) val responseAttempt: Try[HttpResponse] = Try { debug(s"Authorizing with PM cell at ${pmPoster.url}") pmPoster.post(request.toI2b2String) } val authStatusAttempt: Try[AuthorizationStatus with Product with Serializable] = responseAttempt.flatMap(parsePmResult(authn)).map { case Right(user) => { val managerUserOption = for { roles <- user.rolesByProject.get(projectId) if neededRoles.forall(roles.contains) } yield user managerUserOption.map(Authorized).getOrElse { NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn)) } } case Left(errorResponse) => { //todo remove when ErrorResponse gets its message info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object") NotAuthorized(errorResponse.problemDigest) } } authStatusAttempt match { case Success(s) => s case Failure(x) => NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn,x)) } } } } object PmAuthorizerComponent { sealed trait AuthorizationStatus case class Authorized(user: User) extends AuthorizationStatus case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus { def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest) } object NotAuthorized { def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest) } } case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) { - override val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'" + override lazy val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'" - override val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'" + override lazy val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'" } case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { - override val throwable = Some(x) - override val summary: String = s"Could not reach PM cell." - override val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}." + override lazy val throwable = Some(x) + override lazy val summary: String = s"Could not reach PM cell." + override lazy val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}." } case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) { - override val throwable = Some(x) + override lazy val throwable = Some(x) override def summary: String = s"Could not interpret response from PM cell." override def description: String = s"Shrine could not interpret the response from the PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: due to ${throwable.get}" - override val detailsXml =
+ override lazy val detailsXml =
Response is {httpResponse} {throwableDetail.getOrElse("")}
} \ No newline at end of file diff --git a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala index 7b27030aa..7e5e7dec1 100644 --- a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala +++ b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala @@ -1,236 +1,236 @@ package net.shrine.authorization import java.net.URL import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager} import java.security.cert.X509Certificate import akka.io.IO import com.typesafe.config.{Config, ConfigFactory} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.steward.{InboundShrineQuery, ResearchersTopics, TopicIdAndName} import net.shrine.log.Loggable import net.shrine.protocol.{ApprovedTopic, AuthenticationInfo, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest} import net.shrine.config.ConfigExtensions import org.json4s.native.JsonMethods.parse import org.json4s.{DefaultFormats, Formats} import akka.actor.ActorSystem import akka.util.Timeout import akka.pattern.ask import net.shrine.problem.{AbstractProblem, ProblemSources} import spray.can.Http import spray.can.Http.{HostConnectorInfo, HostConnectorSetup} import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse} import spray.http.StatusCodes.{OK, Unauthorized, UnavailableForLegalReasons} import spray.httpx.TransformerPipelineSupport.WithTransformation import spray.httpx.Json4sSupport import spray.client.pipelining.{Get, Post, addCredentials, sendReceive} import spray.io.{ClientSSLEngineProvider, PipelineContext, SSLContextProvider} import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration} import scala.concurrent.{Await, Future} import scala.language.postfixOps /** * A QueryAuthorizationService that talks to the standard data steward application to learn about topics (intents) and check that a * shrine query can be run * * @author david * @since 4/2/15 */ final case class StewardQueryAuthorizationService(qepUserName:String, qepPassword:String, stewardBaseUrl:URL, defaultTimeout:FiniteDuration = 10 seconds) extends QueryAuthorizationService with Loggable with Json4sSupport { import system.dispatcher // execution context for futures implicit val system = ActorSystem("AuthorizationServiceActors",ConfigFactory.load("shrine")) //todo use shrine's config implicit val timeout:Timeout = Timeout.durationToTimeout(defaultTimeout)//10 seconds implicit def json4sFormats: Formats = DefaultFormats val qepCredentials = BasicHttpCredentials(qepUserName,qepPassword) def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = { // Place a special SSLContext in scope here to be used by HttpClient. // It trusts all server certificates. // Most important - it will encrypt all of the traffic on the wire. implicit def trustfulSslContext: SSLContext = { object BlindFaithX509TrustManager extends X509TrustManager { def checkClientTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Client asked BlindFaithX509TrustManager to check $chain for $authType")) def checkServerTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Server asked BlindFaithX509TrustManager to check $chain for $authType")) def getAcceptedIssuers = Array[X509Certificate]() } val context = SSLContext.getInstance("TLS") context.init(Array[KeyManager](), Array(BlindFaithX509TrustManager), null) context } implicit def trustfulSslContextProvider: SSLContextProvider = { SSLContextProvider.forContext(trustfulSslContext) } class CustomClientSSLEngineProvider extends ClientSSLEngineProvider { def apply(pc: PipelineContext) = ClientSSLEngineProvider.default(trustfulSslContextProvider).apply(pc) } implicit def sslEngineProvider: ClientSSLEngineProvider = new CustomClientSSLEngineProvider val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials) val responseFuture: Future[HttpResponse] = for { HostConnectorInfo(hostConnector, _) <- { val hostConnectorSetup = new HostConnectorSetup(httpRequest.uri.authority.host.address, httpRequest.uri.authority.port, sslEncryption = httpRequest.uri.scheme=="https")( sslEngineProvider = sslEngineProvider) IO(Http) ask hostConnectorSetup } response <- sendReceive(hostConnector).apply(requestWithCredentials) _ <- hostConnector ask Http.CloseAll } yield response responseFuture } /* todo to recycle connections with http://spray.io/documentation/1.2.3/spray-client/ if needed def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = { import akka.io.IO import akka.pattern.ask import spray.can.Http val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials) //todo failures via onFailure callbacks for{ sendR:SendReceive <- connectorSource response:HttpResponse <- sendR(requestWithCredentials) } yield response } val connectorSource: Future[SendReceive] = //Future[HttpRequest => Future[HttpResponse]] for ( //keep asking for a connector until you get one //todo correct URL // Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("www.spray.io", port = 8080) Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("localhost", port = 6060) ) yield sendReceive(connector) */ def sendAndReceive(httpRequest: HttpRequest,timeout:Duration = defaultTimeout):HttpResponse = { info("StewardQueryAuthorizationService will request "+httpRequest.uri) //todo someday log request and response val responseFuture = sendHttpRequest(httpRequest) val response:HttpResponse = Await.result(responseFuture,timeout) info("StewardQueryAuthorizationService received response with status "+response.status) response } //Contact a data steward and either return an Authorized or a NotAuthorized or throw an exception override def authorizeRunQueryRequest(runQueryRequest: RunQueryRequest): AuthorizationResult = { debug(s"authorizeRunQueryRequest started for ${runQueryRequest.queryDefinition.name}") val interpreted = runQueryRequest.topicId.fold( authorizeRunQueryRequestNoTopic(runQueryRequest) )( authorizeRunQueryRequestForTopic(runQueryRequest,_) ) debug(s"authorizeRunQueryRequest completed with $interpreted) for ${runQueryRequest.queryDefinition.name}") interpreted } def authorizeRunQueryRequestNoTopic(runQueryRequest: RunQueryRequest): AuthorizationResult = { val userName = runQueryRequest.authn.username val queryId = runQueryRequest.queryDefinition.name //xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml. val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString()) val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName", queryForJson) val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime) interpretAuthorizeRunQueryResponse(response) } def authorizeRunQueryRequestForTopic(runQueryRequest: RunQueryRequest,topicIdString:String): AuthorizationResult = { val userName = runQueryRequest.authn.username val queryId = runQueryRequest.queryDefinition.name //xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml. val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString()) val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName/topic/$topicIdString", queryForJson) val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime) debug(s"authorizeRunQueryRequestForTopic response is $response") interpretAuthorizeRunQueryResponse(response) } /** Interpret the response from the steward app. Primarily here for testing. */ def interpretAuthorizeRunQueryResponse(response:HttpResponse):AuthorizationResult = { response.status match { case OK => { val topicJson = new String(response.entity.data.toByteArray) debug(s"topicJson is $topicJson") val topic:Option[TopicIdAndName] = parse(topicJson).extractOpt[TopicIdAndName] debug(s"topic is $topic") Authorized(topic.map(x => (x.id,x.name))) } case UnavailableForLegalReasons => NotAuthorized(response.entity.asString) case Unauthorized => throw new AuthorizationException(s"steward rejected qep's login credentials. $response") case _ => throw new AuthorizationException(s"QueryAuthorizationService detected a problem: $response") } } //Either read the approved topics from a data steward or have an error response. override def readApprovedEntries(readTopicsRequest: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = { val userName = readTopicsRequest.authn.username val request = Get(s"$stewardBaseUrl/steward/qep/approvedTopics/user/$userName") val response:HttpResponse = sendAndReceive(request,readTopicsRequest.waitTime) if(response.status == OK) { val topicsJson = new String(response.entity.data.toByteArray) val topicsFromSteward: ResearchersTopics = parse(topicsJson).extract[ResearchersTopics] val topics: Seq[ApprovedTopic] = topicsFromSteward.topics.map(topic => ApprovedTopic(topic.id, topic.name)) Right(ReadApprovedQueryTopicsResponse(topics)) } else Left(ErrorResponse(ErrorStatusFromDataStewardApp(response,stewardBaseUrl))) } override def toString() = { super.toString().replaceAll(qepPassword,"REDACTED") } } object StewardQueryAuthorizationService { def apply(config:Config):StewardQueryAuthorizationService = StewardQueryAuthorizationService ( qepUserName = config.getString("qepUserName"), qepPassword = config.getString("qepPassword"), stewardBaseUrl = config.get("stewardBaseUrl", new URL(_)) ) } case class ErrorStatusFromDataStewardApp(response:HttpResponse,stewardBaseUrl:URL) extends AbstractProblem(ProblemSources.Qep) { - override val summary: String = s"Data Steward App responded with status ${response.status}" - override val description:String = s"The Data Steward App at ${stewardBaseUrl} responded with status ${response.status}, not OK." - override val detailsXml =
+ override lazy val summary: String = s"Data Steward App responded with status ${response.status}" + override lazy val description:String = s"The Data Steward App at ${stewardBaseUrl} responded with status ${response.status}, not OK." + override lazy val detailsXml =
Response is {response} {throwableDetail.getOrElse("")}
} \ No newline at end of file diff --git a/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala b/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala index d3b5f6fb3..264fd048f 100644 --- a/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala +++ b/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala @@ -1,191 +1,202 @@ package net.shrine.problem import java.net.InetAddress import java.text.SimpleDateFormat import java.util.Date import net.shrine.log.Loggable import net.shrine.serialization.{XmlMarshaller, XmlUnmarshaller} import scala.concurrent.Future import scala.xml.{Elem, Node, NodeSeq} /** * Describes what information we have about a problem at the site in code where we discover it. * * @author david * @since 8/6/15 */ trait Problem extends DelayedInit { def summary:String def problemName = getClass.getName def throwable:Option[Throwable] = None def stamp:Stamp def description:String - def exceptionXml(exception:Option[Throwable]): Option[Elem] = exception.map{x => + def exceptionXml(exception:Option[Throwable]): Option[Elem] = { + println("Hello!") + println(exception) + exception.map{x => {x.getClass.getName} {x.getMessage} {x.getStackTrace.map(line => {line})}{exceptionXml(Option(x.getCause)).getOrElse("")} - } + }} - def throwableDetail = exceptionXml(throwable) + def throwableDetail: Option[Elem] = exceptionXml(throwable) def detailsXml: NodeSeq = NodeSeq.fromSeq(
{throwableDetail.getOrElse("")}
) def toDigest:ProblemDigest = ProblemDigest(problemName,stamp.pretty,summary,description,detailsXml, stamp.time) override def delayedInit(code: => Unit): Unit = { code if (!ProblemConfigSource.turnOffConnector) { + println(s"Yello! ${this.throwable}") + println(s"Red! ${this.summary}") val problem = Problems - problem.DatabaseConnector.insertProblem(toDigest) + problem.DatabaseConnector.insertProblem(this.toDigest) } } } case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq, epoch: Long) extends XmlMarshaller { override def toXml: Node = { {codec} {stampText} {summary} {description} {epoch} {detailsXml} } /** * Ignores detailXml. equals with scala.xml is impossible. See http://www.scala-lang.org/api/2.10.3/index.html#scala.xml.Equality$ */ override def equals(other: Any): Boolean = other match { case that: ProblemDigest => (that canEqual this) && codec == that.codec && stampText == that.stampText && summary == that.summary && description == that.description && epoch == that.epoch case _ => false } /** * Ignores detailXml */ override def hashCode: Int = { val prime = 67 codec.hashCode + prime * (stampText.hashCode + prime *(summary.hashCode + prime * (description.hashCode + prime * epoch.hashCode()))) } } object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable { override def fromXml(xml: NodeSeq): ProblemDigest = { val problemNode = xml \ "problem" require(problemNode.nonEmpty,s"No problem tag in $xml") def extractText(tagName:String) = (problemNode \ tagName).text val codec = extractText("codec") val stampText = extractText("stamp") val summary = extractText("summary") val description = extractText("description") val detailsXml: NodeSeq = problemNode \ "details" val epoch = try { extractText("epoch").toLong } catch { case nx:NumberFormatException => error(s"While parsing xml representing a ProblemDigest, the epoch could not be parsed into a long", nx) 0 } ProblemDigest(codec,stampText,summary,description,detailsXml,epoch) } } case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) { def pretty = s"${new Date(time)} on ${host.getHostName} ${source.pretty}" } object Stamp { //TODO: val dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")? //TODO: Currently the stamp text is locale specific, which can change depending on the jre/computer running it... def apply(source:ProblemSources.ProblemSource, timer: => Long): Stamp = Stamp(InetAddress.getLocalHost, timer,source) } +/** + * An abstract problem to enable easy creation of Problems. Note that when overriding fields, + * you should only use def or lazy val, and not val. + * See: http://stackoverflow.com/questions/15346600/field-inside-object-which-extends-app-trait-is-set-to-null-why-is-that-so + * @param source + */ abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem { def timer = System.currentTimeMillis - val stamp = Stamp(source, timer) + lazy val stamp = Stamp(source, timer) } trait ProblemHandler { def handleProblem(problem:Problem) } /** * An example problem handler */ object LoggingProblemHandler extends ProblemHandler with Loggable { override def handleProblem(problem: Problem): Unit = { problem.throwable.fold(error(problem.toString))(throwable => error(problem.toString,throwable) ) } } object ProblemSources{ sealed trait ProblemSource { def pretty = getClass.getSimpleName.dropRight(1) } case object Adapter extends ProblemSource case object Hub extends ProblemSource case object Qep extends ProblemSource case object Dsa extends ProblemSource case object Unknown extends ProblemSource def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown) } case class ProblemNotYetEncoded(internalSummary:String,t:Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown){ override val summary = "An unanticipated problem encountered." override val throwable = { val rx = t.fold(new IllegalStateException(s"$summary"))( new IllegalStateException(s"$summary",_) ) rx.fillInStackTrace() Some(rx) } val reportedAtStackTrace = new IllegalStateException("Capture reporting stack trace.") override val description = "This problem is not yet classified in Shrine source code. Please report the details to the Shrine dev team." override val detailsXml: NodeSeq = NodeSeq.fromSeq(
{internalSummary} {throwableDetail.getOrElse("")}
) } object ProblemNotYetEncoded { def apply(summary:String,x:Throwable):ProblemNotYetEncoded = ProblemNotYetEncoded(summary,Some(x)) } diff --git a/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala b/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala index 55fdc2c5d..f961c80dd 100644 --- a/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala +++ b/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala @@ -1,15 +1,15 @@ package net.shrine.problem /** * @author david * @since 1.22 */ case class TestProblem(override val summary: String = "test summary", override val description:String = "test description", override val throwable: Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown) { override def timer = 0 // No point in logging test problems - override def delayedInit(code: => Unit) = { - code - } + //override def delayedInit(code: => Unit) = { + // code + //} } diff --git a/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala b/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala index 06c9c66b2..82c786782 100644 --- a/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala +++ b/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala @@ -1,79 +1,78 @@ package net.shrine.slick import java.io.PrintWriter import java.sql.{Connection, DriverManager} -import java.util.function.BiConsumer import java.util.logging.Logger import javax.naming.{Context, InitialContext} import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.config.ConfigExtensions /** * @author david * @since 1/26/16 */ object TestableDataSourceCreator { def dataSource(config:Config):DataSource = { val dataSourceFrom = config.getString("dataSourceFrom") if(dataSourceFrom == "JNDI") { val jndiDataSourceName = config.getString("jndiDataSourceName") val initialContext:InitialContext = new InitialContext() // check to see what part blows up val secondaryContext = initialContext.lookup("java:comp/env/").asInstanceOf[Context] val printKeyValues = (a: java.util.Enumeration[_]) => while (a.hasMoreElements) { println(a.nextElement()) } println("Seconday keys:") printKeyValues(secondaryContext.getEnvironment.keys()) println("Secondary values") printKeyValues(secondaryContext.getEnvironment.elements()) println("Primary keys:") printKeyValues(initialContext.getEnvironment.keys()) println("Primary values:") printKeyValues(initialContext.getEnvironment.elements()) println(initialContext.getEnvironment) initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource] } else if (dataSourceFrom == "testDataSource") { val testDataSourceConfig = config.getConfig("testDataSource") val driverClassName = testDataSourceConfig.getString("driverClassName") val url = testDataSourceConfig.getString("url") case class Credentials(username: String,password:String) def configToCredentials(config:Config) = new Credentials(config.getString("username"),config.getString("password")) val credentials: Option[Credentials] = testDataSourceConfig.getOptionConfigured("credentials",configToCredentials) //Creating an instance of the driver register it. (!) From a previous epoch, but it works. Class.forName(driverClassName).newInstance() object TestDataSource extends DataSource { override def getConnection: Connection = { credentials.fold(DriverManager.getConnection(url))(credentials => DriverManager.getConnection(url,credentials.username,credentials.password)) } override def getConnection(username: String, password: String): Connection = { DriverManager.getConnection(url, username, password) } //unused methods override def unwrap[T](iface: Class[T]): T = ??? override def isWrapperFor(iface: Class[_]): Boolean = ??? override def setLogWriter(out: PrintWriter): Unit = ??? override def getLoginTimeout: Int = ??? override def setLoginTimeout(seconds: Int): Unit = ??? override def getParentLogger: Logger = ??? override def getLogWriter: PrintWriter = ??? } TestDataSource } else throw new IllegalArgumentException(s"dataSourceFrom config value must be either JNDI or testDataSource, not $dataSourceFrom") } } diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala index 59d25fa81..be7f831fa 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala @@ -1,402 +1,402 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources} import net.shrine.protocol.QueryResult.StatusType import scala.xml.NodeSeq import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil} import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller} import scala.util.Try /** * @author Bill Simons * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class QueryResult ( resultId: Long, instanceId: Long, resultType: Option[ResultOutputType], setSize: Long, startDate: Option[XMLGregorianCalendar], endDate: Option[XMLGregorianCalendar], description: Option[String], statusType: StatusType, statusMessage: Option[String], problemDigest: Option[ProblemDigest] = None, breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty ) extends XmlMarshaller with I2b2Marshaller with Loggable { //only used in tests def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, //description statusType, None) //statusMessage } def this( resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = { this( resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None) //statusMessage } def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match { case Some(rt) => rt == testedResultType case _ => false } import QueryResult._ //NB: Fragile, non-type-safe == def isError = statusType == StatusType.Error def elapsed: Option[Long] = { def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- startDate end <- endDate } yield inMillis(end) - inMillis(start) } //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = { breakdowns.values.toSeq.sortBy(_.resultType.name) } override def toI2b2: NodeSeq = { import OptionEnrichments._ XmlUtil.stripWhitespace { { resultId } { instanceId } { description.toXml() } { resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt => if(rt.isBreakdown) rt.toI2b2NameOnly() else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error else rt.toI2b2 } } { setSize } { startDate.toXml() } { endDate.toXml() } { statusType } { statusType.toI2b2(this) } { //NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case, //so if we're going to do that, let's produce saner XML. sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data")) } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { resultId } { instanceId } { resultType.toXml(_.toXml) } { setSize } { startDate.toXml() } { endDate.toXml() } { description.toXml() } { statusType } { statusMessage.toXml() } { //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. sortedBreakdowns.map(_.toXml) } { problemDigest.map(_.toXml).getOrElse("") } } def withId(id: Long): QueryResult = copy(resultId = id) def withInstanceId(id: Long): QueryResult = copy(instanceId = id) def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize)) def withSetSize(size: Long): QueryResult = copy(setSize = size) def withDescription(desc: String): QueryResult = copy(description = Option(desc)) def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType)) def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData)) def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns) } object QueryResult { final case class StatusType( name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value { def isError = this == StatusType.Error def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult) } object StatusType extends SEnum[StatusType] { private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{ throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id") } { i2b2Id }{ queryResult.statusType.name } } val noMessage:NodeSeq = null val Error = StatusType("ERROR", isDone = true, None, { queryResult => (queryResult.statusMessage, queryResult.problemDigest) match { case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml case (Some(msg),None) => { msg } case (None,Some(pd)) => { pd.summary } ++ pd.toXml case (None, None) => noMessage } }) val Finished = StatusType("FINISHED", isDone = true, Some(3)) //TODO: Can we use the same for Queued, Processing, and Incomplete? val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests val Queued = StatusType("QUEUED", isDone = false, Some(2)) val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2)) //TODO: What s should these have? Does anyone care? val Held = StatusType("HELD", isDone = false) val SmallQueue = StatusType("SMALL_QUEUE", isDone = false) val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false) val LargeQueue = StatusType("LARGE_QUEUE", isDone = false) val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false) } def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _) def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = { import ResultOutputType.valueOf val typeName = asText(elemNames: _*)(xml) valueOf(typeName) orElse valueOf(breakdownTypes)(typeName) } def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = { val attempt = parse(xml) attempt.toOption } def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = { val subXml = xml \ "problem" if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml)) else None } def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def extract(elemName: String): Option[String] = { Option((xml \ elemName).text.trim).filter(!_.isEmpty) } def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate) val asLong = extractLong(xml) _ import NodeSeqEnrichments.Strictness._ import Tries.sequence def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = { //noinspection ScalaUnnecessaryParentheses val mapAttempt = for { subXml <- xml.withChild(elemName) envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes))) mappings = envelopes.map(envelope => (envelope.resultType -> envelope)) } yield Map.empty ++ mappings mapAttempt.getOrElse(Map.empty) } QueryResult( resultId = asLong("resultId"), instanceId = asLong("instanceId"), resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml), setSize = asLong("setSize"), startDate = extractDate("startDate"), endDate = extractDate("endDate"), description = extract("description"), statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call statusMessage = extract("statusMessage"), problemDigest = extractProblemDigest(xml), breakdowns = extractBreakdowns("resultEnvelope") ) } def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def asLong = extractLong(xml) _ def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim) def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate) val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call val statusMessage: Option[String] = asTextOption("query_status_type", "description") val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type") val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest) else None case class Filling( resultType:Option[ResultOutputType], setSize:Long, startDate:Option[XMLGregorianCalendar], endDate:Option[XMLGregorianCalendar] ) val filling = if(!statusType.isError) { val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2) val setSize = asLong("set_size") val startDate = asXmlGcOption("start_date") val endDate = asXmlGcOption("end_date") Filling(resultType,setSize,startDate,endDate) } else { val resultType = None val setSize = 0L val startDate = None val endDate = None Filling(resultType,setSize,startDate,endDate) } QueryResult( resultId = asLong("result_instance_id"), instanceId = asLong("query_instance_id"), resultType = filling.resultType, setSize = filling.setSize, startDate = filling.startDate, endDate = filling.endDate, description = asTextOption("description"), statusType = statusType, statusMessage = statusMessage, problemDigest = problemDigest ) } def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = { QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = { val problemDigest = problem.toDigest QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } /** * For reconstituting errorResults from a database */ //todo remove and replace with real Problems def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = { // This would require parsing the stamp text to change, and without a standard locale that's nigh impossible. // If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet. // TODO: REFACTOR SQUERYL ERRORS TO USE REAL PROBLEMS val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0) QueryResult( resultId = 0L, instanceId = 0L, resultType = None, setSize = 0L, startDate = None, endDate = None, description = description, statusType = StatusType.Error, statusMessage = Option(statusMessage), problemDigest = Option(problemDigest)) } } case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) { - override val summary: String = "The I2B2 CRC reported an internal error." - override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}" - override val detailsXml =

+ override lazy val summary: String = "The I2B2 CRC reported an internal error." + override lazy val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}" + override lazy val detailsXml =
CRC's Response is {xmlResponseFromCrc}
} diff --git a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala index 0b444d164..f50e3f910 100644 --- a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala +++ b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala @@ -1,89 +1,89 @@ package net.shrine.hms.authorization import java.net.URL import com.typesafe.config.Config import net.shrine.authentication.{AuthenticationResult, Authenticator} import net.shrine.authorization.{AuthorizationResult, QueryAuthorizationService} import net.shrine.client.EndpointConfig import net.shrine.log.Loggable import net.shrine.protocol.{AuthenticationInfo, CredentialConfig, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest} import net.shrine.config.ConfigExtensions import net.shrine.problem.{AbstractProblem, ProblemSources} /** * @author Bill Simons * @since 1/30/12 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final case class HmsDataStewardAuthorizationService( sheriffClient: SheriffClient, authenticator: Authenticator ) extends QueryAuthorizationService with Loggable { import net.shrine.hms.authorization.HmsDataStewardAuthorizationService._ override def readApprovedEntries(request: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = { val authn = request.authn authenticate(authn) match { case None => Left(ErrorResponse(HMSNotAuthenticatedProblem(authn))) case Some(ecommonsUsername) => val topics = sheriffClient.getApprovedEntries(ecommonsUsername) Right(ReadApprovedQueryTopicsResponse(topics)) } } override def authorizeRunQueryRequest(request: RunQueryRequest): AuthorizationResult = { val authn = request.authn if (request.topicId.isEmpty) { AuthorizationResult.NotAuthorized(s"HMS queries require a topic id; couldn't authenticate user ${toDomainAndUser(authn)}") } else { authenticate(authn) match { case None => AuthorizationResult.NotAuthorized(s"Requested topic is not approved; couldn't authenticate user ${toDomainAndUser(authn)}") case Some(ecommonsUsername) => sheriffClient.isAuthorized(ecommonsUsername, request.topicId.get, request.queryDefinition.toI2b2String) } } } private def authenticate(authn: AuthenticationInfo): Option[String] = { val authenticationResult = authenticator.authenticate(authn) identifyEcommonsUsername(authenticationResult) } } object HmsDataStewardAuthorizationService { def apply(config:Config,authenticator: Authenticator):HmsDataStewardAuthorizationService = { val endpointUrl = config.getString("sheriffEndpoint"+EndpointConfig.Keys.url) val credentials = config.getConfigured("sheriffCredentials", CredentialConfig(_)) val sheriffClient = JerseySheriffClient(endpointUrl, credentials.username, credentials.password) HmsDataStewardAuthorizationService(sheriffClient, authenticator) } private def toDomainAndUser(authn: AuthenticationInfo): String = s"${authn.domain}:${authn.username}" def identifyEcommonsUsername(authenticationResult: AuthenticationResult): Option[String] = authenticationResult match { case AuthenticationResult.Authenticated(_, ecommonsUsername) => Option(ecommonsUsername) case _ => None } } case class HMSNotAuthenticatedProblem(authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep){ - override val summary = s"Can not authenticate ${authn.domain}:${authn.username}." + override lazy val summary = s"Can not authenticate ${authn.domain}:${authn.username}." - override val description = s"Can not authenticate ${authn.domain}:${authn.username}." + override lazy val description = s"Can not authenticate ${authn.domain}:${authn.username}." } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala index bfa2100e1..e8d129e82 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala @@ -1,129 +1,129 @@ package net.shrine.aggregation import java.net.{ConnectException, UnknownHostException} import com.sun.jersey.api.client.ClientHandlerException import net.shrine.broadcaster.CouldNotParseResultsException import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemNotYetEncoded, ProblemSources} import scala.concurrent.duration.Duration import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Represents the basic aggregation strategy shared by several aggregators: * - Parses a sequence of SpinResultEntries into a sequence of some * combination of valid responses, ErrorResponses, and invalid * responses (cases where ShrineResponse.fromXml returns None) * - Filters the valid responses, weeding out responses that aren't of * the expected type * Invokes an abstract method with the valid responses, errors, and * invalid responses. * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable { private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = { manifest[T].runtimeClass.isAssignableFrom(response.getClass) } import BasicAggregator._ override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = { val resultsOrErrors: Iterable[ParsedResult[T]] = { for { result <- results } yield { val parsedResponse: ParsedResult[T] = result match { case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse) case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response) case Timeout(origin) => Error(Option(origin), ErrorResponse(TimedOutWithAdapter(origin))) case Failure(origin, cause) => cause match { case cx: ConnectException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx))) case uhx: UnknownHostException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, uhx))) case chx: ClientHandlerException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, chx))) case cnprx:CouldNotParseResultsException => if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx))) else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx))) case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x))) } case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result") } parsedResponse } } val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid } val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid } val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error } //Log all parsing errors invalidResponses.map(_.errorMessage).foreach(this.error(_)) val previouslyDetectedErrors = errors.map(Error(None, _)) makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses) } private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse } object BasicAggregator { private[aggregation] sealed abstract class ParsedResult[+T] private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T] private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing] private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing] } case class CouldNotConnectToAdapter(origin:NodeId,cx: Exception) extends AbstractProblem(ProblemSources.Hub) { - override val throwable = Some(cx) - override val summary: String = "Shrine could not connect to the adapter." - override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}." + override lazy val throwable = Some(cx) + override lazy val summary: String = "Shrine could not connect to the adapter." + override lazy val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}." } case class TimedOutWithAdapter(origin:NodeId) extends AbstractProblem(ProblemSources.Hub) { - override val throwable = None - override val summary: String = "Timed out with adapter." - override val description: String = s"Shrine observed a timeout with the adapter at ${origin.name}." + override lazy val throwable = None + override lazy val summary: String = "Timed out with adapter." + override lazy val description: String = s"Shrine observed a timeout with the adapter at ${origin.name}." } case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { - override val throwable = Some(cnrpx) - override val summary: String = "Could not parse response." - override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'" - override val detailsXml =

+ override lazy val throwable = Some(cnrpx) + override lazy val summary: String = "Could not parse response." + override lazy val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'" + override lazy val detailsXml =
Message body is {cnrpx.body} {throwableDetail.getOrElse("")}
} case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { - override val throwable = Some(cnrpx) - override val summary: String = "Adapter error." - override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}." - override val detailsXml =
+ override lazy val throwable = Some(cnrpx) + override lazy val summary: String = "Adapter error." + override lazy val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}." + override lazy val detailsXml =
Message body is {cnrpx.body} {throwableDetail.getOrElse("")}
} \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala index 97789b287..dae2ca6f7 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala @@ -1,43 +1,43 @@ package net.shrine.aggregation import net.shrine.aggregation.BasicAggregator.{Error, Invalid, Valid} import net.shrine.problem.{AbstractProblem, ProblemSources} import net.shrine.protocol.ErrorResponse import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Extends BasicAggregator to ignore Errors and Invalid responses * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] { private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = { //Filter out errors and invalid responses makeResponseFrom(validResponses) } //Default implementation, just returns first valid response, or if there are none, an ErrorResponse private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = { validResponses.map(_.response).toSet.headOption.getOrElse{ val problem = NoValidResponsesToAggregate() ErrorResponse(problem) } } } case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) { - override val summary: String = "No valid responses to aggregate." + override lazy val summary: String = "No valid responses to aggregate." - override val description:String = "The hub received no valid responses to aggregate." + override lazy val description:String = "The hub received no valid responses to aggregate." } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index b61c2a5b0..1721fdc4d 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,533 +1,533 @@ package net.shrine.qep.queries import java.sql.SQLException import java.util.concurrent.TimeoutException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources} import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator} import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.util.control.NonFatal import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) try { blocking { Await.result(future, timeout) } } catch { case tx:TimeoutException => throw CouldNotRunDbIoActionException(dataSource,tx) case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.result) } def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain,request.fetchSize) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, limit:Int):Seq[QepQuery] = { dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).sortBy(x => x.changeDate.desc).take(limit).result) } def renamePreviousQuery(request:RenameQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis())) } yield queryResults ) } def markDeleted(request:DeleteQueryRequest):Unit = { val networkQueryId = request.networkQueryId dbRun( for { queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis())) } yield queryResults ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2)) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) queryResults.map(r => r.toQueryResult( resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty), adapterNodesToProblemDigests.get(r.adapterNode) )) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val timeout = QepQuerySchema.config.getInt("timeout") seconds val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[Option[String]]("expression") def dateCreated = column[Time]("dateCreated") def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") def changeDate = column[Long]("changeDate") def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists ) yield queries val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[Option[ResultOutputType]]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) import net.shrine.config.{ConfigExtensions, Keys} val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty) val schema = QepQuerySchema(slickProfile,moreBreakdowns) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: Option[String], dateCreated: Time, deleted: Boolean, queryXml: String, changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.map(_.toString), dateCreated = System.currentTimeMillis(), deleted = false, queryXml = runQueryRequest.toXmlString, changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult( resultId = resultId, instanceId = instanceId, resultType = resultType, setSize = size, startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), description = Some(adapterNode), statusType = status, statusMessage = statusMessage, breakdowns = breakdowns, problemDigest = problemDigest ) } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."), resultType = result.resultType, size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = { def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { val data = breakdowns.map(b => b.dataKey -> b.value).toMap I2b2ResultEnvelope(resultType,data) } breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else
, //TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE 0 ) } } case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){ - override val summary = "A problem encountered while using a database." + override lazy val summary = "A problem encountered while using a database." - override val throwable = Some(x) + override lazy val throwable = Some(x) - override val description = x.getMessage + override lazy val description = x.getMessage } \ No newline at end of file