diff --git a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala index 820f72f9e..bfe391c37 100644 --- a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala +++ b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala @@ -1,127 +1,127 @@ package net.shrine.adapter.client import java.net.SocketTimeoutException import net.shrine.problem.{ProblemNotYetEncoded, ProblemSources, AbstractProblem} import org.xml.sax.SAXParseException import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.blocking import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.util.control.NonFatal import scala.xml.{NodeSeq, XML} import com.sun.jersey.api.client.ClientHandlerException import net.shrine.client.{HttpResponse, TimeoutException, Poster} import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.ErrorResponse import net.shrine.protocol.NodeId import net.shrine.protocol.Result import scala.util.{Failure, Success, Try} import net.shrine.protocol.ResultOutputType /** * @author clint * @since Nov 15, 2013 * * */ final class RemoteAdapterClient private (val poster: Poster, val breakdownTypes: Set[ResultOutputType]) extends AdapterClient { import RemoteAdapterClient._ //NB: Overriding apply in the companion object screws up case-class code generation for some reason, so //we add the would-have-been-generated methods here override def toString = s"RemoteAdapterClient($poster)" override def hashCode: Int = 31 * (if(poster == null) 1 else poster.hashCode) override def equals(other: Any): Boolean = other match { case that: RemoteAdapterClient if that != null => poster == that.poster case _ => false } //TODO: Revisit this import scala.concurrent.ExecutionContext.Implicits.global override def query(request: BroadcastMessage): Future[Result] = { val requestXml = request.toXml Future { blocking { val response: HttpResponse = poster.post(requestXml.toString()) interpretResponse(response) } }.recover { case e if isTimeout(e) => throw new TimeoutException(s"Invoking adapter at ${poster.url} timed out", e) } } def interpretResponse(response:HttpResponse):Result = { if(response.statusCode <= 400){ val responseXml = response.body import scala.concurrent.duration._ //Should we know the NodeID here? It would let us make a better error response. Try(XML.loadString(responseXml)).flatMap(Result.fromXml(breakdownTypes)) match { case Success(result) => result case Failure(x) => { val errorResponse = x match { case sx: SAXParseException => ErrorResponse(CouldNotParseXmlFromAdapter(poster.url,response.statusCode,responseXml,sx)) case _ => ErrorResponse(ProblemNotYetEncoded(s"Couldn't understand response from adapter at '${poster.url}': $responseXml", x)) } Result(NodeId.Unknown, 0.milliseconds, errorResponse) } } } else { Result(NodeId.Unknown,0.milliseconds,ErrorResponse(HttpErrorCodeFromAdapter(poster.url,response.statusCode,response.body))) } } } object RemoteAdapterClient { def apply(poster: Poster, breakdownTypes: Set[ResultOutputType]): RemoteAdapterClient = { //NB: Replicate URL-munging that used to be performed by JerseyAdapterClient val posterToUse = { if(poster.url.endsWith("requests")) { poster } else { poster.mapUrl(_ + "/requests") } } new RemoteAdapterClient(posterToUse, breakdownTypes) } def isTimeout(e: Throwable): Boolean = e match { case e: SocketTimeoutException => true case e: ClientHandlerException => { val cause = e.getCause cause != null && cause.isInstanceOf[SocketTimeoutException] } case _ => false } } case class HttpErrorCodeFromAdapter(url:String,statusCode:Int,responseBody:String) extends AbstractProblem(ProblemSources.Adapter) { - override def summary: String = s"Hub received a fatal error response" + override def summary: String = "Hub received a fatal error response" override def description: String = s"Hub received error code $statusCode from the adapter at $url" override def detailsXml:NodeSeq =
{s"Http response body was $responseBody"}
} case class CouldNotParseXmlFromAdapter(url:String,statusCode:Int,responseBody:String,saxx: SAXParseException) extends AbstractProblem(ProblemSources.Adapter) { override def throwable = Some(saxx) override def summary: String = s"Hub could not parse response from adapter" override def description: String = s"Hub could not parse xml from $url due to ${saxx.toString}" override def detailsXml:NodeSeq =
{s"Http response code was $statusCode and the body was $responseBody"} {throwableDetail}
} \ No newline at end of file diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala index 35577f1a3..c93d74968 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala @@ -1,79 +1,79 @@ package net.shrine.adapter import java.net.InetAddress import net.shrine.log.Loggable import net.shrine.problem.{Problem, ProblemNotYetEncoded, LoggingProblemHandler, ProblemSources, AbstractProblem} import net.shrine.protocol.{ShrineRequest, BroadcastMessage, ErrorResponse, BaseShrineResponse, AuthenticationInfo} /** * @author Bill Simons * @since 4/8/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ abstract class Adapter extends Loggable { final def perform(message: BroadcastMessage): BaseShrineResponse = { def problemToErrorResponse(problem:Problem):ErrorResponse = { LoggingProblemHandler.handleProblem(problem) ErrorResponse(problem) } val shrineResponse = try { processRequest(message) } catch { case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e)) case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e)) case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e)) //noinspection RedundantBlock case e: Exception => { val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage" else s"Unexpected exception in Adapter" problemToErrorResponse(ProblemNotYetEncoded(summary,e)) } } shrineResponse } protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse //NOOP, may be overridden by subclasses def shutdown(): Unit = () } case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) - override val summary: String = s"User '${authn.domain}:${authn.username}' locked out" - override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url}" + override val summary: String = s"User '${authn.domain}:${authn.username}' locked out." + override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ." } case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) override val summary: String = s"Error communicating with I2B2 CRC." override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}." override val detailsXml =

Request is {request} {throwableDetail.getOrElse("")}
} case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Adapter) { override val throwable = Some(x) - override val summary: String = s"Could not map query term(s)." + override val summary: String = "Could not map query term(s)." override val description = s"The Shrine Adapter on ${stamp.host} cannot map this query to its local terms." override val detailsXml =
Query Defitiontion is {x.runQueryRequest.queryDefinition} RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo} {throwableDetail.getOrElse("")}
} diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala index 3610ec590..b9563283c 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala @@ -1,126 +1,125 @@ package net.shrine.aggregation import java.net.ConnectException import net.shrine.broadcaster.CouldNotParseResultsException import net.shrine.log.Loggable import net.shrine.problem.{ProblemNotYetEncoded, ProblemSources, AbstractProblem} import scala.concurrent.duration.Duration import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.NodeId import net.shrine.protocol.Result import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.protocol.BaseShrineResponse /** * * @author Clint Gilbert * @since Sep 16, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Represents the basic aggregation strategy shared by several aggregators: * - Parses a sequence of SpinResultEntries into a sequence of some * combination of valid responses, ErrorResponses, and invalid * responses (cases where ShrineResponse.fromXml returns None) * - Filters the valid responses, weeding out responses that aren't of * the expected type * Invokes an abstract method with the valid responses, errors, and * invalid responses. * * Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest) */ abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable { private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = { manifest[T].runtimeClass.isAssignableFrom(response.getClass) } import BasicAggregator._ override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = { val resultsOrErrors: Iterable[ParsedResult[T]] = { for { result <- results } yield { val parsedResponse: ParsedResult[T] = result match { case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse) case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response) case Timeout(origin) => Error(Option(origin), ErrorResponse(s"Timed out querying node '${origin.name}'")) //todo failure becomes an ErrorResponse and Error status type here. And the stack trace gets eaten. case Failure(origin, cause) => { cause match { case cx: ConnectException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx))) case cnprx:CouldNotParseResultsException => { if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx))) else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx))) } case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x))) } } case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result") } parsedResponse } } val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid } val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid } val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error } //Log all parsing errors invalidResponses.map(_.errorMessage).foreach(this.error(_)) val previouslyDetectedErrors = errors.map(Error(None, _)) makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses) } private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse } object BasicAggregator { private[aggregation] sealed abstract class ParsedResult[+T] private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T] private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing] private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing] } case class CouldNotConnectToAdapter(origin:NodeId,cx:ConnectException) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(cx) - override val summary: String = s"Shrine could not connect to the adapter." - override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}" + override val summary: String = "Shrine could not connect to the adapter." + override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}." } case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(cnrpx) - override val summary: String = s"Could not parse response." + override val summary: String = "Could not parse response." override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'" override val detailsXml =
Message body is {cnrpx.body} {throwableDetail.getOrElse("")}
} case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) { override val throwable = Some(cnrpx) -// override val summary: String = s"Observed ${cnrpx.statusCode} and caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}" - override val summary: String = s"Adapter error" - override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught '${cnrpx.cause}'" + override val summary: String = "Adapter error." + override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}." override val detailsXml =
Message body is {cnrpx.body} {throwableDetail.getOrElse("")}
} \ No newline at end of file