diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala
index 230fa4d9f..7b404e8f2 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala
@@ -1,16 +1,14 @@
package net.shrine.adapter.dao.model
import net.shrine.protocol.QueryResult
/**
* @author clint
- * @date Oct 16, 2012
+ * @since Oct 16, 2012
*
- * NB: Named ShrineError to avoid clashes with java.lang.Error
- * NB: Can't be final, since Squeryl runs this class through cglib to make a synthetic subclass :(
*/
final case class ShrineError(id: Int, resultId: Int, message: String) extends HasResultId {
def toQueryResult: QueryResult = {
QueryResult.errorResult(Option(message), QueryResult.StatusType.Error.name)
}
}
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala
index 216475318..a47318b04 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala
@@ -1,76 +1,75 @@
package net.shrine.adapter.dao.model
import net.shrine.protocol.ResultOutputType
import net.shrine.protocol.QueryResult
import net.shrine.protocol.I2b2ResultEnvelope
/**
* @author clint
- * @date Oct 16, 2012
+ * @since Oct 16, 2012
*
* NB: Named ShrineQueryResult to avoid clashes with net.shrine.protocol.QueryResult
- * NB: Can't be final, since Squeryl runs this class through cglib to make a synthetic subclass :(
*/
final case class ShrineQueryResult(
networkQueryId: Long,
localId: String,
wasRun: Boolean,
isFlagged: Boolean,
flagMessage: Option[String],
count: Count,
breakdowns: Seq[Breakdown],
errors: Seq[ShrineError]) {
//TODO: include breakdowns as well? What if they're PROCESSING while the count is FINISHED? Can this even happen?
val isDone = count.statusType.isDone
def wasNotRun: Boolean = !wasRun
def toQueryResults(doObfuscation: Boolean): Option[QueryResult] = {
val countResult = count.toQueryResult.map { countQueryResult =>
//add breakdowns
val byType = Map.empty ++ breakdowns.map(b => (b.resultType, b.data))
val getRealOrObfuscated: ObfuscatedPair => Long = {
if(doObfuscation) { _.obfuscated }
else { _.original }
}
val typesToData = byType.mapValues(_.mapValues(getRealOrObfuscated))
countQueryResult.withBreakdowns(typesToData.map {
case (resultType, data) =>
(resultType, I2b2ResultEnvelope(resultType, data))
})
}
def firstError = errors.headOption.map(_.toQueryResult)
countResult orElse firstError
}
}
object ShrineQueryResult {
def fromRows(queryRow: ShrineQuery, resultRows: Seq[QueryResultRow], countRow: CountRow, breakdownRows: Map[ResultOutputType, Seq[BreakdownResultRow]], errorRows: Seq[ShrineError]): Option[ShrineQueryResult] = {
if(resultRows.isEmpty) {
None
} else {
val resultRowsByType = resultRows.map(r => r.resultType -> r).toMap
val breakdowns = (for {
(resultType, resultRow) <- resultRowsByType
rows <- breakdownRows.get(resultType)
breakdown <- Breakdown.fromRows(resultType, resultRow.localId, rows)
} yield breakdown).toSeq
import ResultOutputType.PATIENT_COUNT_XML
for {
resultRow <- resultRowsByType.get(PATIENT_COUNT_XML)
count = Count.fromRows(resultRow, countRow)
} yield ShrineQueryResult(queryRow.networkId, queryRow.localId, queryRow.hasBeenRun, queryRow.isFlagged, queryRow.flagMessage, count, breakdowns, errorRows)
}
}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala
index 6be3537c2..4bee3601e 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala
@@ -1,23 +1,23 @@
package net.shrine.adapter.dao.model.squeryl
import net.shrine.adapter.dao.model.ShrineError
import org.squeryl.KeyedEntity
import org.squeryl.annotations.Column
/**
* @author clint
- * @date May 28, 2013
+ * @since May 28, 2013
*/
case class SquerylShrineError(
@Column(name = "ID")
id: Int,
@Column(name = "RESULT_ID")
resultId: Int,
@Column(name = "MESSAGE")
message: String) extends KeyedEntity[Int] {
//NB: For Squeryl, ugh :(
def this() = this(0, 0, "")
def toShrineError = ShrineError(id, resultId, message)
}
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala
index 9b74b5fb8..930c6c625 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala
@@ -1,92 +1,92 @@
package net.shrine.protocol
import xml.NodeSeq
import net.shrine.util.XmlUtil
import net.shrine.serialization.XmlUnmarshaller
import net.shrine.serialization.I2b2Unmarshaller
import net.shrine.util.NodeSeqEnrichments
import scala.util.Try
import scala.util.control.NonFatal
/**
* @author Bill Simons
- * @date 4/25/11
- * @link http://cbmi.med.harvard.edu
- * @link http://chip.org
+ * @since 4/25/11
+ * @see http://cbmi.med.harvard.edu
+ * @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
- * @link http://www.gnu.org/licenses/lgpl.html
+ * @see http://www.gnu.org/licenses/lgpl.html
*
* NB: Now a case class for structural equality
*/
-final case class ErrorResponse(val errorMessage: String) extends ShrineResponse {
+final case class ErrorResponse(errorMessage: String) extends ShrineResponse {
override protected def status = { errorMessage }
override protected def i2b2MessageBody = null
import ErrorResponse.rootTagName
override def toXml = XmlUtil.stripWhitespace {
XmlUtil.renameRootTag(rootTagName) {
{ errorMessage }
}
}
}
object ErrorResponse extends XmlUnmarshaller[ErrorResponse] with I2b2Unmarshaller[ErrorResponse] with HasRootTagName {
val rootTagName = "errorResponse"
override def fromXml(xml: NodeSeq): ErrorResponse = {
- val messageXml = (xml \ "message")
+ val messageXml = xml \ "message"
//NB: Fail fast
require(messageXml.nonEmpty)
ErrorResponse(XmlUtil.trim(messageXml))
}
override def fromI2b2(xml: NodeSeq): ErrorResponse = {
import NodeSeqEnrichments.Strictness._
def parseFormatA: Try[ErrorResponse] = {
for {
statusXml <- xml withChild "response_header" withChild "result_status" withChild "status"
- typeText <- (statusXml attribute "type")
+ typeText <- statusXml attribute "type"
if typeText == "ERROR" //NB: Fail fast
statusMessage = XmlUtil.trim(statusXml)
} yield {
ErrorResponse(statusMessage)
}
}
def parseFormatB: Try[ErrorResponse] = {
for {
conditionXml <- xml withChild "message_body" withChild "response" withChild "status" withChild "condition"
typeText <- conditionXml attribute "type"
if typeText == "ERROR"
statusMessage = XmlUtil.trim(conditionXml)
} yield {
ErrorResponse(statusMessage)
}
}
parseFormatA.recoverWith { case NonFatal(e) => parseFormatB }.get
}
/**
*
*
*
*
*
* Query result instance id 3126 not found
*
*
*
*
*/
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
index faa3dbda8..b8813d6b7 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala
@@ -1,241 +1,250 @@
package net.shrine.protocol
import javax.xml.datatype.XMLGregorianCalendar
import scala.xml.NodeSeq
import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, SEnum, XmlDateHelper, OptionEnrichments}
import net.shrine.serialization.{ I2b2Marshaller, I2b2Unmarshaller, XmlMarshaller, XmlUnmarshaller }
import scala.util.Try
/**
* @author Bill Simons
* @since 4/15/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class QueryResult(
- val resultId: Long,
- val instanceId: Long,
- val resultType: Option[ResultOutputType], //this won't be present in the case of an error result
- val setSize: Long,
- val startDate: Option[XMLGregorianCalendar],
- val endDate: Option[XMLGregorianCalendar],
- val description: Option[String],
- val statusType: QueryResult.StatusType,
- val statusMessage: Option[String],
- val breakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = Map.empty) extends XmlMarshaller with I2b2Marshaller {
+ resultId: Long,
+ instanceId: Long,
+ resultType: Option[ResultOutputType], //this won't be present in the case of an error result
+ setSize: Long,
+ startDate: Option[XMLGregorianCalendar],
+ endDate: Option[XMLGregorianCalendar],
+ description: Option[String],
+ statusType: QueryResult.StatusType,
+ statusMessage: Option[String], //todo maybe add an optional field here for the error message text
+ breakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = Map.empty) extends XmlMarshaller with I2b2Marshaller {
def this(resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = {
this(resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, statusType, None)
}
def this(resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = {
this(resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None)
}
def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match {
case Some(rt) => rt == testedResultType
case _ => false
}
import QueryResult._
//NB: Fragile, non-type-safe ==
def isError = statusType == StatusType.Error
def elapsed: Option[Long] = {
def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis
for {
start <- startDate
end <- endDate
} yield inMillis(end) - inMillis(start)
}
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = {
breakdowns.values.toSeq.sortBy(_.resultType.name)
}
override def toI2b2: NodeSeq = {
import OptionEnrichments._
XmlUtil.stripWhitespace {
{ resultId }
{ instanceId }
{ description.toXml() }
{
resultType match {
case Some(rt) if !rt.isError => {
if (rt.isBreakdown) { rt.toI2b2NameOnly() }
else { rt.toI2b2 }
}
case _ => ResultOutputType.ERROR.toI2b2NameOnly("")
}
}
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ statusType }
{ statusType.toI2b2(this) }
{
//NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case,
//so if we're going to do that, let's produce saner XML.
sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data"))
}
}
}
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
import OptionEnrichments._
{ resultId }
{ instanceId }
{ resultType.toXml(_.toXml) }
{ setSize }
{ startDate.toXml() }
{ endDate.toXml() }
{ description.toXml() }
{ statusType }
{ statusMessage.toXml() }
{
//Sorting isn't strictly necessary, but makes deterministic unit testing easier.
//The number of breakdowns will be at most 4, so performance should not be an issue.
sortedBreakdowns.map(_.toXml)
}
}
def withId(id: Long): QueryResult = copy(resultId = id)
def withInstanceId(id: Long): QueryResult = copy(instanceId = id)
def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize))
def withSetSize(size: Long): QueryResult = copy(setSize = size)
def withDescription(desc: String): QueryResult = copy(description = Option(desc))
def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType))
def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData))
def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns)
}
object QueryResult {
- final case class StatusType(name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), private val doToI2b2: QueryResult => NodeSeq = StatusType.defaultToI2b2) extends StatusType.Value {
+ final case class StatusType(
+ name: String,
+ isDone: Boolean,
+ i2b2Id: Option[Int] = Some(-1),
+ private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value {
+
def isError = this == StatusType.Error
def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult)
}
object StatusType extends SEnum[StatusType] {
- //TODO: Avoid .get, but it's reasonably safe here
- private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => { queryResult.statusType.i2b2Id.get }{ queryResult.statusType.name } }
+ private val defaultToI2b2: QueryResult => NodeSeq = { queryResult =>
+ val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{
+ throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id")
+ }
+ { queryResult.statusType.i2b2Id.get }{ queryResult.statusType.name }
+ }
val Error = StatusType("ERROR", true, None, _.statusMessage.map(msg => { msg }).orNull)
- val Finished = StatusType("FINISHED", true, Some(3))
+ val Finished = StatusType("FINISHED", isDone = true, Some(3))
//TODO: Can we use the same for Queued, Processing, and Incomplete?
- val Processing = StatusType("PROCESSING", false, Some(2))
- val Queued = StatusType("QUEUED", false, Some(2))
- val Incomplete = StatusType("INCOMPLETE", false, Some(2))
+ val Processing = StatusType("PROCESSING", isDone = false, Some(2))
+ val Queued = StatusType("QUEUED", isDone = false, Some(2))
+ val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2))
//TODO: What s should these have? Does anyone care?
- val Held = StatusType("HELD", false)
- val SmallQueue = StatusType("SMALL_QUEUE", false)
- val MediumQueue = StatusType("MEDIUM_QUEUE", false)
- val LargeQueue = StatusType("LARGE_QUEUE", false)
- val NoMoreQueue = StatusType("NO_MORE_QUEUE", false)
+ val Held = StatusType("HELD", isDone = false)
+ val SmallQueue = StatusType("SMALL_QUEUE", isDone = false)
+ val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false)
+ val LargeQueue = StatusType("LARGE_QUEUE", isDone = false)
+ val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false)
}
def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong
private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption
def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _)
def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim
def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = {
import ResultOutputType.valueOf
val typeName = asText(elemNames: _*)(xml)
valueOf(typeName) orElse valueOf(breakdownTypes)(typeName)
}
def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = {
val attempt = parse(xml)
attempt.toOption
}
def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def extract(elemName: String): Option[String] = {
Option((xml \ elemName).text.trim).filter(!_.isEmpty)
}
def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate)
val asLong = extractLong(xml) _
import NodeSeqEnrichments.Strictness._
import Tries.sequence
def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = {
val mapAttempt = for {
subXml <- xml.withChild(elemName)
envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes)))
mappings = envelopes.map(envelope => (envelope.resultType -> envelope))
} yield Map.empty ++ mappings
mapAttempt.getOrElse(Map.empty)
}
QueryResult(
asLong("resultId"),
asLong("instanceId"),
extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml),
asLong("setSize"),
extractDate("startDate"),
extractDate("endDate"),
extract("description"),
StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call
extract("statusMessage"),
extractBreakdowns("resultEnvelope"))
}
def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = {
def asLong = extractLong(xml) _
def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim)
def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate)
QueryResult(
- asLong("result_instance_id"),
- asLong("query_instance_id"),
- extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2),
- asLong("set_size"),
- asXmlGcOption("start_date"),
- asXmlGcOption("end_date"),
- asTextOption("description"),
- StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call
- asTextOption("query_status_type", "description"))
+ resultId = asLong("result_instance_id"),
+ instanceId = asLong("query_instance_id"),
+ resultType = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2),
+ setSize = asLong("set_size"),
+ startDate = asXmlGcOption("start_date"),
+ endDate = asXmlGcOption("end_date"),
+ description = asTextOption("description"),
+ statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call
+ statusMessage = asTextOption("query_status_type", "description"))
}
def errorResult(description: Option[String], statusMessage: String) = {
QueryResult(0L, 0L, None, 0L, None, None, description, StatusType.Error, Option(statusMessage))
}
}
diff --git a/commons/util/src/main/scala/net/shrine/util/SEnum.scala b/commons/util/src/main/scala/net/shrine/util/SEnum.scala
index 0dced04ab..e778ad9e0 100644
--- a/commons/util/src/main/scala/net/shrine/util/SEnum.scala
+++ b/commons/util/src/main/scala/net/shrine/util/SEnum.scala
@@ -1,87 +1,87 @@
package net.shrine.util
import scala.collection.mutable.Buffer
import scala.collection.mutable.ListBuffer
import scala.math.Ordering
import scala.util.Try
/**
* @author clint
- * @date Mar 11, 2011
+ * @since Mar 11, 2011
*
* Adapted from http://stackoverflow.com/questions/1898932/case-classes-vs-enumerations-in-scala/4958905#4958905
*
* Enum objects containing enum constants mix in SEnum, with T being the enum constant class
*/
//
trait SEnum[T] {
private type ValueType = T with Value
//Enum constants extend Value
trait Value extends Ordered[Value] { self: T =>
register(this)
//name must be supplied somehow
def name: String
//ordinal field, like Java (is this valuable?)
val ordinal: Int = nextOrdinal()
override def toString: String = name
//Enums can be ordered by their ordinal field
final override def compare(other: Value): Int = this.ordinal - other.asInstanceOf[Value].ordinal
override def hashCode: Int = ordinal.hashCode
override def equals(other: Any): Boolean = {
//NB: This can't be a pattern-match, like case that: ValueType, because that leads to
//ClassCastExceptions under Scala 2.11.5 and (very indirectly) blows up Squeryl. :( :( :(
other != null && other.isInstanceOf[ValueType] && other.asInstanceOf[ValueType].ordinal == this.ordinal
}
}
implicit object ValueTypeOrdering extends Ordering[ValueType] {
final override def compare(x: ValueType, y: ValueType): Int = x.compare(y)
}
final def values: Seq[T] = constants.toSeq
private def asKey(name: String): String = name.toLowerCase.filter(_ != '-')
import scala.util.{Failure, Success}
final def tryValueOf(name: String): Try[T] = name match {
case null => Failure(new Exception("Null name passed in"))
case _ => constantsByName.get(asKey(name)) match {
case Some(value) => Success(value)
case None => Failure(new Exception(s"Unknown name '$name' passed in; known values are: ${values.map(v => s"'$v'").mkString(",")}"))
}
}
final def valueOf(name: String): Option[T] = tryValueOf(name).toOption
final def valueOfOrElse(name: String)(default: T): T = valueOf(name) match {
case Some(v) => v
case None => default
}
private var ordinalCounter = 0
private def nextOrdinal() = {
val current = ordinalCounter
ordinalCounter += 1
current
}
private def register(v: ValueType) {
constants += v
constantsByName += (asKey(v.name) -> v)
}
private val constants: Buffer[ValueType] = new ListBuffer
private var constantsByName: Map[String, ValueType] = Map.empty
}
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
index db3194847..15dadac63 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
@@ -1,83 +1,83 @@
package net.shrine.aggregation
import net.shrine.log.Loggable
import scala.concurrent.duration.Duration
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.Failure
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import net.shrine.protocol.SingleNodeResult
import net.shrine.protocol.Timeout
import net.shrine.protocol.BaseShrineResponse
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Represents the basic aggregation strategy shared by several aggregators:
* - Parses a sequence of SpinResultEntries into a sequence of some
* combination of valid responses, ErrorResponses, and invalid
* responses (cases where ShrineResponse.fromXml returns None)
* - Filters the valid responses, weeding out responses that aren't of
* the expected type
* Invokes an abstract method with the valid responses, errors, and
* invalid responses.
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable {
private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = {
manifest[T].runtimeClass.isAssignableFrom(response.getClass)
}
import BasicAggregator._
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = {
val resultsOrErrors: Iterable[ParsedResult[T]] = {
for {
result <- results
} yield {
val parsedResponse: ParsedResult[T] = result match {
case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse)
case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response)
case Timeout(origin) => Error(Option(origin), ErrorResponse(s"Timed out querying node '${origin.name}'"))
case Failure(origin, cause) => Error(Option(origin), ErrorResponse(s"Failure querying node '${origin.name}': ${cause.getMessage}"))
- case _ => Invalid(None, s"Unexpected response in ${getClass}:\r\n $result")
+ case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result")
}
parsedResponse
}
}
val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid }
val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid }
val errorResponses = resultsOrErrors.collect { case error: Error => error }
//Log all parsing errors
invalidResponses.map(_.errorMessage).foreach(this.error(_))
val previouslyDetectedErrors = errors.map(Error(None, _))
makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses)
}
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse
}
object BasicAggregator {
private[aggregation] sealed abstract class ParsedResult[+T]
private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T]
private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing]
private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing]
}
\ No newline at end of file