diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
index 2ef3dbe15..a7c6d0777 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
@@ -1,88 +1,88 @@
package net.shrine.adapter
import java.sql.SQLException
import net.shrine.log.Loggable
import net.shrine.problem._
import net.shrine.protocol._
/**
* @author Bill Simons
* @since 4/8/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class Adapter extends Loggable {
final def perform(message: BroadcastMessage): BaseShrineResponse = {
def problemToErrorResponse(problem:Problem):ErrorResponse = {
LoggingProblemHandler.handleProblem(problem)
ErrorResponse(problem)
}
val shrineResponse = try {
processRequest(message)
} catch {
case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e))
case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e))
case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e))
case e: SQLException => problemToErrorResponse(AdapterDatabaseProblem(e))
//noinspection RedundantBlock
case e: Exception => {
val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage"
else s"Unexpected exception in Adapter"
problemToErrorResponse(ProblemNotYetEncoded(summary,e))
}
}
shrineResponse
}
protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse
//NOOP, may be overridden by subclasses
def shutdown(): Unit = ()
}
case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Adapter) {
- override val throwable = Some(x)
- override val summary: String = s"User '${authn.domain}:${authn.username}' locked out."
- override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ."
+ override lazy val throwable = Some(x)
+ override lazy val summary: String = s"User '${authn.domain}:${authn.username}' locked out."
+ override lazy val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ."
}
case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Adapter) {
- override val throwable = Some(x)
- override val summary: String = s"Error communicating with I2B2 CRC."
- override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}."
- override val detailsXml =
+ override lazy val throwable = Some(x)
+ override lazy val summary: String = s"Error communicating with I2B2 CRC."
+ override lazy val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}."
+ override lazy val detailsXml =
Request is {request}
{throwableDetail.getOrElse("")}
}
case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Adapter) {
- override val throwable = Some(x)
- override val summary: String = "Could not map query term(s)."
- override val description = s"The Shrine Adapter on ${stamp.host.getHostName} cannot map this query to its local terms."
- override val detailsXml =
+ override lazy val throwable = Some(x)
+ override lazy val summary: String = "Could not map query term(s)."
+ override lazy val description = s"The Shrine Adapter on ${stamp.host.getHostName} cannot map this query to its local terms."
+ override lazy val detailsXml =
Query Defitiontion is {x.runQueryRequest.queryDefinition}
RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo}
{throwableDetail.getOrElse("")}
}
case class AdapterDatabaseProblem(x:SQLException) extends AbstractProblem(ProblemSources.Adapter) {
- override val throwable = Some(x)
- override val summary: String = "Problem using the Adapter database."
- override val description = "The Shrine Adapter encountered a problem using a database."
+ override lazy val throwable = Some(x)
+ override lazy val summary: String = "Problem using the Adapter database."
+ override lazy val description = "The Shrine Adapter encountered a problem using a database."
}
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
index df6ef54f2..e23c911b3 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/CrcAdapter.scala
@@ -1,108 +1,108 @@
package net.shrine.adapter
import org.xml.sax.SAXParseException
import scala.xml.NodeSeq
import scala.xml.XML
import net.shrine.protocol.{AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, BroadcastMessage, Credential, ErrorResponse, HiveCredentials, ShrineRequest, ShrineResponse, TranslatableRequest}
import net.shrine.util.XmlDateHelper
import net.shrine.client.Poster
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.util.Try
import scala.util.control.NonFatal
/**
* @author Bill Simons
* @since 4/11/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class CrcAdapter[T <: ShrineRequest, V <: ShrineResponse](
poster: Poster,
override protected val hiveCredentials: HiveCredentials) extends WithHiveCredentialsAdapter(hiveCredentials) {
protected def parseShrineResponse(nodeSeq: NodeSeq): ShrineResponse
private[adapter] def parseShrineErrorResponseWithFallback(xmlResponseFromCrc: String): ShrineResponse = {
//NB: See https://open.med.harvard.edu/jira/browse/SHRINE-534
//NB: https://open.med.harvard.edu/jira/browse/SHRINE-745
val shrineResponseAttempt = for {
crcXml <- Try(XML.loadString(xmlResponseFromCrc))
shrineResponse <- Try(parseShrineResponse(crcXml)).recover { case NonFatal(e) =>
info(s"Exception while parsing $crcXml",e)
ErrorResponse.fromI2b2(crcXml)
} //todo pass the exception to build a proper error response, and log the exception
} yield shrineResponse
shrineResponseAttempt.recover {
case saxx:SAXParseException => ErrorResponse(CannotParseXmlFromCrc(saxx,xmlResponseFromCrc))
case NonFatal(e) =>
error(s"Error parsing response from CRC: ", e)
ErrorResponse(ExceptionWhileLoadingCrcResponse(e,xmlResponseFromCrc))
}.get
}
//NB: default is a noop; only RunQueryAdapter needs this for now
protected[adapter] def translateNetworkToLocal(request: T): T = request
protected[adapter] override def processRequest(message: BroadcastMessage): BaseShrineResponse = {
val i2b2Response = callCrc(translateRequest(message.request))
parseShrineErrorResponseWithFallback(i2b2Response)
}
protected def callCrc(request: ShrineRequest): String = {
debug(s"Sending Shrine-formatted request to the CRC at '${poster.url}': $request")
val crcRequest = request.toI2b2String
val crcResponse = XmlDateHelper.time(s"Calling the CRC at '${poster.url}'")(debug(_)) {
//Wrap exceptions in a more descriptive form, to enable sending better error messages back to the legacy web client
try { poster.post(crcRequest) }
catch {
case NonFatal(e) => throw CrcInvocationException(poster.url, request, e)
}
}
crcResponse.body
}
private[adapter] def translateRequest(request: BaseShrineRequest): ShrineRequest = request match {
case transReq: TranslatableRequest[T] => //noinspection RedundantBlock
{
val HiveCredentials(domain, username, password, project) = hiveCredentials
val authInfo = AuthenticationInfo(domain, username, Credential(password, isToken = false))
translateNetworkToLocal(transReq.withAuthn(authInfo).withProject(project).asRequest)
}
case req: ShrineRequest => req
case _ => throw new IllegalArgumentException(s"Unexpected request: $request")
}
}
case class CannotParseXmlFromCrc(saxx:SAXParseException,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
- override val throwable = Some(saxx)
- override val summary: String = "Could not parse response from CRC."
- override val description:String = s"${saxx.getMessage} while parsing the response from the CRC."
- override val detailsXml =
+ override lazy val throwable = Some(saxx)
+ override lazy val summary: String = "Could not parse response from CRC."
+ override lazy val description:String = s"${saxx.getMessage} while parsing the response from the CRC."
+ override lazy val detailsXml =
{throwableDetail.getOrElse("")}
Response is {xmlResponseFromCrc}
}
case class ExceptionWhileLoadingCrcResponse(t:Throwable,xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
- override val throwable = Some(t)
- override val summary: String = "Unanticipated exception with response from CRC."
- override val description:String = s"${t.getMessage} while parsing the response from the CRC."
- override val detailsXml =
+ override lazy val throwable = Some(t)
+ override lazy val summary: String = "Unanticipated exception with response from CRC."
+ override lazy val description:String = s"${t.getMessage} while parsing the response from the CRC."
+ override lazy val detailsXml =
{throwableDetail.getOrElse("")}
Response is {xmlResponseFromCrc}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
index 9e3069947..6f947393f 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/components/QueryDefinitions.scala
@@ -1,39 +1,39 @@
package net.shrine.adapter.components
import net.shrine.adapter.dao.AdapterDao
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.ShrineResponse
import net.shrine.protocol.ReadQueryDefinitionRequest
import net.shrine.protocol.ReadQueryDefinitionResponse
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.AbstractReadQueryDefinitionRequest
/**
* @author clint
* @since Apr 4, 2013
*
* NB: Tested by ReadQueryDefinitionAdapterTest
*/
final case class QueryDefinitions[Req <: AbstractReadQueryDefinitionRequest](dao: AdapterDao) {
def get(request: Req): ShrineResponse = {
val resultOption = for {
shrineQuery <- dao.findQueryByNetworkId(request.queryId)
} yield {
ReadQueryDefinitionResponse(
shrineQuery.networkId,
shrineQuery.name,
shrineQuery.username,
shrineQuery.dateCreated,
//TODO: I2b2 or Shrine format?
shrineQuery.queryDefinition.toI2b2String)
}
resultOption.getOrElse(ErrorResponse(QueryNotInDatabase(request)))
}
}
case class QueryNotInDatabase(request:AbstractReadQueryDefinitionRequest) extends AbstractProblem(ProblemSources.Hub) {
- override val summary: String = s"Couldn't find query definition."
- override val description:String = s"The query definition with network id: ${request.queryId} does not exist at this site."
+ override lazy val summary: String = s"Couldn't find query definition."
+ override lazy val description:String = s"The query definition with network id: ${request.queryId} does not exist at this site."
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala
index 64777b48c..a40d46778 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/squeryl/SquerylAdapterDao.scala
@@ -1,464 +1,464 @@
package net.shrine.adapter.dao.squeryl
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.adapter.dao.AdapterDao
import net.shrine.adapter.dao.model.{ObfuscatedPair, ShrineQuery, ShrineQueryResult}
import net.shrine.adapter.dao.model.squeryl.{SquerylBreakdownResultRow, SquerylCountRow, SquerylPrivilegedUser, SquerylQueryResultRow, SquerylShrineError, SquerylShrineQuery}
import net.shrine.adapter.dao.squeryl.tables.Tables
import net.shrine.dao.DateHelpers
import net.shrine.dao.squeryl.{SquerylEntryPoint, SquerylInitializer}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.{AuthenticationInfo, I2b2ResultEnvelope, QueryResult, ResultOutputType}
import net.shrine.protocol.query.QueryDefinition
import net.shrine.util.XmlDateHelper
import org.squeryl.Query
import org.squeryl.dsl.GroupWithMeasures
import scala.util.Try
import scala.xml.NodeSeq
/**
* @author clint
* @since May 22, 2013
*/
final class SquerylAdapterDao(initializer: SquerylInitializer, tables: Tables)(implicit breakdownTypes: Set[ResultOutputType]) extends AdapterDao with Loggable {
initializer.init()
override def inTransaction[T](f: => T): T = SquerylEntryPoint.inTransaction { f }
import SquerylEntryPoint._
override def flagQuery(networkQueryId: Long, flagMessage: Option[String]): Unit = mutateFlagField(networkQueryId, newIsFlagged = true, flagMessage)
override def unFlagQuery(networkQueryId: Long): Unit = mutateFlagField(networkQueryId, newIsFlagged = false, None)
private def mutateFlagField(networkQueryId: Long, newIsFlagged: Boolean, newFlagMessage: Option[String]): Unit = {
inTransaction {
update(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).
set(queryRow.isFlagged := newIsFlagged, queryRow.flagMessage := newFlagMessage)
}
}
}
override def storeResults(
authn: AuthenticationInfo,
masterId: String,
networkQueryId: Long,
queryDefinition: QueryDefinition,
rawQueryResults: Seq[QueryResult],
obfuscatedQueryResults: Seq[QueryResult],
failedBreakdownTypes: Seq[ResultOutputType],
mergedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope],
obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]): Unit = {
inTransaction {
val insertedQueryId = insertQuery(masterId,
networkQueryId,
authn,
queryDefinition,
isFlagged = false,
hasBeenRun = true,
flagMessage = None)
val insertedQueryResultIds = insertQueryResults(insertedQueryId, rawQueryResults)
storeCountResults(rawQueryResults, obfuscatedQueryResults, insertedQueryResultIds)
storeErrorResults(rawQueryResults, insertedQueryResultIds)
storeBreakdownFailures(failedBreakdownTypes.toSet, insertedQueryResultIds)
insertBreakdownResults(insertedQueryResultIds, mergedBreakdowns, obfuscatedBreakdowns)
}
}
private[adapter] def storeCountResults(raw: Seq[QueryResult], obfuscated: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = {
val notErrors = raw.filter(!_.isError)
val obfuscatedNotErrors = obfuscated.filter(!_.isError)
if(notErrors.size > 1) {
warn(s"Got ${notErrors.size} raw (hopefully-)count results; more than 1 is unusual.")
}
if(obfuscatedNotErrors.size > 1) {
warn(s"Got ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; more than 1 is unusual.")
}
if(notErrors.size != obfuscatedNotErrors.size) {
warn(s"Got ${notErrors.size} raw and ${obfuscatedNotErrors.size} obfuscated (hopefully-)count results; that these numbers are different is unusual.")
}
import ResultOutputType.PATIENT_COUNT_XML
def isCount(qr: QueryResult): Boolean = qr.resultType.contains(PATIENT_COUNT_XML)
inTransaction {
//NB: Take the count/setSize from the FIRST PATIENT_COUNT_XML QueryResult,
//though the same count should be there for all of them, if there are more than one
for {
Seq(insertedCountQueryResultId) <- insertedIds.get(PATIENT_COUNT_XML)
notError <- notErrors.find(isCount) //NB: Find a count result, just to be sure
obfuscatedNotError <- obfuscatedNotErrors.find(isCount) //NB: Find a count result, just to be sure
} {
insertCountResult(insertedCountQueryResultId, notError.setSize, obfuscatedNotError.setSize)
}
}
}
private[adapter] def storeErrorResults(results: Seq[QueryResult], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = {
val errors = results.filter(_.isError)
val insertedErrorResultIds = insertedIds.getOrElse(ResultOutputType.ERROR,Nil)
val insertedIdsToErrors = insertedErrorResultIds zip errors
inTransaction {
for {
(insertedErrorResultId, errorQueryResult) <- insertedIdsToErrors
} {
val pd = errorQueryResult.problemDigest.get //it's an error so it will have a problem digest
insertErrorResult(
insertedErrorResultId,
errorQueryResult.statusMessage.getOrElse("Unknown failure"),
pd.codec,
pd.stampText,
pd.summary,
pd.description,
pd.detailsXml
)
}
}
}
private[adapter] def storeBreakdownFailures(failedBreakdownTypes: Set[ResultOutputType], insertedIds: Map[ResultOutputType, Seq[Int]]): Unit = {
val insertedIdsForFailedBreakdownTypes = insertedIds.filterKeys(failedBreakdownTypes.contains)
inTransaction {
for {
(failedBreakdownType, Seq(resultId)) <- insertedIdsForFailedBreakdownTypes
} {
//todo propagate backwards to the breakdown failure to create the corect problem
object BreakdownFailure extends AbstractProblem(ProblemSources.Adapter) {
- override val summary: String = "Couldn't retrieve result breakdown"
- override val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'"
+ override lazy val summary: String = "Couldn't retrieve result breakdown"
+ override lazy val description:String = s"Couldn't retrieve result breakdown of type '$failedBreakdownType'"
}
val pd = BreakdownFailure.toDigest
insertErrorResult(
resultId,
s"Couldn't retrieve breakdown of type '$failedBreakdownType'",
pd.codec,
pd.stampText,
pd.summary,
pd.description,
pd.detailsXml
)
}
}
}
override def findRecentQueries(howMany: Int): Seq[ShrineQuery] = {
inTransaction {
Queries.queriesForAllUsers.take(howMany).map(_.toShrineQuery).toSeq
}
}
def findAllCounts():Seq[SquerylCountRow] = {
inTransaction{
Queries.allCountResults.toSeq
}
}
override def renameQuery(networkQueryId: Long, newName: String) {
inTransaction {
update(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).
set(queryRow.name := newName)
}
}
}
override def deleteQuery(networkQueryId: Long): Unit = {
inTransaction {
tables.shrineQueries.deleteWhere(_.networkId === networkQueryId)
}
}
override def deleteQueryResultsFor(networkQueryId: Long): Unit = {
inTransaction {
val resultIdsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) =>
where(queryRow.networkId === networkQueryId).
select(resultRow.id).
on(queryRow.id === resultRow.queryId)
}.toSet
tables.queryResults.deleteWhere(_.id in resultIdsForNetworkQueryId)
}
}
override def isUserLockedOut(authn: AuthenticationInfo, defaultThreshold: Int): Boolean = Try {
inTransaction {
val privilegedUserOption = Queries.privilegedUsers(authn.domain, authn.username).singleOption
val threshold:Int = privilegedUserOption.flatMap(_.threshold).getOrElse(defaultThreshold.intValue)
val thirtyDaysInThePast: XMLGregorianCalendar = DateHelpers.daysFromNow(-30)
val overrideDate: XMLGregorianCalendar = privilegedUserOption.map(_.toPrivilegedUser).flatMap(_.overrideDate).getOrElse(thirtyDaysInThePast)
//sorted instead of just finding max
val counts: Seq[Long] = Queries.repeatedResults(authn.domain, authn.username, overrideDate).toSeq.sorted
//and then grabbing the last, highest value in the sorted sequence
val repeatedResultCount: Long = counts.lastOption.getOrElse(0L)
val result = repeatedResultCount > threshold
debug(s"User ${authn.domain}:${authn.username} locked out? $result")
result
}
}.getOrElse(false)
override def insertQuery(localMasterId: String,
networkId: Long,
authn: AuthenticationInfo,
queryDefinition: QueryDefinition,
isFlagged: Boolean,
hasBeenRun: Boolean,
flagMessage: Option[String]): Int = {
inTransaction {
val inserted = tables.shrineQueries.insert(new SquerylShrineQuery(
0,
localMasterId,
networkId,
authn.username,
authn.domain,
XmlDateHelper.now,
isFlagged,
flagMessage,
hasBeenRun,
queryDefinition))
inserted.id
}
}
/**
* Insert rows into QueryResults, one for each QueryResult in the passed RunQueryResponse
* Inserted rows are 'children' of the passed ShrineQuery (ie, they are the results of the query)
*/
override def insertQueryResults(parentQueryId: Int, results: Seq[QueryResult]): Map[ResultOutputType, Seq[Int]] = {
def execTime(result: QueryResult): Option[Long] = {
//TODO: How are locales handled here? Do we care?
def toMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- result.startDate
end <- result.endDate
} yield toMillis(end) - toMillis(start)
}
val typeToIdTuples = inTransaction {
for {
result <- results
resultType = result.resultType.getOrElse(ResultOutputType.ERROR)
//TODO: under what circumstances can QueryResults NOT have start and end dates set?
elapsed = execTime(result)
} yield {
val lastInsertedQueryResultRow = tables.queryResults.insert(new SquerylQueryResultRow(0, result.resultId, parentQueryId, resultType, result.statusType, elapsed, XmlDateHelper.now))
(resultType, lastInsertedQueryResultRow.id)
}
}
typeToIdTuples.groupBy { case (resultType, _) => resultType }.mapValues(_.map { case (_, count) => count })
}
override def insertCountResult(resultId: Int, originalCount: Long, obfuscatedCount: Long) {
//NB: Squeryl steers us toward inserting with dummy ids :(
inTransaction {
tables.countResults.insert(new SquerylCountRow(0, resultId, originalCount, obfuscatedCount, XmlDateHelper.now))
}
}
override def insertBreakdownResults(parentResultIds: Map[ResultOutputType, Seq[Int]], originalBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope], obfuscatedBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) {
def merge(original: I2b2ResultEnvelope, obfuscated: I2b2ResultEnvelope): Map[String, ObfuscatedPair] = {
Map.empty ++ (for {
(key, originalValue) <- original.data
obfuscatedValue <- obfuscated.data.get(key)
} yield (key, ObfuscatedPair(originalValue, obfuscatedValue)))
}
inTransaction {
for {
(resultType, Seq(resultId)) <- parentResultIds
if resultType.isBreakdown
originalBreakdown <- originalBreakdowns.get(resultType)
obfuscatedBreakdown <- obfuscatedBreakdowns.get(resultType)
(key, ObfuscatedPair(original, obfuscated)) <- merge(originalBreakdown, obfuscatedBreakdown)
} {
tables.breakdownResults.insert(SquerylBreakdownResultRow(0, resultId, key, original, obfuscated))
}
}
}
override def insertErrorResult(parentResultId: Int, errorMessage: String, codec:String, stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq) {
//NB: Squeryl steers us toward inserting with dummy ids :(
inTransaction {
tables.errorResults.insert(SquerylShrineError(0, parentResultId, errorMessage, codec, stampText, summary, digestDescription, detailsXml.toString()))
}
}
override def findQueryByNetworkId(networkQueryId: Long): Option[ShrineQuery] = {
inTransaction {
Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery)
}
}
override def findQueriesByUserAndDomain(domain: String, username: String, howMany: Int): Seq[ShrineQuery] = {
inTransaction {
Queries.queriesForUser(username, domain).take(howMany).toSeq.map(_.toShrineQuery)
}
}
override def findQueriesByDomain(domain: String): Seq[ShrineQuery] = {
inTransaction {
Queries.queriesForDomain(domain).toList.map(_.toShrineQuery)
}
}
override def findResultsFor(networkQueryId: Long): Option[ShrineQueryResult] = {
inTransaction {
val breakdownRowsByType = Queries.breakdownResults(networkQueryId).toSeq.groupBy { case (outputType, _) => outputType.toQueryResultRow.resultType }.mapValues(_.map { case (_, row) => row.toBreakdownResultRow })
val queryRowOption = Queries.queriesByNetworkId(networkQueryId).headOption.map(_.toShrineQuery)
val countRowOption = Queries.countResults(networkQueryId).headOption.map(_.toCountRow)
val queryResultRows = Queries.resultsForQuery(networkQueryId).toSeq.map(_.toQueryResultRow)
val errorResultRows = Queries.errorResults(networkQueryId).toSeq.map(_.toShrineError)
for {
queryRow <- queryRowOption
countRow <- countRowOption
shrineQueryResult <- ShrineQueryResult.fromRows(queryRow, queryResultRows, countRow, breakdownRowsByType, errorResultRows)
} yield {
shrineQueryResult
}
}
}
/**
* @author clint
* @since Nov 19, 2012
*/
object Queries {
def privilegedUsers(domain: String, username: String): Query[SquerylPrivilegedUser] = {
from(tables.privilegedUsers) { user =>
where(user.username === username and user.domain === domain).select(user)
}
}
def repeatedResults(domain: String, username: String, overrideDate: XMLGregorianCalendar): Query[Long] = {
val counts: Query[GroupWithMeasures[Long, Long]] = join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) =>
where(queryRow.username === username and queryRow.domain === domain and (countRow.originalValue <> 0L) and queryRow.dateCreated > DateHelpers.toTimestamp(overrideDate)).
groupBy(countRow.originalValue).
compute(count(countRow.originalValue)).
on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId)
}
//Filter for result counts > 0
from(counts) { cnt =>
where(cnt.measures gt 0).select(cnt.measures)
}
}
val queriesForAllUsers: Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
select(queryRow).orderBy(queryRow.dateCreated.desc)
}
}
//TODO: Find a way to parameterize on limit, to avoid building the query every time
//TODO: limit
def queriesForUser(username: String, domain: String): Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.domain === domain and queryRow.username === username).
select(queryRow).
orderBy(queryRow.dateCreated.desc)
}
}
def queriesForDomain(domain: String): Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.domain === domain).
select(queryRow).
orderBy(queryRow.dateCreated.desc)
}
}
val allCountResults: Query[SquerylCountRow] = {
from(tables.countResults) { queryRow =>
select(queryRow)
}
}
def queriesByNetworkId(networkQueryId: Long): Query[SquerylShrineQuery] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).select(queryRow)
}
}
//TODO: Find out how to compose queries, to re-use queriesByNetworkId
def queryNamesByNetworkId(networkQueryId: Long): Query[String] = {
from(tables.shrineQueries) { queryRow =>
where(queryRow.networkId === networkQueryId).select(queryRow.name)
}
}
def resultsForQuery(networkQueryId: Long): Query[SquerylQueryResultRow] = {
val resultsForNetworkQueryId = join(tables.shrineQueries, tables.queryResults) { (queryRow, resultRow) =>
where(queryRow.networkId === networkQueryId).
select(resultRow).
on(queryRow.id === resultRow.queryId)
}
from(resultsForNetworkQueryId)(select(_))
}
def countResults(networkQueryId: Long): Query[SquerylCountRow] = {
join(tables.shrineQueries, tables.queryResults, tables.countResults) { (queryRow, resultRow, countRow) =>
where(queryRow.networkId === networkQueryId).
select(countRow).
on(queryRow.id === resultRow.queryId, resultRow.id === countRow.resultId)
}
}
def errorResults(networkQueryId: Long): Query[SquerylShrineError] = {
join(tables.shrineQueries, tables.queryResults, tables.errorResults) { (queryRow, resultRow, errorRow) =>
where(queryRow.networkId === networkQueryId).
select(errorRow).
on(queryRow.id === resultRow.queryId, resultRow.id === errorRow.resultId)
}
}
//NB: using groupBy here is too much of a pain; do it 'manually' later
def breakdownResults(networkQueryId: Long): Query[(SquerylQueryResultRow, SquerylBreakdownResultRow)] = {
join(tables.shrineQueries, tables.queryResults, tables.breakdownResults) { (queryRow, resultRow, breakdownRow) =>
where(queryRow.networkId === networkQueryId).
select((resultRow, breakdownRow)).
on(queryRow.id === resultRow.queryId, resultRow.id === breakdownRow.resultId)
}
}
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala
index 4156d09c9..e785eb11d 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/service/AdapterService.scala
@@ -1,103 +1,103 @@
package net.shrine.adapter.service
import net.shrine.log.Loggable
import net.shrine.protocol.{BaseShrineResponse, BroadcastMessage, ErrorResponse, NodeId, RequestType, Result, Signature}
import net.shrine.adapter.AdapterMap
import net.shrine.crypto.Verifier
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
/**
* Heart of the adapter.
*
* @author clint
* @since Nov 14, 2013
*/
final class AdapterService(
nodeId: NodeId,
signatureVerifier: Verifier,
maxSignatureAge: Duration,
adapterMap: AdapterMap) extends AdapterRequestHandler with Loggable {
import AdapterService._
logStartup(adapterMap)
override def handleRequest(message: BroadcastMessage): Result = {
handleInvalidSignature(message).orElse {
for {
adapter <- adapterMap.adapterFor(message.request.requestType)
} yield time(nodeId) {
adapter.perform(message)
}
}.getOrElse {
Result(nodeId, 0.milliseconds, ErrorResponse(UnknownRequestType(message.request.requestType)))
}
}
/**
* @return None if the signature is fine, Some(result with an ErrorResponse) if not
*/
private def handleInvalidSignature(message: BroadcastMessage): Option[Result] = {
val (sigIsValid, elapsed) = time(signatureVerifier.verifySig(message, maxSignatureAge))
if(sigIsValid) { None }
else {
info(s"Incoming message had invalid signature: $message")
Some(Result(nodeId, elapsed.milliseconds, ErrorResponse(CouldNotVerifySignature(message))))
}
}
}
object AdapterService extends Loggable {
private def logStartup(adapterMap: AdapterMap) {
info("Adapter service initialized, will respond to the following queries: ")
val sortedByReqType = adapterMap.requestsToAdapters.toSeq.sortBy { case (k, _) => k }
sortedByReqType.foreach {
case (requestType, adapter) =>
info(s" $requestType:\t(${adapter.getClass.getSimpleName})")
}
}
private[service] def time[T](f: => T): (T, Long) = {
val start = System.currentTimeMillis
val result = f
val elapsed = System.currentTimeMillis - start
(result, elapsed)
}
private[service] def time(nodeId: NodeId)(f: => BaseShrineResponse): Result = {
val (response, elapsed) = time(f)
Result(nodeId, elapsed.milliseconds, response)
}
}
case class CouldNotVerifySignature(message: BroadcastMessage) extends AbstractProblem(ProblemSources.Adapter){
val signature: Option[Signature] = message.signature
- override val summary: String = signature.fold("A message was not signed")(sig => s"The trust relationship with ${sig.signedBy} is not properly configured.")
- override val description: String = signature.fold(s"The Adapter at ${stamp.host.getHostName} could not properly validate a request because it had no signature.")(sig => s"The Adapter at ${stamp.host.getHostName} could not properly validate the request from ${sig.signedBy}. An incoming message from the hub had an invalid signature.")
- override val detailsXml = signature.fold(
+ override lazy val summary: String = signature.fold("A message was not signed")(sig => s"The trust relationship with ${sig.signedBy} is not properly configured.")
+ override lazy val description: String = signature.fold(s"The Adapter at ${stamp.host.getHostName} could not properly validate a request because it had no signature.")(sig => s"The Adapter at ${stamp.host.getHostName} could not properly validate the request from ${sig.signedBy}. An incoming message from the hub had an invalid signature.")
+ override lazy val detailsXml = signature.fold(
)(
sig =>
Signature is {sig}
)
}
case class UnknownRequestType(requestType: RequestType) extends AbstractProblem(ProblemSources.Adapter){
- override val summary: String = s"Unknown request type $requestType"
- override val description: String = s"The Adapter at ${stamp.host.getHostName} received a request of type $requestType that it cannot process."
+ override lazy val summary: String = s"Unknown request type $requestType"
+ override lazy val description: String = s"The Adapter at ${stamp.host.getHostName} received a request of type $requestType that it cannot process."
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala
index d981a9125..3b8952fa9 100644
--- a/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala
+++ b/commons/auth/src/main/scala/net/shrine/authentication/NotAuthenticatedException.scala
@@ -1,36 +1,36 @@
package net.shrine.authentication
import net.shrine.authentication.AuthenticationResult.NotAuthenticated
import net.shrine.problem.{AbstractProblem, ProblemSources}
import scala.xml.NodeSeq
/**
* @author clint
* @since Dec 13, 2013
*/
final case class NotAuthenticatedException(domain: String, username: String,message: String, cause: Throwable) extends RuntimeException(message, cause) {
def problem = NotAuthenticatedProblem(this)
}
object NotAuthenticatedException {
def apply(na:NotAuthenticated):NotAuthenticatedException = NotAuthenticatedException(na.domain,na.username,na.message,na.cause.getOrElse(null))
}
case class NotAuthenticatedProblem(nax:NotAuthenticatedException) extends AbstractProblem(ProblemSources.Qep){
- override val summary = s"Can not authenticate ${nax.domain}:${nax.username}."
+ override lazy val summary = s"Can not authenticate ${nax.domain}:${nax.username}."
- override val throwable = Some(nax)
+ override lazy val throwable = Some(nax)
- override val description = s"Can not authenticate ${nax.domain}:${nax.username}. ${nax.getLocalizedMessage}"
+ override lazy val description = s"Can not authenticate ${nax.domain}:${nax.username}. ${nax.getLocalizedMessage}"
- override val detailsXml: NodeSeq = NodeSeq.fromSeq(
+ override lazy val detailsXml: NodeSeq = NodeSeq.fromSeq(
{throwableDetail.getOrElse("")}
)
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
index f1df2acca..0ca15c035 100644
--- a/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
+++ b/commons/auth/src/main/scala/net/shrine/authorization/PmAuthorizerComponent.scala
@@ -1,112 +1,112 @@
package net.shrine.authorization
import net.shrine.log.Loggable
import scala.util.{Failure, Success, Try}
import net.shrine.client.HttpResponse
import net.shrine.i2b2.protocol.pm.GetUserConfigurationRequest
import net.shrine.i2b2.protocol.pm.User
import net.shrine.problem._
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.ErrorResponse
import scala.util.control.NonFatal
/**
* @author clint
* @since Apr 5, 2013
*/
trait PmAuthorizerComponent { self: PmHttpClientComponent with Loggable =>
import PmAuthorizerComponent._
//noinspection RedundantBlock
object Pm {
def parsePmResult(authn: AuthenticationInfo)(httpResponse: HttpResponse): Try[Either[ErrorResponse, User]] = {
User.fromI2b2(httpResponse.body).map(Right(_)).recoverWith {
case NonFatal(e) => {
debug(s"Couldn't extract a User from '$httpResponse'")
Try(Left(ErrorResponse.fromI2b2(httpResponse.body)))
}
}.recover {
case NonFatal(e) => {
val problem = CouldNotInterpretResponseFromPmCell(pmPoster.url,authn,httpResponse,e)
LoggingProblemHandler.handleProblem(problem)
Left(ErrorResponse(problem))
}
}
}
def authorize(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo): AuthorizationStatus = {
val request = GetUserConfigurationRequest(authn)
val responseAttempt: Try[HttpResponse] = Try {
debug(s"Authorizing with PM cell at ${pmPoster.url}")
pmPoster.post(request.toI2b2String)
}
val authStatusAttempt: Try[AuthorizationStatus with Product with Serializable] = responseAttempt.flatMap(parsePmResult(authn)).map {
case Right(user) => {
val managerUserOption = for {
roles <- user.rolesByProject.get(projectId)
if neededRoles.forall(roles.contains)
} yield user
managerUserOption.map(Authorized).getOrElse {
NotAuthorized(MissingRequiredRoles(projectId,neededRoles,authn))
}
}
case Left(errorResponse) => {
//todo remove when ErrorResponse gets its message
info(s"ErrorResponse message '${errorResponse.errorMessage}' may not have carried through to the NotAuthorized object")
NotAuthorized(errorResponse.problemDigest)
}
}
authStatusAttempt match {
case Success(s) => s
case Failure(x) => NotAuthorized(CouldNotReachPmCell(pmPoster.url,authn,x))
}
}
}
}
object PmAuthorizerComponent {
sealed trait AuthorizationStatus
case class Authorized(user: User) extends AuthorizationStatus
case class NotAuthorized(problemDigest: ProblemDigest) extends AuthorizationStatus {
def toErrorResponse = ErrorResponse(problemDigest.summary,problemDigest)
}
object NotAuthorized {
def apply(problem:Problem):NotAuthorized = NotAuthorized(problem.toDigest)
}
}
case class MissingRequiredRoles(projectId: String, neededRoles: Set[String], authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep) {
- override val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'"
+ override lazy val summary: String = s"User ${authn.domain}:${authn.username} is missing roles in project '$projectId'"
- override val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'"
+ override lazy val description:String = s"User ${authn.domain}:${authn.username} does not have all the needed roles: ${neededRoles.map("'" + _ + "'").mkString(", ")} in the project '$projectId'"
}
case class CouldNotReachPmCell(pmUrl:String,authn: AuthenticationInfo,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
- override val throwable = Some(x)
- override val summary: String = s"Could not reach PM cell."
- override val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}."
+ override lazy val throwable = Some(x)
+ override lazy val summary: String = s"Could not reach PM cell."
+ override lazy val description:String = s"Shrine encountered ${throwable.get} while attempting to reach the PM cell at $pmUrl for ${authn.domain}:${authn.username}."
}
case class CouldNotInterpretResponseFromPmCell(pmUrl:String,authn: AuthenticationInfo,httpResponse: HttpResponse,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
- override val throwable = Some(x)
+ override lazy val throwable = Some(x)
override def summary: String = s"Could not interpret response from PM cell."
override def description: String = s"Shrine could not interpret the response from the PM cell at ${pmUrl} for ${authn.domain}:${authn.username}: due to ${throwable.get}"
- override val detailsXml =
+ override lazy val detailsXml =
Response is {httpResponse}
{throwableDetail.getOrElse("")}
}
\ No newline at end of file
diff --git a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala
index 7b27030aa..7e5e7dec1 100644
--- a/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala
+++ b/commons/auth/src/main/scala/net/shrine/authorization/StewardQueryAuthorizationService.scala
@@ -1,236 +1,236 @@
package net.shrine.authorization
import java.net.URL
import javax.net.ssl.{KeyManager, SSLContext, X509TrustManager}
import java.security.cert.X509Certificate
import akka.io.IO
import com.typesafe.config.{Config, ConfigFactory}
import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized}
import net.shrine.authorization.steward.{InboundShrineQuery, ResearchersTopics, TopicIdAndName}
import net.shrine.log.Loggable
import net.shrine.protocol.{ApprovedTopic, AuthenticationInfo, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest}
import net.shrine.config.ConfigExtensions
import org.json4s.native.JsonMethods.parse
import org.json4s.{DefaultFormats, Formats}
import akka.actor.ActorSystem
import akka.util.Timeout
import akka.pattern.ask
import net.shrine.problem.{AbstractProblem, ProblemSources}
import spray.can.Http
import spray.can.Http.{HostConnectorInfo, HostConnectorSetup}
import spray.http.{BasicHttpCredentials, HttpRequest, HttpResponse}
import spray.http.StatusCodes.{OK, Unauthorized, UnavailableForLegalReasons}
import spray.httpx.TransformerPipelineSupport.WithTransformation
import spray.httpx.Json4sSupport
import spray.client.pipelining.{Get, Post, addCredentials, sendReceive}
import spray.io.{ClientSSLEngineProvider, PipelineContext, SSLContextProvider}
import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
/**
* A QueryAuthorizationService that talks to the standard data steward application to learn about topics (intents) and check that a
* shrine query can be run
*
* @author david
* @since 4/2/15
*/
final case class StewardQueryAuthorizationService(qepUserName:String,
qepPassword:String,
stewardBaseUrl:URL,
defaultTimeout:FiniteDuration = 10 seconds) extends QueryAuthorizationService with Loggable with Json4sSupport {
import system.dispatcher // execution context for futures
implicit val system = ActorSystem("AuthorizationServiceActors",ConfigFactory.load("shrine")) //todo use shrine's config
implicit val timeout:Timeout = Timeout.durationToTimeout(defaultTimeout)//10 seconds
implicit def json4sFormats: Formats = DefaultFormats
val qepCredentials = BasicHttpCredentials(qepUserName,qepPassword)
def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = {
// Place a special SSLContext in scope here to be used by HttpClient.
// It trusts all server certificates.
// Most important - it will encrypt all of the traffic on the wire.
implicit def trustfulSslContext: SSLContext = {
object BlindFaithX509TrustManager extends X509TrustManager {
def checkClientTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Client asked BlindFaithX509TrustManager to check $chain for $authType"))
def checkServerTrusted(chain: Array[X509Certificate], authType: String) = (info(s"Server asked BlindFaithX509TrustManager to check $chain for $authType"))
def getAcceptedIssuers = Array[X509Certificate]()
}
val context = SSLContext.getInstance("TLS")
context.init(Array[KeyManager](), Array(BlindFaithX509TrustManager), null)
context
}
implicit def trustfulSslContextProvider: SSLContextProvider = {
SSLContextProvider.forContext(trustfulSslContext)
}
class CustomClientSSLEngineProvider extends ClientSSLEngineProvider {
def apply(pc: PipelineContext) = ClientSSLEngineProvider.default(trustfulSslContextProvider).apply(pc)
}
implicit def sslEngineProvider: ClientSSLEngineProvider = new CustomClientSSLEngineProvider
val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials)
val responseFuture: Future[HttpResponse] = for {
HostConnectorInfo(hostConnector, _) <- {
val hostConnectorSetup = new HostConnectorSetup(httpRequest.uri.authority.host.address,
httpRequest.uri.authority.port,
sslEncryption = httpRequest.uri.scheme=="https")(
sslEngineProvider = sslEngineProvider)
IO(Http) ask hostConnectorSetup
}
response <- sendReceive(hostConnector).apply(requestWithCredentials)
_ <- hostConnector ask Http.CloseAll
} yield response
responseFuture
}
/* todo to recycle connections with http://spray.io/documentation/1.2.3/spray-client/ if needed
def sendHttpRequest(httpRequest: HttpRequest):Future[HttpResponse] = {
import akka.io.IO
import akka.pattern.ask
import spray.can.Http
val requestWithCredentials = httpRequest ~> addCredentials(qepCredentials)
//todo failures via onFailure callbacks
for{
sendR:SendReceive <- connectorSource
response:HttpResponse <- sendR(requestWithCredentials)
} yield response
}
val connectorSource: Future[SendReceive] = //Future[HttpRequest => Future[HttpResponse]]
for (
//keep asking for a connector until you get one
//todo correct URL
// Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("www.spray.io", port = 8080)
Http.HostConnectorInfo(connector, _) <- IO(Http) ? Http.HostConnectorSetup("localhost", port = 6060)
) yield sendReceive(connector)
*/
def sendAndReceive(httpRequest: HttpRequest,timeout:Duration = defaultTimeout):HttpResponse = {
info("StewardQueryAuthorizationService will request "+httpRequest.uri) //todo someday log request and response
val responseFuture = sendHttpRequest(httpRequest)
val response:HttpResponse = Await.result(responseFuture,timeout)
info("StewardQueryAuthorizationService received response with status "+response.status)
response
}
//Contact a data steward and either return an Authorized or a NotAuthorized or throw an exception
override def authorizeRunQueryRequest(runQueryRequest: RunQueryRequest): AuthorizationResult = {
debug(s"authorizeRunQueryRequest started for ${runQueryRequest.queryDefinition.name}")
val interpreted = runQueryRequest.topicId.fold(
authorizeRunQueryRequestNoTopic(runQueryRequest)
)(
authorizeRunQueryRequestForTopic(runQueryRequest,_)
)
debug(s"authorizeRunQueryRequest completed with $interpreted) for ${runQueryRequest.queryDefinition.name}")
interpreted
}
def authorizeRunQueryRequestNoTopic(runQueryRequest: RunQueryRequest): AuthorizationResult = {
val userName = runQueryRequest.authn.username
val queryId = runQueryRequest.queryDefinition.name
//xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml.
val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString())
val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName", queryForJson)
val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime)
interpretAuthorizeRunQueryResponse(response)
}
def authorizeRunQueryRequestForTopic(runQueryRequest: RunQueryRequest,topicIdString:String): AuthorizationResult = {
val userName = runQueryRequest.authn.username
val queryId = runQueryRequest.queryDefinition.name
//xml's .text returns something that looks like xquery with backwards slashes. toString() returns xml.
val queryForJson = InboundShrineQuery(runQueryRequest.networkQueryId,queryId,runQueryRequest.queryDefinition.toXml.toString())
val request = Post(s"$stewardBaseUrl/steward/qep/requestQueryAccess/user/$userName/topic/$topicIdString", queryForJson)
val response:HttpResponse = sendAndReceive(request,runQueryRequest.waitTime)
debug(s"authorizeRunQueryRequestForTopic response is $response")
interpretAuthorizeRunQueryResponse(response)
}
/** Interpret the response from the steward app. Primarily here for testing. */
def interpretAuthorizeRunQueryResponse(response:HttpResponse):AuthorizationResult = {
response.status match {
case OK => {
val topicJson = new String(response.entity.data.toByteArray)
debug(s"topicJson is $topicJson")
val topic:Option[TopicIdAndName] = parse(topicJson).extractOpt[TopicIdAndName]
debug(s"topic is $topic")
Authorized(topic.map(x => (x.id,x.name)))
}
case UnavailableForLegalReasons => NotAuthorized(response.entity.asString)
case Unauthorized => throw new AuthorizationException(s"steward rejected qep's login credentials. $response")
case _ => throw new AuthorizationException(s"QueryAuthorizationService detected a problem: $response")
}
}
//Either read the approved topics from a data steward or have an error response.
override def readApprovedEntries(readTopicsRequest: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = {
val userName = readTopicsRequest.authn.username
val request = Get(s"$stewardBaseUrl/steward/qep/approvedTopics/user/$userName")
val response:HttpResponse = sendAndReceive(request,readTopicsRequest.waitTime)
if(response.status == OK) {
val topicsJson = new String(response.entity.data.toByteArray)
val topicsFromSteward: ResearchersTopics = parse(topicsJson).extract[ResearchersTopics]
val topics: Seq[ApprovedTopic] = topicsFromSteward.topics.map(topic => ApprovedTopic(topic.id, topic.name))
Right(ReadApprovedQueryTopicsResponse(topics))
}
else Left(ErrorResponse(ErrorStatusFromDataStewardApp(response,stewardBaseUrl)))
}
override def toString() = {
super.toString().replaceAll(qepPassword,"REDACTED")
}
}
object StewardQueryAuthorizationService {
def apply(config:Config):StewardQueryAuthorizationService = StewardQueryAuthorizationService (
qepUserName = config.getString("qepUserName"),
qepPassword = config.getString("qepPassword"),
stewardBaseUrl = config.get("stewardBaseUrl", new URL(_))
)
}
case class ErrorStatusFromDataStewardApp(response:HttpResponse,stewardBaseUrl:URL) extends AbstractProblem(ProblemSources.Qep) {
- override val summary: String = s"Data Steward App responded with status ${response.status}"
- override val description:String = s"The Data Steward App at ${stewardBaseUrl} responded with status ${response.status}, not OK."
- override val detailsXml =
+ override lazy val summary: String = s"Data Steward App responded with status ${response.status}"
+ override lazy val description:String = s"The Data Steward App at ${stewardBaseUrl} responded with status ${response.status}, not OK."
+ override lazy val detailsXml =
Response is {response}
{throwableDetail.getOrElse("")}
}
\ No newline at end of file
diff --git a/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala b/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala
index d3b5f6fb3..264fd048f 100644
--- a/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala
+++ b/commons/data-commons/src/main/scala/net/shrine/problem/Problem.scala
@@ -1,191 +1,202 @@
package net.shrine.problem
import java.net.InetAddress
import java.text.SimpleDateFormat
import java.util.Date
import net.shrine.log.Loggable
import net.shrine.serialization.{XmlMarshaller, XmlUnmarshaller}
import scala.concurrent.Future
import scala.xml.{Elem, Node, NodeSeq}
/**
* Describes what information we have about a problem at the site in code where we discover it.
*
* @author david
* @since 8/6/15
*/
trait Problem extends DelayedInit {
def summary:String
def problemName = getClass.getName
def throwable:Option[Throwable] = None
def stamp:Stamp
def description:String
- def exceptionXml(exception:Option[Throwable]): Option[Elem] = exception.map{x =>
+ def exceptionXml(exception:Option[Throwable]): Option[Elem] = {
+ println("Hello!")
+ println(exception)
+ exception.map{x =>
{x.getClass.getName}
{x.getMessage}
{x.getStackTrace.map(line => {line})}{exceptionXml(Option(x.getCause)).getOrElse("")}
- }
+ }}
- def throwableDetail = exceptionXml(throwable)
+ def throwableDetail: Option[Elem] = exceptionXml(throwable)
def detailsXml: NodeSeq = NodeSeq.fromSeq({throwableDetail.getOrElse("")} )
def toDigest:ProblemDigest = ProblemDigest(problemName,stamp.pretty,summary,description,detailsXml, stamp.time)
override def delayedInit(code: => Unit): Unit = {
code
if (!ProblemConfigSource.turnOffConnector) {
+ println(s"Yello! ${this.throwable}")
+ println(s"Red! ${this.summary}")
val problem = Problems
- problem.DatabaseConnector.insertProblem(toDigest)
+ problem.DatabaseConnector.insertProblem(this.toDigest)
}
}
}
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq, epoch: Long) extends XmlMarshaller {
override def toXml: Node = {
{codec}
{stampText}
{summary}
{description}
{epoch}
{detailsXml}
}
/**
* Ignores detailXml. equals with scala.xml is impossible. See http://www.scala-lang.org/api/2.10.3/index.html#scala.xml.Equality$
*/
override def equals(other: Any): Boolean =
other match {
case that: ProblemDigest =>
(that canEqual this) &&
codec == that.codec &&
stampText == that.stampText &&
summary == that.summary &&
description == that.description &&
epoch == that.epoch
case _ => false
}
/**
* Ignores detailXml
*/
override def hashCode: Int = {
val prime = 67
codec.hashCode + prime * (stampText.hashCode + prime *(summary.hashCode + prime * (description.hashCode + prime * epoch.hashCode())))
}
}
object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable {
override def fromXml(xml: NodeSeq): ProblemDigest = {
val problemNode = xml \ "problem"
require(problemNode.nonEmpty,s"No problem tag in $xml")
def extractText(tagName:String) = (problemNode \ tagName).text
val codec = extractText("codec")
val stampText = extractText("stamp")
val summary = extractText("summary")
val description = extractText("description")
val detailsXml: NodeSeq = problemNode \ "details"
val epoch =
try { extractText("epoch").toLong }
catch { case nx:NumberFormatException =>
error(s"While parsing xml representing a ProblemDigest, the epoch could not be parsed into a long", nx)
0
}
ProblemDigest(codec,stampText,summary,description,detailsXml,epoch)
}
}
case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) {
def pretty = s"${new Date(time)} on ${host.getHostName} ${source.pretty}"
}
object Stamp {
//TODO: val dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")?
//TODO: Currently the stamp text is locale specific, which can change depending on the jre/computer running it...
def apply(source:ProblemSources.ProblemSource, timer: => Long): Stamp = Stamp(InetAddress.getLocalHost, timer,source)
}
+/**
+ * An abstract problem to enable easy creation of Problems. Note that when overriding fields,
+ * you should only use def or lazy val, and not val.
+ * See: http://stackoverflow.com/questions/15346600/field-inside-object-which-extends-app-trait-is-set-to-null-why-is-that-so
+ * @param source
+ */
abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem {
def timer = System.currentTimeMillis
- val stamp = Stamp(source, timer)
+ lazy val stamp = Stamp(source, timer)
}
trait ProblemHandler {
def handleProblem(problem:Problem)
}
/**
* An example problem handler
*/
object LoggingProblemHandler extends ProblemHandler with Loggable {
override def handleProblem(problem: Problem): Unit = {
problem.throwable.fold(error(problem.toString))(throwable =>
error(problem.toString,throwable)
)
}
}
object ProblemSources{
sealed trait ProblemSource {
def pretty = getClass.getSimpleName.dropRight(1)
}
case object Adapter extends ProblemSource
case object Hub extends ProblemSource
case object Qep extends ProblemSource
case object Dsa extends ProblemSource
case object Unknown extends ProblemSource
def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown)
}
case class ProblemNotYetEncoded(internalSummary:String,t:Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown){
override val summary = "An unanticipated problem encountered."
override val throwable = {
val rx = t.fold(new IllegalStateException(s"$summary"))(
new IllegalStateException(s"$summary",_)
)
rx.fillInStackTrace()
Some(rx)
}
val reportedAtStackTrace = new IllegalStateException("Capture reporting stack trace.")
override val description = "This problem is not yet classified in Shrine source code. Please report the details to the Shrine dev team."
override val detailsXml: NodeSeq = NodeSeq.fromSeq(
{internalSummary}
{throwableDetail.getOrElse("")}
)
}
object ProblemNotYetEncoded {
def apply(summary:String,x:Throwable):ProblemNotYetEncoded = ProblemNotYetEncoded(summary,Some(x))
}
diff --git a/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala b/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala
index 55fdc2c5d..f961c80dd 100644
--- a/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala
+++ b/commons/data-commons/src/main/scala/net/shrine/problem/TestProblem.scala
@@ -1,15 +1,15 @@
package net.shrine.problem
/**
* @author david
* @since 1.22
*/
case class TestProblem(override val summary: String = "test summary",
override val description:String = "test description",
override val throwable: Option[Throwable] = None) extends AbstractProblem(ProblemSources.Unknown) {
override def timer = 0
// No point in logging test problems
- override def delayedInit(code: => Unit) = {
- code
- }
+ //override def delayedInit(code: => Unit) = {
+ // code
+ //}
}
diff --git a/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala b/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala
index 06c9c66b2..82c786782 100644
--- a/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala
+++ b/commons/data-commons/src/main/scala/net/shrine/slick/TestableDataSourceCreator.scala
@@ -1,79 +1,78 @@
package net.shrine.slick
import java.io.PrintWriter
import java.sql.{Connection, DriverManager}
-import java.util.function.BiConsumer
import java.util.logging.Logger
import javax.naming.{Context, InitialContext}
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.config.ConfigExtensions
/**
* @author david
* @since 1/26/16
*/
object TestableDataSourceCreator {
def dataSource(config:Config):DataSource = {
val dataSourceFrom = config.getString("dataSourceFrom")
if(dataSourceFrom == "JNDI") {
val jndiDataSourceName = config.getString("jndiDataSourceName")
val initialContext:InitialContext = new InitialContext()
// check to see what part blows up
val secondaryContext = initialContext.lookup("java:comp/env/").asInstanceOf[Context]
val printKeyValues = (a: java.util.Enumeration[_]) => while (a.hasMoreElements) { println(a.nextElement()) }
println("Seconday keys:")
printKeyValues(secondaryContext.getEnvironment.keys())
println("Secondary values")
printKeyValues(secondaryContext.getEnvironment.elements())
println("Primary keys:")
printKeyValues(initialContext.getEnvironment.keys())
println("Primary values:")
printKeyValues(initialContext.getEnvironment.elements())
println(initialContext.getEnvironment)
initialContext.lookup(jndiDataSourceName).asInstanceOf[DataSource]
}
else if (dataSourceFrom == "testDataSource") {
val testDataSourceConfig = config.getConfig("testDataSource")
val driverClassName = testDataSourceConfig.getString("driverClassName")
val url = testDataSourceConfig.getString("url")
case class Credentials(username: String,password:String)
def configToCredentials(config:Config) = new Credentials(config.getString("username"),config.getString("password"))
val credentials: Option[Credentials] = testDataSourceConfig.getOptionConfigured("credentials",configToCredentials)
//Creating an instance of the driver register it. (!) From a previous epoch, but it works.
Class.forName(driverClassName).newInstance()
object TestDataSource extends DataSource {
override def getConnection: Connection = {
credentials.fold(DriverManager.getConnection(url))(credentials =>
DriverManager.getConnection(url,credentials.username,credentials.password))
}
override def getConnection(username: String, password: String): Connection = {
DriverManager.getConnection(url, username, password)
}
//unused methods
override def unwrap[T](iface: Class[T]): T = ???
override def isWrapperFor(iface: Class[_]): Boolean = ???
override def setLogWriter(out: PrintWriter): Unit = ???
override def getLoginTimeout: Int = ???
override def setLoginTimeout(seconds: Int): Unit = ???
override def getParentLogger: Logger = ???
override def getLogWriter: PrintWriter = ???
}
TestDataSource
}
else throw new IllegalArgumentException(s"dataSourceFrom config value must be either JNDI or testDataSource, not $dataSourceFrom")
}
}
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index 59d25fa81..be7f831fa 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,402 +1,402 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, Problem, ProblemDigest, ProblemSources}
import net.shrine.protocol.QueryResult.StatusType
import scala.xml.NodeSeq
import net.shrine.util.{NodeSeqEnrichments, OptionEnrichments, SEnum, Tries, XmlDateHelper, XmlUtil}
import net.shrine.serialization.{I2b2Marshaller, XmlMarshaller}
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult (
resultId: Long,
instanceId: Long,
resultType: Option[ResultOutputType],
setSize: Long,
startDate: Option[XMLGregorianCalendar],
endDate: Option[XMLGregorianCalendar],
description: Option[String],
statusType: StatusType,
statusMessage: Option[String],
problemDigest: Option[ProblemDigest] = None,
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
) extends XmlMarshaller with I2b2Marshaller with Loggable {
//only used in tests
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
None, //description
statusType,
None) //statusMessage
}
def this(
resultId: Long,
instanceId: Long,
resultType: ResultOutputType,
setSize: Long,
startDate: XMLGregorianCalendar,
endDate: XMLGregorianCalendar,
description: String,
statusType: QueryResult.StatusType) = {
this(
resultId,
instanceId,
Option(resultType),
setSize,
Option(startDate),
Option(endDate),
Option(description),
statusType,
None) //statusMessage
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType.fold( ResultOutputType.ERROR.toI2b2NameOnly("") ){ rt =>
if(rt.isBreakdown) rt.toI2b2NameOnly()
else if (rt.isError) rt.toI2b2NameOnly() //The result type can be an error
else if (statusType.isError) rt.toI2b2NameOnly() //Or the status type can be an error
else rt.toI2b2
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
{ problemDigest.map(_.toXml).getOrElse("") }
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
final case class StatusType(
name: String,
isDone: Boolean,
i2b2Id: Option[Int] = Some(-1),
private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value {
def isError = this == StatusType.Error
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
}
object StatusType extends SEnum[StatusType] {
private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
}
{ i2b2Id }{ queryResult.statusType.name }
}
val noMessage:NodeSeq = null
val Error = StatusType("ERROR", isDone = true, None, { queryResult =>
(queryResult.statusMessage, queryResult.problemDigest) match {
case (Some(msg),Some(pd)) => { if(msg != "ERROR") msg else pd.summary } ++ pd.toXml
case (Some(msg),None) => { msg }
case (None,Some(pd)) => { pd.summary } ++ pd.toXml
case (None, None) => noMessage
}
})
val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
val Processing = StatusType("PROCESSING", isDone = false, Some(2)) //todo only used in tests
val Queued = StatusType("QUEUED", isDone = false, Some(2))
val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
val Held = StatusType("HELD", isDone = false)
val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def extractProblemDigest(xml: NodeSeq):Option[ProblemDigest] = {
val subXml = xml \ "problem"
if(subXml.nonEmpty) Some(ProblemDigest.fromXml(xml))
else None
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
//noinspection ScalaUnnecessaryParentheses
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
resultId = asLong("resultId"),
instanceId = asLong("instanceId"),
resultType = extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
setSize = asLong("setSize"),
startDate = extractDate("startDate"),
endDate = extractDate("endDate"),
description = extract("description"),
statusType = StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
statusMessage = extract("statusMessage"),
problemDigest = extractProblemDigest(xml),
breakdowns = extractBreakdowns("resultEnvelope")
)
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
val statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get //TODO: Avoid fragile .get call
val statusMessage: Option[String] = asTextOption("query_status_type", "description")
val encodedProblemDigest = extractProblemDigest(xml \ "query_status_type")
val problemDigest = if (encodedProblemDigest.isDefined) encodedProblemDigest
else if (statusType.isError) Some(ErrorStatusFromCrc(statusMessage,xml.text).toDigest)
else None
case class Filling(
resultType:Option[ResultOutputType],
setSize:Long,
startDate:Option[XMLGregorianCalendar],
endDate:Option[XMLGregorianCalendar]
)
val filling = if(!statusType.isError) {
val resultType: Option[ResultOutputType] = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2)
val setSize = asLong("set_size")
val startDate = asXmlGcOption("start_date")
val endDate = asXmlGcOption("end_date")
Filling(resultType,setSize,startDate,endDate)
}
else {
val resultType = None
val setSize = 0L
val startDate = None
val endDate = None
Filling(resultType,setSize,startDate,endDate)
}
QueryResult(
resultId = asLong("result_instance_id"),
instanceId = asLong("query_instance_id"),
resultType = filling.resultType,
setSize = filling.setSize,
startDate = filling.startDate,
endDate = filling.endDate,
description = asTextOption("description"),
statusType = statusType,
statusMessage = statusMessage,
problemDigest = problemDigest
)
}
def errorResult(description: Option[String], statusMessage: String,problemDigest:ProblemDigest):QueryResult = {
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
def errorResult(description: Option[String], statusMessage: String,problem:Problem):QueryResult = {
val problemDigest = problem.toDigest
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
/**
* For reconstituting errorResults from a database
*/
//todo remove and replace with real Problems
def errorResult(description:Option[String], statusMessage:String, codec:String,stampText:String, summary:String, digestDescription:String,detailsXml:NodeSeq): QueryResult = {
// This would require parsing the stamp text to change, and without a standard locale that's nigh impossible.
// If this is replaced with real problems, then this can be addressed then. For now, passing on zero is the best bet.
// TODO: REFACTOR SQUERYL ERRORS TO USE REAL PROBLEMS
val problemDigest = ProblemDigest(codec,stampText,summary,digestDescription,detailsXml,0)
QueryResult(
resultId = 0L,
instanceId = 0L,
resultType = None,
setSize = 0L,
startDate = None,
endDate = None,
description = description,
statusType = StatusType.Error,
statusMessage = Option(statusMessage),
problemDigest = Option(problemDigest))
}
}
case class ErrorStatusFromCrc(messageFromCrC:Option[String], xmlResponseFromCrc: String) extends AbstractProblem(ProblemSources.Adapter) {
- override val summary: String = "The I2B2 CRC reported an internal error."
- override val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}"
- override val detailsXml =
+ override lazy val summary: String = "The I2B2 CRC reported an internal error."
+ override lazy val description:String = s"The I2B2 CRC responded with status type ERROR ${messageFromCrC.fold(" but no message")(message => s"and a message of '$message'")}"
+ override lazy val detailsXml =
CRC's Response is {xmlResponseFromCrc}
}
diff --git a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala
index 0b444d164..f50e3f910 100644
--- a/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala
+++ b/hms-support/hms-core/src/main/scala/net/shrine/hms/authorization/HmsDataStewardAuthorizationService.scala
@@ -1,89 +1,89 @@
package net.shrine.hms.authorization
import java.net.URL
import com.typesafe.config.Config
import net.shrine.authentication.{AuthenticationResult, Authenticator}
import net.shrine.authorization.{AuthorizationResult, QueryAuthorizationService}
import net.shrine.client.EndpointConfig
import net.shrine.log.Loggable
import net.shrine.protocol.{AuthenticationInfo, CredentialConfig, ErrorResponse, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, RunQueryRequest}
import net.shrine.config.ConfigExtensions
import net.shrine.problem.{AbstractProblem, ProblemSources}
/**
* @author Bill Simons
* @since 1/30/12
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
final case class HmsDataStewardAuthorizationService(
sheriffClient: SheriffClient,
authenticator: Authenticator
) extends QueryAuthorizationService with Loggable {
import net.shrine.hms.authorization.HmsDataStewardAuthorizationService._
override def readApprovedEntries(request: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = {
val authn = request.authn
authenticate(authn) match {
case None => Left(ErrorResponse(HMSNotAuthenticatedProblem(authn)))
case Some(ecommonsUsername) =>
val topics = sheriffClient.getApprovedEntries(ecommonsUsername)
Right(ReadApprovedQueryTopicsResponse(topics))
}
}
override def authorizeRunQueryRequest(request: RunQueryRequest): AuthorizationResult = {
val authn = request.authn
if (request.topicId.isEmpty) {
AuthorizationResult.NotAuthorized(s"HMS queries require a topic id; couldn't authenticate user ${toDomainAndUser(authn)}")
} else {
authenticate(authn) match {
case None => AuthorizationResult.NotAuthorized(s"Requested topic is not approved; couldn't authenticate user ${toDomainAndUser(authn)}")
case Some(ecommonsUsername) =>
sheriffClient.isAuthorized(ecommonsUsername, request.topicId.get, request.queryDefinition.toI2b2String)
}
}
}
private def authenticate(authn: AuthenticationInfo): Option[String] = {
val authenticationResult = authenticator.authenticate(authn)
identifyEcommonsUsername(authenticationResult)
}
}
object HmsDataStewardAuthorizationService {
def apply(config:Config,authenticator: Authenticator):HmsDataStewardAuthorizationService = {
val endpointUrl = config.getString("sheriffEndpoint"+EndpointConfig.Keys.url)
val credentials = config.getConfigured("sheriffCredentials", CredentialConfig(_))
val sheriffClient = JerseySheriffClient(endpointUrl, credentials.username, credentials.password)
HmsDataStewardAuthorizationService(sheriffClient, authenticator)
}
private def toDomainAndUser(authn: AuthenticationInfo): String = s"${authn.domain}:${authn.username}"
def identifyEcommonsUsername(authenticationResult: AuthenticationResult): Option[String] = authenticationResult match {
case AuthenticationResult.Authenticated(_, ecommonsUsername) => Option(ecommonsUsername)
case _ => None
}
}
case class HMSNotAuthenticatedProblem(authn: AuthenticationInfo) extends AbstractProblem(ProblemSources.Qep){
- override val summary = s"Can not authenticate ${authn.domain}:${authn.username}."
+ override lazy val summary = s"Can not authenticate ${authn.domain}:${authn.username}."
- override val description = s"Can not authenticate ${authn.domain}:${authn.username}."
+ override lazy val description = s"Can not authenticate ${authn.domain}:${authn.username}."
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
index bfa2100e1..e8d129e82 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
@@ -1,129 +1,129 @@
package net.shrine.aggregation
import java.net.{ConnectException, UnknownHostException}
import com.sun.jersey.api.client.ClientHandlerException
import net.shrine.broadcaster.CouldNotParseResultsException
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemNotYetEncoded, ProblemSources}
import scala.concurrent.duration.Duration
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.Failure
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import net.shrine.protocol.SingleNodeResult
import net.shrine.protocol.Timeout
import net.shrine.protocol.BaseShrineResponse
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Represents the basic aggregation strategy shared by several aggregators:
* - Parses a sequence of SpinResultEntries into a sequence of some
* combination of valid responses, ErrorResponses, and invalid
* responses (cases where ShrineResponse.fromXml returns None)
* - Filters the valid responses, weeding out responses that aren't of
* the expected type
* Invokes an abstract method with the valid responses, errors, and
* invalid responses.
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable {
private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = {
manifest[T].runtimeClass.isAssignableFrom(response.getClass)
}
import BasicAggregator._
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = {
val resultsOrErrors: Iterable[ParsedResult[T]] = {
for {
result <- results
} yield {
val parsedResponse: ParsedResult[T] = result match {
case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse)
case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response)
case Timeout(origin) => Error(Option(origin), ErrorResponse(TimedOutWithAdapter(origin)))
case Failure(origin, cause) => cause match {
case cx: ConnectException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx)))
case uhx: UnknownHostException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, uhx)))
case chx: ClientHandlerException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, chx)))
case cnprx:CouldNotParseResultsException =>
if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx)))
else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx)))
case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x)))
}
case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result")
}
parsedResponse
}
}
val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid }
val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid }
val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error }
//Log all parsing errors
invalidResponses.map(_.errorMessage).foreach(this.error(_))
val previouslyDetectedErrors = errors.map(Error(None, _))
makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses)
}
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse
}
object BasicAggregator {
private[aggregation] sealed abstract class ParsedResult[+T]
private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T]
private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing]
private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing]
}
case class CouldNotConnectToAdapter(origin:NodeId,cx: Exception) extends AbstractProblem(ProblemSources.Hub) {
- override val throwable = Some(cx)
- override val summary: String = "Shrine could not connect to the adapter."
- override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}."
+ override lazy val throwable = Some(cx)
+ override lazy val summary: String = "Shrine could not connect to the adapter."
+ override lazy val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}."
}
case class TimedOutWithAdapter(origin:NodeId) extends AbstractProblem(ProblemSources.Hub) {
- override val throwable = None
- override val summary: String = "Timed out with adapter."
- override val description: String = s"Shrine observed a timeout with the adapter at ${origin.name}."
+ override lazy val throwable = None
+ override lazy val summary: String = "Timed out with adapter."
+ override lazy val description: String = s"Shrine observed a timeout with the adapter at ${origin.name}."
}
case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) {
- override val throwable = Some(cnrpx)
- override val summary: String = "Could not parse response."
- override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'"
- override val detailsXml =
+ override lazy val throwable = Some(cnrpx)
+ override lazy val summary: String = "Could not parse response."
+ override lazy val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'"
+ override lazy val detailsXml =
Message body is {cnrpx.body}
{throwableDetail.getOrElse("")}
}
case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) {
- override val throwable = Some(cnrpx)
- override val summary: String = "Adapter error."
- override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}."
- override val detailsXml =
+ override lazy val throwable = Some(cnrpx)
+ override lazy val summary: String = "Adapter error."
+ override lazy val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}."
+ override lazy val detailsXml =
Message body is {cnrpx.body}
{throwableDetail.getOrElse("")}
}
\ No newline at end of file
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
index 97789b287..dae2ca6f7 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/IgnoresErrorsAggregator.scala
@@ -1,43 +1,43 @@
package net.shrine.aggregation
import net.shrine.aggregation.BasicAggregator.{Error, Invalid, Valid}
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.BaseShrineResponse
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Extends BasicAggregator to ignore Errors and Invalid responses
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class IgnoresErrorsAggregator[T <: BaseShrineResponse : Manifest] extends BasicAggregator[T] {
private[aggregation] override def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse = {
//Filter out errors and invalid responses
makeResponseFrom(validResponses)
}
//Default implementation, just returns first valid response, or if there are none, an ErrorResponse
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]]): BaseShrineResponse = {
validResponses.map(_.response).toSet.headOption.getOrElse{
val problem = NoValidResponsesToAggregate()
ErrorResponse(problem)
}
}
}
case class NoValidResponsesToAggregate() extends AbstractProblem(ProblemSources.Hub) {
- override val summary: String = "No valid responses to aggregate."
+ override lazy val summary: String = "No valid responses to aggregate."
- override val description:String = "The hub received no valid responses to aggregate."
+ override lazy val description:String = "The hub received no valid responses to aggregate."
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
index b61c2a5b0..1721fdc4d 100644
--- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
@@ -1,533 +1,533 @@
package net.shrine.qep.queries
import java.sql.SQLException
import java.util.concurrent.TimeoutException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.qep.QepConfigSource
import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator}
import net.shrine.util.XmlDateHelper
import slick.driver.JdbcProfile
import scala.collection.immutable.Iterable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.xml.XML
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
try {
blocking {
Await.result(future, timeout)
}
}
catch {
case tx:TimeoutException => throw CouldNotRunDbIoActionException(dataSource,tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.result)
}
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain,request.fetchSize)
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, limit:Int):Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).sortBy(x => x.changeDate.desc).take(limit).result)
}
def renamePreviousQuery(request:RenameQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def markDeleted(request:DeleteQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
def insertQepResultRow(qepQueryRow:QueryResultRow) = {
dbRun(allQueryResultRows += qepQueryRow)
}
def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = {
val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node"))
val queryResultRow = QueryResultRow(networkQueryId,result)
val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_))
val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq]
dbRun(
for {
_ <- allQueryResultRows += queryResultRow
_ <- allBreakdownResultsRows ++= breakdowns
_ <- allProblemDigestRows ++= problem
} yield ()
)
}
def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = {
dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result)
}
def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = {
val (queryResults, breakdowns,problems) = dbRun(
for {
queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result
breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result
} yield (queryResults, breakdowns, problems)
)
val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2))
def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = {
if(problemSeq.size == 1) problemSeq.head.toProblemDigest
else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq")
}
val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) )
queryResults.map(r => r.toQueryResult(
resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty),
adapterNodesToProblemDigests.get(r.adapterNode)
))
}
def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = {
dbRun(allBreakdownResultsRows += breakdownResultsRow)
}
def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = {
dbRun(allBreakdownResultsRows.result)
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val timeout = QepQuerySchema.config.getInt("timeout") seconds
val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[Option[String]]("expression")
def dateCreated = column[Time]("dateCreated")
def deleted = column[Boolean]("deleted")
def queryXml = column[String]("queryXml")
def changeDate = column[Long]("changeDate")
def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for(
queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists
) yield queries
val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false)
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns
val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap
val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap)
implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({
(resultType: ResultOutputType) => queryResultTypesToString(resultType)
},{
(string: String) => stringsToQueryResultTypes(string)
})
implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({
statusType => statusType.name
},{
name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}"))
})
class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") {
def resultId = column[Long]("resultId")
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def instanceId = column[Long]("instanceId")
def adapterNode = column[String]("adapterNode")
def resultType = column[Option[ResultOutputType]]("resultType")
def size = column[Long]("size")
def startDate = column[Option[Long]]("startDate")
def endDate = column[Option[Long]]("endDate")
def status = column[QueryResult.StatusType]("status")
def statusMessage = column[Option[String]]("statusMessage")
def changeDate = column[Long]("changeDate")
def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply)
}
val allQueryResultRows = TableQuery[QepQueryResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for(
queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists
) yield queryResultRows
class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def resultId = column[Long]("resultId")
def resultType = column[ResultOutputType]("resultType")
def dataKey = column[String]("dataKey")
def value = column[Long]("value")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply)
}
val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for(
breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists
) yield breakdownResultsRows
/*
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller {
*/
class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def codec = column[String]("codec")
def stamp = column[String]("stamp")
def summary = column[String]("summary")
def description = column[String]("description")
def details = column[String]("details")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply)
}
val allProblemDigestRows = TableQuery[QepResultProblemDigests]
val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for(
problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists
) yield problemDigests
}
object QepQuerySchema {
val allConfig:Config = QepConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfileClassName = config.getString("slickProfileClassName")
val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName)
import net.shrine.config.{ConfigExtensions, Keys}
val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty)
val schema = QepQuerySchema(slickProfile,moreBreakdowns)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: Option[String],
dateCreated: Time,
deleted: Boolean,
queryXml: String,
changeDate: Time
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated),
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.map(_.toString),
dateCreated = System.currentTimeMillis(),
deleted = false,
queryXml = runQueryRequest.toXmlString,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}
case class QueryResultRow(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
size:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long
) {
def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult(
resultId = resultId,
instanceId = instanceId,
resultType = resultType,
setSize = size,
startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
description = Some(adapterNode),
statusType = status,
statusMessage = statusMessage,
breakdowns = breakdowns,
problemDigest = problemDigest
)
}
object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow)
{
def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = {
new QueryResultRow(
resultId = result.resultId,
networkQueryId = networkQueryId,
instanceId = result.instanceId,
adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."),
resultType = result.resultType,
size = result.setSize,
startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis),
endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis),
status = result.statusType,
statusMessage = result.statusMessage,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryBreakdownResultsRow(
networkQueryId: NetworkQueryId,
adapterNode:String,
resultId:Long,
resultType: ResultOutputType,
dataKey:String,
value:Long,
changeDate:Long
)
object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){
def breakdownRowsFor(networkQueryId:NetworkQueryId,
adapterNode:String,
resultId:Long,
breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = {
breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis()))
}
def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = {
def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
val data = breakdowns.map(b => b.dataKey -> b.value).toMap
I2b2ResultEnvelope(resultType,data)
}
breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
}
}
case class QepProblemDigestRow(
networkQueryId: NetworkQueryId,
adapterNode: String,
codec: String,
stampText: String,
summary: String,
description: String,
details: String,
changeDate:Long
){
def toProblemDigest = {
ProblemDigest(
codec,
stampText,
summary,
description,
if(!details.isEmpty) XML.loadString(details)
else ,
//TODO: FIGURE OUT HOW TO GET AN ACUTAL EPOCH INTO HERE
0
)
}
}
case class QepDatabaseProblem(x:Exception) extends AbstractProblem(ProblemSources.Qep){
- override val summary = "A problem encountered while using a database."
+ override lazy val summary = "A problem encountered while using a database."
- override val throwable = Some(x)
+ override lazy val throwable = Some(x)
- override val description = x.getMessage
+ override lazy val description = x.getMessage
}
\ No newline at end of file