diff --git a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala
index 820f72f9e..bfe391c37 100644
--- a/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala
+++ b/adapter/adapter-api/src/main/scala/net/shrine/adapter/client/RemoteAdapterClient.scala
@@ -1,127 +1,127 @@
package net.shrine.adapter.client
import java.net.SocketTimeoutException
import net.shrine.problem.{ProblemNotYetEncoded, ProblemSources, AbstractProblem}
import org.xml.sax.SAXParseException
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.blocking
import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal
import scala.xml.{NodeSeq, XML}
import com.sun.jersey.api.client.ClientHandlerException
import net.shrine.client.{HttpResponse, TimeoutException, Poster}
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import scala.util.{Failure, Success, Try}
import net.shrine.protocol.ResultOutputType
/**
* @author clint
* @since Nov 15, 2013
*
*
*/
final class RemoteAdapterClient private (val poster: Poster, val breakdownTypes: Set[ResultOutputType]) extends AdapterClient {
import RemoteAdapterClient._
//NB: Overriding apply in the companion object screws up case-class code generation for some reason, so
//we add the would-have-been-generated methods here
override def toString = s"RemoteAdapterClient($poster)"
override def hashCode: Int = 31 * (if(poster == null) 1 else poster.hashCode)
override def equals(other: Any): Boolean = other match {
case that: RemoteAdapterClient if that != null => poster == that.poster
case _ => false
}
//TODO: Revisit this
import scala.concurrent.ExecutionContext.Implicits.global
override def query(request: BroadcastMessage): Future[Result] = {
val requestXml = request.toXml
Future {
blocking {
val response: HttpResponse = poster.post(requestXml.toString())
interpretResponse(response)
}
}.recover {
case e if isTimeout(e) => throw new TimeoutException(s"Invoking adapter at ${poster.url} timed out", e)
}
}
def interpretResponse(response:HttpResponse):Result = {
if(response.statusCode <= 400){
val responseXml = response.body
import scala.concurrent.duration._
//Should we know the NodeID here? It would let us make a better error response.
Try(XML.loadString(responseXml)).flatMap(Result.fromXml(breakdownTypes)) match {
case Success(result) => result
case Failure(x) => {
val errorResponse = x match {
case sx: SAXParseException => ErrorResponse(CouldNotParseXmlFromAdapter(poster.url,response.statusCode,responseXml,sx))
case _ => ErrorResponse(ProblemNotYetEncoded(s"Couldn't understand response from adapter at '${poster.url}': $responseXml", x))
}
Result(NodeId.Unknown, 0.milliseconds, errorResponse)
}
}
}
else {
Result(NodeId.Unknown,0.milliseconds,ErrorResponse(HttpErrorCodeFromAdapter(poster.url,response.statusCode,response.body)))
}
}
}
object RemoteAdapterClient {
def apply(poster: Poster, breakdownTypes: Set[ResultOutputType]): RemoteAdapterClient = {
//NB: Replicate URL-munging that used to be performed by JerseyAdapterClient
val posterToUse = {
if(poster.url.endsWith("requests")) { poster }
else { poster.mapUrl(_ + "/requests") }
}
new RemoteAdapterClient(posterToUse, breakdownTypes)
}
def isTimeout(e: Throwable): Boolean = e match {
case e: SocketTimeoutException => true
case e: ClientHandlerException => {
val cause = e.getCause
cause != null && cause.isInstanceOf[SocketTimeoutException]
}
case _ => false
}
}
case class HttpErrorCodeFromAdapter(url:String,statusCode:Int,responseBody:String) extends AbstractProblem(ProblemSources.Adapter) {
- override def summary: String = s"Hub received a fatal error response"
+ override def summary: String = "Hub received a fatal error response"
override def description: String = s"Hub received error code $statusCode from the adapter at $url"
override def detailsXml:NodeSeq = {s"Http response body was $responseBody"}
}
case class CouldNotParseXmlFromAdapter(url:String,statusCode:Int,responseBody:String,saxx: SAXParseException) extends AbstractProblem(ProblemSources.Adapter) {
override def throwable = Some(saxx)
override def summary: String = s"Hub could not parse response from adapter"
override def description: String = s"Hub could not parse xml from $url due to ${saxx.toString}"
override def detailsXml:NodeSeq =
{s"Http response code was $statusCode and the body was $responseBody"}
{throwableDetail}
}
\ No newline at end of file
diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
index 35577f1a3..c93d74968 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/Adapter.scala
@@ -1,79 +1,79 @@
package net.shrine.adapter
import java.net.InetAddress
import net.shrine.log.Loggable
import net.shrine.problem.{Problem, ProblemNotYetEncoded, LoggingProblemHandler, ProblemSources, AbstractProblem}
import net.shrine.protocol.{ShrineRequest, BroadcastMessage, ErrorResponse, BaseShrineResponse, AuthenticationInfo}
/**
* @author Bill Simons
* @since 4/8/11
* @see http://cbmi.med.harvard.edu
* @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @see http://www.gnu.org/licenses/lgpl.html
*/
abstract class Adapter extends Loggable {
final def perform(message: BroadcastMessage): BaseShrineResponse = {
def problemToErrorResponse(problem:Problem):ErrorResponse = {
LoggingProblemHandler.handleProblem(problem)
ErrorResponse(problem)
}
val shrineResponse = try {
processRequest(message)
} catch {
case e: AdapterLockoutException => problemToErrorResponse(AdapterLockout(message.request.authn,e))
case e @ CrcInvocationException(invokedCrcUrl, request, cause) => problemToErrorResponse(CrcCouldNotBeInvoked(invokedCrcUrl,request,e))
case e: AdapterMappingException => problemToErrorResponse(AdapterMappingProblem(e))
//noinspection RedundantBlock
case e: Exception => {
val summary = if(message == null) "Unknown problem in Adapter.perform with null BroadcastMessage"
else s"Unexpected exception in Adapter"
problemToErrorResponse(ProblemNotYetEncoded(summary,e))
}
}
shrineResponse
}
protected[adapter] def processRequest(message: BroadcastMessage): BaseShrineResponse
//NOOP, may be overridden by subclasses
def shutdown(): Unit = ()
}
case class AdapterLockout(authn:AuthenticationInfo,x:AdapterLockoutException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
- override val summary: String = s"User '${authn.domain}:${authn.username}' locked out"
- override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url}"
+ override val summary: String = s"User '${authn.domain}:${authn.username}' locked out."
+ override val description:String = s"User '${authn.domain}:${authn.username}' has run too many queries that produce the same result at ${x.url} ."
}
case class CrcCouldNotBeInvoked(crcUrl:String,request:ShrineRequest,x:CrcInvocationException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
override val summary: String = s"Error communicating with I2B2 CRC."
override val description: String = s"Error invoking the CRC at '$crcUrl' with a ${request.getClass.getSimpleName} due to ${throwable.get}."
override val detailsXml =
Request is {request}
{throwableDetail.getOrElse("")}
}
case class AdapterMappingProblem(x:AdapterMappingException) extends AbstractProblem(ProblemSources.Adapter) {
override val throwable = Some(x)
- override val summary: String = s"Could not map query term(s)."
+ override val summary: String = "Could not map query term(s)."
override val description = s"The Shrine Adapter on ${stamp.host} cannot map this query to its local terms."
override val detailsXml =
Query Defitiontion is {x.runQueryRequest.queryDefinition}
RunQueryRequest is ${x.runQueryRequest.elideAuthenticationInfo}
{throwableDetail.getOrElse("")}
}
diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
index 3610ec590..b9563283c 100644
--- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
+++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/aggregation/BasicAggregator.scala
@@ -1,126 +1,125 @@
package net.shrine.aggregation
import java.net.ConnectException
import net.shrine.broadcaster.CouldNotParseResultsException
import net.shrine.log.Loggable
import net.shrine.problem.{ProblemNotYetEncoded, ProblemSources, AbstractProblem}
import scala.concurrent.duration.Duration
import net.shrine.protocol.ErrorResponse
import net.shrine.protocol.Failure
import net.shrine.protocol.NodeId
import net.shrine.protocol.Result
import net.shrine.protocol.SingleNodeResult
import net.shrine.protocol.Timeout
import net.shrine.protocol.BaseShrineResponse
/**
*
* @author Clint Gilbert
* @since Sep 16, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Represents the basic aggregation strategy shared by several aggregators:
* - Parses a sequence of SpinResultEntries into a sequence of some
* combination of valid responses, ErrorResponses, and invalid
* responses (cases where ShrineResponse.fromXml returns None)
* - Filters the valid responses, weeding out responses that aren't of
* the expected type
* Invokes an abstract method with the valid responses, errors, and
* invalid responses.
*
* Needs to be an abstract class instead of a trait due to the view bound on T (: Manifest)
*/
abstract class BasicAggregator[T <: BaseShrineResponse: Manifest] extends Aggregator with Loggable {
private[aggregation] def isAggregatable(response: BaseShrineResponse): Boolean = {
manifest[T].runtimeClass.isAssignableFrom(response.getClass)
}
import BasicAggregator._
override def aggregate(results: Iterable[SingleNodeResult], errors: Iterable[ErrorResponse]): BaseShrineResponse = {
val resultsOrErrors: Iterable[ParsedResult[T]] = {
for {
result <- results
} yield {
val parsedResponse: ParsedResult[T] = result match {
case Result(origin, _, errorResponse: ErrorResponse) => Error(Option(origin), errorResponse)
case Result(origin, elapsed, response: T) if isAggregatable(response) => Valid(origin, elapsed, response)
case Timeout(origin) => Error(Option(origin), ErrorResponse(s"Timed out querying node '${origin.name}'"))
//todo failure becomes an ErrorResponse and Error status type here. And the stack trace gets eaten.
case Failure(origin, cause) => {
cause match {
case cx: ConnectException => Error(Option(origin), ErrorResponse(CouldNotConnectToAdapter(origin, cx)))
case cnprx:CouldNotParseResultsException => {
if(cnprx.statusCode >= 400) Error(Option(origin), ErrorResponse(HttpErrorResponseProblem(cnprx)))
else Error(Option(origin), ErrorResponse(CouldNotParseResultsProblem(cnprx)))
}
case x => Error(Option(origin), ErrorResponse(ProblemNotYetEncoded(s"Failure querying node ${origin.name}",x)))
}
}
case _ => Invalid(None, s"Unexpected response in $getClass:\r\n $result")
}
parsedResponse
}
}
val invalidResponses = resultsOrErrors.collect { case invalid: Invalid => invalid }
val validResponses = resultsOrErrors.collect { case valid: Valid[T] => valid }
val errorResponses: Iterable[Error] = resultsOrErrors.collect { case error: Error => error }
//Log all parsing errors
invalidResponses.map(_.errorMessage).foreach(this.error(_))
val previouslyDetectedErrors = errors.map(Error(None, _))
makeResponseFrom(validResponses, errorResponses ++ previouslyDetectedErrors, invalidResponses)
}
private[aggregation] def makeResponseFrom(validResponses: Iterable[Valid[T]], errorResponses: Iterable[Error], invalidResponses: Iterable[Invalid]): BaseShrineResponse
}
object BasicAggregator {
private[aggregation] sealed abstract class ParsedResult[+T]
private[aggregation] final case class Valid[T](origin: NodeId, elapsed: Duration, response: T) extends ParsedResult[T]
private[aggregation] final case class Error(origin: Option[NodeId], response: ErrorResponse) extends ParsedResult[Nothing]
private[aggregation] final case class Invalid(origin: Option[NodeId], errorMessage: String) extends ParsedResult[Nothing]
}
case class CouldNotConnectToAdapter(origin:NodeId,cx:ConnectException) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(cx)
- override val summary: String = s"Shrine could not connect to the adapter."
- override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}"
+ override val summary: String = "Shrine could not connect to the adapter."
+ override val description: String = s"Shrine could not connect to the adapter at ${origin.name} due to ${throwable.get}."
}
case class CouldNotParseResultsProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(cnrpx)
- override val summary: String = s"Could not parse response."
+ override val summary: String = "Could not parse response."
override val description = s"While parsing a response from ${cnrpx.url} with http code ${cnrpx.statusCode} caught '${cnrpx.cause}'"
override val detailsXml =
Message body is {cnrpx.body}
{throwableDetail.getOrElse("")}
}
case class HttpErrorResponseProblem(cnrpx:CouldNotParseResultsException) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(cnrpx)
-// override val summary: String = s"Observed ${cnrpx.statusCode} and caught a ${cnrpx.cause.getClass.getSimpleName} while parsing a response from ${cnrpx.url}"
- override val summary: String = s"Adapter error"
- override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught '${cnrpx.cause}'"
+ override val summary: String = "Adapter error."
+ override val description = s"Observed http status code ${cnrpx.statusCode} from ${cnrpx.url} and caught ${cnrpx.cause}."
override val detailsXml =
Message body is {cnrpx.body}
{throwableDetail.getOrElse("")}
}
\ No newline at end of file