diff --git a/commons/util/src/main/scala/net/shrine/problem/Problem.scala b/commons/util/src/main/scala/net/shrine/problem/Problem.scala index 9b702bdb4..d5a822721 100644 --- a/commons/util/src/main/scala/net/shrine/problem/Problem.scala +++ b/commons/util/src/main/scala/net/shrine/problem/Problem.scala @@ -1,146 +1,128 @@ package net.shrine.problem import java.net.{InetAddress, ConnectException} import java.util.Date import net.shrine.log.Loggable import net.shrine.serialization.{XmlUnmarshaller, XmlMarshaller} import scala.xml.{Node, NodeSeq} /** * Describes what information we have about a problem at the site in code where we discover it. * * @author david * @since 8/6/15 */ trait Problem { def summary:String def problemName = getClass.getName def throwable:Option[Throwable] = None def stamp:Stamp def description = s"${stamp.pretty}" def throwableDetail = throwable.map(x => x.getStackTrace.mkString(sys.props("line.separator"))) def details:String = s"${throwableDetail.getOrElse("")}" def toDigest:ProblemDigest = ProblemDigest(problemName,summary,description,details) } case class ProblemDigest(codec:String,summary:String,description:String,details:String) extends XmlMarshaller { override def toXml: Node = { {codec} {summary} {description}
{details}
} } object ProblemDigest extends XmlUnmarshaller[ProblemDigest] with Loggable { def apply(oldMessage:String):ProblemDigest = { val ex = new IllegalStateException(s"'$oldMessage' detected, not in codec. Please report this problem and stack trace to Shrine dev.") ex.fillInStackTrace() warn(ex) ProblemDigest("ProblemNotInCodec",oldMessage,"","") } override def fromXml(xml: NodeSeq): ProblemDigest = { val problemNode = xml \ "problem" require(problemNode.nonEmpty,s"No problem tag in $xml") def extractText(tagName:String) = (problemNode \ tagName).text val codec = extractText("codec") val summary = extractText("summary") val description = extractText("description") val details = extractText("details") ProblemDigest(codec,summary,description,details) } } case class Stamp(host:InetAddress,time:Long,source:ProblemSources.ProblemSource) { def pretty = s"at ${new Date(time)} on $host ${source.pretty}" } object Stamp { def apply(source:ProblemSources.ProblemSource): Stamp = Stamp(InetAddress.getLocalHost,System.currentTimeMillis(),source) } abstract class AbstractProblem(source:ProblemSources.ProblemSource) extends Problem { val stamp = Stamp(source) } trait ProblemHandler { def handleProblem(problem:Problem) } /** * An example problem handler */ object LoggingProblemHandler extends ProblemHandler with Loggable { override def handleProblem(problem: Problem): Unit = { problem.throwable.fold(error(problem.toString))(throwable => error(problem.toString,throwable) ) } } object ProblemSources{ sealed trait ProblemSource { //todo name without $ def pretty = getClass.getSimpleName } case object Adapter extends ProblemSource case object Hub extends ProblemSource case object Qep extends ProblemSource case object Dsa extends ProblemSource case object Unknown extends ProblemSource def problemSources = Set(Adapter,Hub,Qep,Dsa,Unknown) } case class ProblemNotInCodec(summary:String,t:Throwable) extends AbstractProblem(ProblemSources.Unknown){ override val throwable = Some(t) override val description = s"${super.description} . This error is not yet in the codec. Please report the stack trace to the Shrine development team at TODO" } object ProblemNotInCodec { def apply(summary:String):ProblemNotInCodec = { val x = new IllegalStateException(s"$summary , is not yet in the codec.") x.fillInStackTrace() new ProblemNotInCodec(summary,x) } } - -/** - * For "Failure querying node 'SITE NAME': java.net.ConnectException: Connection refused" - * - * This one is interesting because "Connection refused" is different from "Connection timed out" according to Keith's - * notes, but the only way to pick that up is to pull the text out of that contained exception. However, all four options - * are probably worth checking no matter what the exception's message. - */ - -//todo NodeId is in protocol, which will be accessible from the hub code where this class should live - -//case class CouldNotConnectToQueryNode(nodeId:NodeId,connectExcepition:ConnectException) extends Problem { -case class CouldNotConnectToNode(nodeName:String,connectException:ConnectException) extends AbstractProblem(ProblemSources.Hub) { - - val summary = s"Could not connect to node $nodeName" - - override def throwable = Some(connectException) -} diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala index debbbf4c7..e7edf19cd 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala @@ -1,96 +1,115 @@ package net.shrine.aggregation import com.sun.mail.iap.ConnectionException +import net.shrine.broadcaster.CouldNotParseResultsException import net.shrine.log.Loggable import net.shrine.problem.{ProblemNotInCodec, ProblemSources, AbstractProblem} import scala.concurrent.duration.Duration import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Represents the basic aggregation strategy shared by several aggregators: * - Parses a sequence of SpinResultEntries into a sequence of some * combination of valid responses, ErrorResponses, and invalid * responses (cases where ShrineResponse.fromXml returns None) * - Filters the valid responses, weeding out responses that aren't of * the expected type * Invokes an abstract method with the valid responses, errors, and * invalid responses. * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable { private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = { manifest[T].runtimeClass.isAssignableFrom(response.getClass) } import BasicAggregator._ override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = { val resultsOrErrors: Iterable[ParsedResult[T]] = { for { result <- results } yield { val parsedResponse: ParsedResult[T] = result match { case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse) case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response) case Timeout(origin) => Error(Option(origin), ErrorResponse(s"Timed out querying node '${origin.name}'")) //todo failure becomes an ErrorResponse and Error status type here. And the stack trace gets eaten. case Failure(origin, cause) => { cause match { case cx: ConnectionException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx))) + case cnprx:CouldNotParseResultsException => { + if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx))) + else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx))) + } case x => Error(Option(origin), ErrorResponse(ProblemNotInCodec(s"Failure querying node ${origin.name}",x))) } } case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result") } parsedResponse } } val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid } val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid } val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error } //Log all parsing errors invalidResponses.map(_.errorMessage).foreach(this.error(_)) val previouslyDetectedErrors = errors.map(Error(None, _)) makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses) } private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse } object BasicAggregator { private[aggregation] sealed abstract class ParsedResult[+T] private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T] private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing] private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing] } case class CouldNotConnectToAdapter(origin:NodeId,cx:ConnectionException) extends AbstractProblem(ProblemSources.Hub) { - override def summary: String = s"Could not connect to adapter at ${origin.name}." - override def throwable = Some(cx) + override val summary: String = s"Could not connect to adapter at ${origin.name}." + override val throwable = Some(cx) +} + +case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { + override val summary: String = s"Caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" + override val throwable = Some(cnrpx) + override val description = s"${super.description} While parsing a response from ${cnrpx.url} with http code ${cnrpx.url} caught '${cnrpx.cause.getMessage}'" + override val details = s"${super.details}\n\nMessage body is: \n ${cnrpx.body}" +} + +case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { + override val summary: String = s"Observed ${cnrpx.statusCode} and caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" + override val throwable = Some(cnrpx) + override val description = s"${super.description} While parsing a response from ${cnrpx.url} with http code ${cnrpx.url} caught '${cnrpx.cause.getMessage}'" + override val details = s"${super.details}\n\nMessage body is: \n ${cnrpx.body}" } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/PosterBroadcasterClient.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/PosterBroadcasterClient.scala index 0b66f37f6..69492e74f 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/PosterBroadcasterClient.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/PosterBroadcasterClient.scala @@ -1,37 +1,45 @@ package net.shrine.broadcaster import net.shrine.client.{HttpResponse, Poster} import net.shrine.log.Loggable import net.shrine.protocol.{NodeId, BroadcastMessage, SingleNodeResult, MultiplexedResults, ResultOutputType} import scala.concurrent.Future import scala.concurrent.blocking import scala.util.{Success, Try} import scala.xml.XML /** * @author clint * @since Mar 3, 2014 */ final case class PosterBroadcasterClient(poster: Poster, breakdownTypes: Set[ResultOutputType]) extends BroadcasterClient with Loggable { override def broadcast(message: BroadcastMessage): Future[Iterable[SingleNodeResult]] = { //TODO: REVISIT import scala.concurrent.ExecutionContext.Implicits.global for { response: HttpResponse <- Future { blocking { poster.post(message.toXmlString) } } } yield { val tryResults: Try[Seq[SingleNodeResult]] = MultiplexedResults.fromXml(breakdownTypes)(XML.loadString(response.body)).map(_.results) //todo use fold()() in Scala 2.12 tryResults match { case Success(results) => results case scala.util.Failure(ex) => { error(s"Exception while parsing response with status ${response.statusCode} from ${poster.url} while parsing ${response.body}",ex) //todo where to get a real nodeId? - Seq(net.shrine.protocol.Failure(NodeId(poster.url),ex)) + val x = CouldNotParseResultsException(response.statusCode,poster.url,response.body,ex) + x.fillInStackTrace() + Seq(net.shrine.protocol.Failure(NodeId(poster.url),x)) } } } } +} + +case class CouldNotParseResultsException(statusCode:Int,url:String,body:String,cause:Throwable) extends Exception(CouldNotParseResultsException.createMessage(statusCode,url),cause) + +object CouldNotParseResultsException { + def createMessage(statusCode:Int,url:String) = s"While parsing response with status ${statusCode} from ${url}" } \ No newline at end of file