diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala index 230fa4d9f..7b404e8f2 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineError.scala @@ -1,16 +1,14 @@ package net.shrine.adapter.dao.model import net.shrine.protocol.QueryResult /** * @author clint - * @date Oct 16, 2012 + * @since Oct 16, 2012 * - * NB: Named ShrineError to avoid clashes with java.lang.Error - * NB: Can't be final, since Squeryl runs this class through cglib to make a synthetic subclass :( */ final case class ShrineError(id: Int, resultId: Int, message: String) extends HasResultId { def toQueryResult: QueryResult = { QueryResult.errorResult(Option(message), QueryResult.StatusType.Error.name) } } diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala index 216475318..a47318b04 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/ShrineQueryResult.scala @@ -1,76 +1,75 @@ package net.shrine.adapter.dao.model import net.shrine.protocol.ResultOutputType import net.shrine.protocol.QueryResult import net.shrine.protocol.I2b2ResultEnvelope /** * @author clint - * @date Oct 16, 2012 + * @since Oct 16, 2012 * * NB: Named ShrineQueryResult to avoid clashes with net.shrine.protocol.QueryResult - * NB: Can't be final, since Squeryl runs this class through cglib to make a synthetic subclass :( */ final case class ShrineQueryResult( networkQueryId: Long, localId: String, wasRun: Boolean, isFlagged: Boolean, flagMessage: Option[String], count: Count, breakdowns: Seq[Breakdown], errors: Seq[ShrineError]) { //TODO: include breakdowns as well? What if they're PROCESSING while the count is FINISHED? Can this even happen? val isDone = count.statusType.isDone def wasNotRun: Boolean = !wasRun def toQueryResults(doObfuscation: Boolean): Option[QueryResult] = { val countResult = count.toQueryResult.map { countQueryResult => //add breakdowns val byType = Map.empty ++ breakdowns.map(b => (b.resultType, b.data)) val getRealOrObfuscated: ObfuscatedPair => Long = { if(doObfuscation) { _.obfuscated } else { _.original } } val typesToData = byType.mapValues(_.mapValues(getRealOrObfuscated)) countQueryResult.withBreakdowns(typesToData.map { case (resultType, data) => (resultType, I2b2ResultEnvelope(resultType, data)) }) } def firstError = errors.headOption.map(_.toQueryResult) countResult orElse firstError } } object ShrineQueryResult { def fromRows(queryRow: ShrineQuery, resultRows: Seq[QueryResultRow], countRow: CountRow, breakdownRows: Map[ResultOutputType, Seq[BreakdownResultRow]], errorRows: Seq[ShrineError]): Option[ShrineQueryResult] = { if(resultRows.isEmpty) { None } else { val resultRowsByType = resultRows.map(r => r.resultType -> r).toMap val breakdowns = (for { (resultType, resultRow) <- resultRowsByType rows <- breakdownRows.get(resultType) breakdown <- Breakdown.fromRows(resultType, resultRow.localId, rows) } yield breakdown).toSeq import ResultOutputType.PATIENT_COUNT_XML for { resultRow <- resultRowsByType.get(PATIENT_COUNT_XML) count = Count.fromRows(resultRow, countRow) } yield ShrineQueryResult(queryRow.networkId, queryRow.localId, queryRow.hasBeenRun, queryRow.isFlagged, queryRow.flagMessage, count, breakdowns, errorRows) } } } \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala index 6be3537c2..4bee3601e 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/dao/model/squeryl/SquerylShrineError.scala @@ -1,23 +1,23 @@ package net.shrine.adapter.dao.model.squeryl import net.shrine.adapter.dao.model.ShrineError import org.squeryl.KeyedEntity import org.squeryl.annotations.Column /** * @author clint - * @date May 28, 2013 + * @since May 28, 2013 */ case class SquerylShrineError( @Column(name = "ID") id: Int, @Column(name = "RESULT_ID") resultId: Int, @Column(name = "MESSAGE") message: String) extends KeyedEntity[Int] { //NB: For Squeryl, ugh :( def this() = this(0, 0, "") def toShrineError = ShrineError(id, resultId, message) } diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala index 9b74b5fb8..930c6c625 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/ErrorResponse.scala @@ -1,92 +1,92 @@ package net.shrine.protocol import xml.NodeSeq import net.shrine.util.XmlUtil import net.shrine.serialization.XmlUnmarshaller import net.shrine.serialization.I2b2Unmarshaller import net.shrine.util.NodeSeqEnrichments import scala.util.Try import scala.util.control.NonFatal /** * @author Bill Simons - * @date 4/25/11 - * @link http://cbmi.med.harvard.edu - * @link http://chip.org + * @since 4/25/11 + * @see http://cbmi.med.harvard.edu + * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source - * @link http://www.gnu.org/licenses/lgpl.html + * @see http://www.gnu.org/licenses/lgpl.html * * NB: Now a case class for structural equality */ -final case class ErrorResponse(val errorMessage: String) extends ShrineResponse { +final case class ErrorResponse(errorMessage: String) extends ShrineResponse { override protected def status = { errorMessage } override protected def i2b2MessageBody = null import ErrorResponse.rootTagName override def toXml = XmlUtil.stripWhitespace { XmlUtil.renameRootTag(rootTagName) { { errorMessage } } } } object ErrorResponse extends XmlUnmarshaller[ErrorResponse] with I2b2Unmarshaller[ErrorResponse] with HasRootTagName { val rootTagName = "errorResponse" override def fromXml(xml: NodeSeq): ErrorResponse = { - val messageXml = (xml \ "message") + val messageXml = xml \ "message" //NB: Fail fast require(messageXml.nonEmpty) ErrorResponse(XmlUtil.trim(messageXml)) } override def fromI2b2(xml: NodeSeq): ErrorResponse = { import NodeSeqEnrichments.Strictness._ def parseFormatA: Try[ErrorResponse] = { for { statusXml <- xml withChild "response_header" withChild "result_status" withChild "status" - typeText <- (statusXml attribute "type") + typeText <- statusXml attribute "type" if typeText == "ERROR" //NB: Fail fast statusMessage = XmlUtil.trim(statusXml) } yield { ErrorResponse(statusMessage) } } def parseFormatB: Try[ErrorResponse] = { for { conditionXml <- xml withChild "message_body" withChild "response" withChild "status" withChild "condition" typeText <- conditionXml attribute "type" if typeText == "ERROR" statusMessage = XmlUtil.trim(conditionXml) } yield { ErrorResponse(statusMessage) } } parseFormatA.recoverWith { case NonFatal(e) => parseFormatB }.get } /** * * * * * * Query result instance id 3126 not found * * * * */ } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala index faa3dbda8..b8813d6b7 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/QueryResult.scala @@ -1,241 +1,250 @@ package net.shrine.protocol import javax.xml.datatype.XMLGregorianCalendar import scala.xml.NodeSeq import net.shrine.util.{Tries, XmlUtil, NodeSeqEnrichments, SEnum, XmlDateHelper, OptionEnrichments} import net.shrine.serialization.{ I2b2Marshaller, I2b2Unmarshaller, XmlMarshaller, XmlUnmarshaller } import scala.util.Try /** * @author Bill Simons * @since 4/15/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class QueryResult( - val resultId: Long, - val instanceId: Long, - val resultType: Option[ResultOutputType], //this won't be present in the case of an error result - val setSize: Long, - val startDate: Option[XMLGregorianCalendar], - val endDate: Option[XMLGregorianCalendar], - val description: Option[String], - val statusType: QueryResult.StatusType, - val statusMessage: Option[String], - val breakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = Map.empty) extends XmlMarshaller with I2b2Marshaller { + resultId: Long, + instanceId: Long, + resultType: Option[ResultOutputType], //this won't be present in the case of an error result + setSize: Long, + startDate: Option[XMLGregorianCalendar], + endDate: Option[XMLGregorianCalendar], + description: Option[String], + statusType: QueryResult.StatusType, + statusMessage: Option[String], //todo maybe add an optional field here for the error message text + breakdowns: Map[ResultOutputType, I2b2ResultEnvelope] = Map.empty) extends XmlMarshaller with I2b2Marshaller { def this(resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, statusType: QueryResult.StatusType) = { this(resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), None, statusType, None) } def this(resultId: Long, instanceId: Long, resultType: ResultOutputType, setSize: Long, startDate: XMLGregorianCalendar, endDate: XMLGregorianCalendar, description: String, statusType: QueryResult.StatusType) = { this(resultId, instanceId, Option(resultType), setSize, Option(startDate), Option(endDate), Option(description), statusType, None) } def resultTypeIs(testedResultType: ResultOutputType): Boolean = resultType match { case Some(rt) => rt == testedResultType case _ => false } import QueryResult._ //NB: Fragile, non-type-safe == def isError = statusType == StatusType.Error def elapsed: Option[Long] = { def inMillis(xmlGc: XMLGregorianCalendar) = xmlGc.toGregorianCalendar.getTimeInMillis for { start <- startDate end <- endDate } yield inMillis(end) - inMillis(start) } //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. private def sortedBreakdowns: Seq[I2b2ResultEnvelope] = { breakdowns.values.toSeq.sortBy(_.resultType.name) } override def toI2b2: NodeSeq = { import OptionEnrichments._ XmlUtil.stripWhitespace { { resultId } { instanceId } { description.toXml() } { resultType match { case Some(rt) if !rt.isError => { if (rt.isBreakdown) { rt.toI2b2NameOnly() } else { rt.toI2b2 } } case _ => ResultOutputType.ERROR.toI2b2NameOnly("") } } { setSize } { startDate.toXml() } { endDate.toXml() } { statusType } { statusType.toI2b2(this) } { //NB: Deliberately use Shrine XML format instead of the i2b2 one. Adding breakdowns to i2b2-format XML here is deviating from the i2b2 XSD schema in any case, //so if we're going to do that, let's produce saner XML. sortedBreakdowns.map(_.toXml.head).map(XmlUtil.renameRootTag("breakdown_data")) } } } override def toXml: NodeSeq = XmlUtil.stripWhitespace { import OptionEnrichments._ { resultId } { instanceId } { resultType.toXml(_.toXml) } { setSize } { startDate.toXml() } { endDate.toXml() } { description.toXml() } { statusType } { statusMessage.toXml() } { //Sorting isn't strictly necessary, but makes deterministic unit testing easier. //The number of breakdowns will be at most 4, so performance should not be an issue. sortedBreakdowns.map(_.toXml) } } def withId(id: Long): QueryResult = copy(resultId = id) def withInstanceId(id: Long): QueryResult = copy(instanceId = id) def modifySetSize(f: Long => Long): QueryResult = withSetSize(f(setSize)) def withSetSize(size: Long): QueryResult = copy(setSize = size) def withDescription(desc: String): QueryResult = copy(description = Option(desc)) def withResultType(resType: ResultOutputType): QueryResult = copy(resultType = Option(resType)) def withBreakdown(breakdownData: I2b2ResultEnvelope) = copy(breakdowns = breakdowns + (breakdownData.resultType -> breakdownData)) def withBreakdowns(newBreakdowns: Map[ResultOutputType, I2b2ResultEnvelope]) = copy(breakdowns = newBreakdowns) } object QueryResult { - final case class StatusType(name: String, isDone: Boolean, i2b2Id: Option[Int] = Some(-1), private val doToI2b2: QueryResult => NodeSeq = StatusType.defaultToI2b2) extends StatusType.Value { + final case class StatusType( + name: String, + isDone: Boolean, + i2b2Id: Option[Int] = Some(-1), + private val doToI2b2:(QueryResult => NodeSeq) = StatusType.defaultToI2b2) extends StatusType.Value { + def isError = this == StatusType.Error def toI2b2(queryResult: QueryResult): NodeSeq = doToI2b2(queryResult) } object StatusType extends SEnum[StatusType] { - //TODO: Avoid .get, but it's reasonably safe here - private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => { queryResult.statusType.i2b2Id.get }{ queryResult.statusType.name } } + private val defaultToI2b2: QueryResult => NodeSeq = { queryResult => + val i2b2Id: Int = queryResult.statusType.i2b2Id.getOrElse{ + throw new IllegalStateException(s"queryResult.statusType ${queryResult.statusType} has no i2b2Id") + } + { queryResult.statusType.i2b2Id.get }{ queryResult.statusType.name } + } val Error = StatusType("ERROR", true, None, _.statusMessage.map(msg => { msg }).orNull) - val Finished = StatusType("FINISHED", true, Some(3)) + val Finished = StatusType("FINISHED", isDone = true, Some(3)) //TODO: Can we use the same for Queued, Processing, and Incomplete? - val Processing = StatusType("PROCESSING", false, Some(2)) - val Queued = StatusType("QUEUED", false, Some(2)) - val Incomplete = StatusType("INCOMPLETE", false, Some(2)) + val Processing = StatusType("PROCESSING", isDone = false, Some(2)) + val Queued = StatusType("QUEUED", isDone = false, Some(2)) + val Incomplete = StatusType("INCOMPLETE", isDone = false, Some(2)) //TODO: What s should these have? Does anyone care? - val Held = StatusType("HELD", false) - val SmallQueue = StatusType("SMALL_QUEUE", false) - val MediumQueue = StatusType("MEDIUM_QUEUE", false) - val LargeQueue = StatusType("LARGE_QUEUE", false) - val NoMoreQueue = StatusType("NO_MORE_QUEUE", false) + val Held = StatusType("HELD", isDone = false) + val SmallQueue = StatusType("SMALL_QUEUE", isDone = false) + val MediumQueue = StatusType("MEDIUM_QUEUE", isDone = false) + val LargeQueue = StatusType("LARGE_QUEUE", isDone = false) + val NoMoreQueue = StatusType("NO_MORE_QUEUE", isDone = false) } def extractLong(nodeSeq: NodeSeq)(elemName: String): Long = (nodeSeq \ elemName).text.toLong private def parseDate(lexicalRep: String): Option[XMLGregorianCalendar] = XmlDateHelper.parseXmlTime(lexicalRep).toOption def elemAt(path: String*)(xml: NodeSeq): NodeSeq = path.foldLeft(xml)(_ \ _) def asText(path: String*)(xml: NodeSeq): String = elemAt(path: _*)(xml).text.trim def asResultOutputTypeOption(elemNames: String*)(breakdownTypes: Set[ResultOutputType], xml: NodeSeq): Option[ResultOutputType] = { import ResultOutputType.valueOf val typeName = asText(elemNames: _*)(xml) valueOf(typeName) orElse valueOf(breakdownTypes)(typeName) } def extractResultOutputType(xml: NodeSeq)(parse: NodeSeq => Try[ResultOutputType]): Option[ResultOutputType] = { val attempt = parse(xml) attempt.toOption } def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def extract(elemName: String): Option[String] = { Option((xml \ elemName).text.trim).filter(!_.isEmpty) } def extractDate(elemName: String): Option[XMLGregorianCalendar] = extract(elemName).flatMap(parseDate) val asLong = extractLong(xml) _ import NodeSeqEnrichments.Strictness._ import Tries.sequence def extractBreakdowns(elemName: String): Map[ResultOutputType, I2b2ResultEnvelope] = { val mapAttempt = for { subXml <- xml.withChild(elemName) envelopes <- sequence(subXml.map(I2b2ResultEnvelope.fromXml(breakdownTypes))) mappings = envelopes.map(envelope => (envelope.resultType -> envelope)) } yield Map.empty ++ mappings mapAttempt.getOrElse(Map.empty) } QueryResult( asLong("resultId"), asLong("instanceId"), extractResultOutputType(xml \ "resultType")(ResultOutputType.fromXml), asLong("setSize"), extractDate("startDate"), extractDate("endDate"), extract("description"), StatusType.valueOf(asText("status")(xml)).get, //TODO: Avoid fragile .get call extract("statusMessage"), extractBreakdowns("resultEnvelope")) } def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): QueryResult = { def asLong = extractLong(xml) _ def asTextOption(path: String*): Option[String] = elemAt(path: _*)(xml).headOption.map(_.text.trim) def asXmlGcOption(path: String): Option[XMLGregorianCalendar] = asTextOption(path).filter(!_.isEmpty).flatMap(parseDate) QueryResult( - asLong("result_instance_id"), - asLong("query_instance_id"), - extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2), - asLong("set_size"), - asXmlGcOption("start_date"), - asXmlGcOption("end_date"), - asTextOption("description"), - StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call - asTextOption("query_status_type", "description")) + resultId = asLong("result_instance_id"), + instanceId = asLong("query_instance_id"), + resultType = extractResultOutputType(xml \ "query_result_type")(ResultOutputType.fromI2b2), + setSize = asLong("set_size"), + startDate = asXmlGcOption("start_date"), + endDate = asXmlGcOption("end_date"), + description = asTextOption("description"), + statusType = StatusType.valueOf(asText("query_status_type", "name")(xml)).get, //TODO: Avoid fragile .get call + statusMessage = asTextOption("query_status_type", "description")) } def errorResult(description: Option[String], statusMessage: String) = { QueryResult(0L, 0L, None, 0L, None, None, description, StatusType.Error, Option(statusMessage)) } } diff --git a/commons/util/src/main/scala/net/shrine/util/SEnum.scala b/commons/util/src/main/scala/net/shrine/util/SEnum.scala index 0dced04ab..e778ad9e0 100644 --- a/commons/util/src/main/scala/net/shrine/util/SEnum.scala +++ b/commons/util/src/main/scala/net/shrine/util/SEnum.scala @@ -1,87 +1,87 @@ package net.shrine.util import scala.collection.mutable.Buffer import scala.collection.mutable.ListBuffer import scala.math.Ordering import scala.util.Try /** * @author clint - * @date Mar 11, 2011 + * @since Mar 11, 2011 * * Adapted from http://stackoverflow.com/questions/1898932/case-classes-vs-enumerations-in-scala/4958905#4958905 * * Enum objects containing enum constants mix in SEnum, with T being the enum constant class */ // trait SEnum[T] { private type ValueType = T with Value //Enum constants extend Value trait Value extends Ordered[Value] { self: T => register(this) //name must be supplied somehow def name: String //ordinal field, like Java (is this valuable?) val ordinal: Int = nextOrdinal() override def toString: String = name //Enums can be ordered by their ordinal field final override def compare(other: Value): Int = this.ordinal - other.asInstanceOf[Value].ordinal override def hashCode: Int = ordinal.hashCode override def equals(other: Any): Boolean = { //NB: This can't be a pattern-match, like case that: ValueType, because that leads to //ClassCastExceptions under Scala 2.11.5 and (very indirectly) blows up Squeryl. :( :( :( other != null && other.isInstanceOf[ValueType] && other.asInstanceOf[ValueType].ordinal == this.ordinal } } implicit object ValueTypeOrdering extends Ordering[ValueType] { final override def compare(x: ValueType, y: ValueType): Int = x.compare(y) } final def values: Seq[T] = constants.toSeq private def asKey(name: String): String = name.toLowerCase.filter(_ != '-') import scala.util.{Failure, Success} final def tryValueOf(name: String): Try[T] = name match { case null => Failure(new Exception("Null name passed in")) case _ => constantsByName.get(asKey(name)) match { case Some(value) => Success(value) case None => Failure(new Exception(s"Unknown name '$name' passed in; known values are: ${values.map(v => s"'$v'").mkString(",")}")) } } final def valueOf(name: String): Option[T] = tryValueOf(name).toOption final def valueOfOrElse(name: String)(default: T): T = valueOf(name) match { case Some(v) => v case None => default } private var ordinalCounter = 0 private def nextOrdinal() = { val current = ordinalCounter ordinalCounter += 1 current } private def register(v: ValueType) { constants += v constantsByName += (asKey(v.name) -> v) } private val constants: Buffer[ValueType] = new ListBuffer private var constantsByName: Map[String, ValueType] = Map.empty } diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala index db3194847..15dadac63 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala @@ -1,83 +1,83 @@ package net.shrine.aggregation import net.shrine.log.Loggable import scala.concurrent.duration.Duration import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Represents the basic aggregation strategy shared by several aggregators: * - Parses a sequence of SpinResultEntries into a sequence of some * combination of valid responses, ErrorResponses, and invalid * responses (cases where ShrineResponse.fromXml returns None) * - Filters the valid responses, weeding out responses that aren't of * the expected type * Invokes an abstract method with the valid responses, errors, and * invalid responses. * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable { private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = { manifest[T].runtimeClass.isAssignableFrom(response.getClass) } import BasicAggregator._ override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = { val resultsOrErrors: Iterable[ParsedResult[T]] = { for { result <- results } yield { val parsedResponse: ParsedResult[T] = result match { case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse) case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response) case Timeout(origin) => Error(Option(origin), ErrorResponse(s"Timed out querying node '${origin.name}'")) case Failure(origin, cause) => Error(Option(origin), ErrorResponse(s"Failure querying node '${origin.name}': ${cause.getMessage}")) - case _ => Invalid(None, s"Unexpected response in ${getClass}:\r\n $result") + case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result") } parsedResponse } } val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid } val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid } val errorResponses = resultsOrErrors.collect { case error: Error => error } //Log all parsing errors invalidResponses.map(_.errorMessage).foreach(this.error(_)) val previouslyDetectedErrors = errors.map(Error(None, _)) makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses) } private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse } object BasicAggregator { private[aggregation] sealed abstract class ParsedResult[+T] private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T] private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing] private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing] } \ No newline at end of file