diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
index 4131ebb6b..94a6a49ce 100644
--- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
@@ -1,281 +1,318 @@
package net.shrine.qep
import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator}
+import net.shrine.audit.NetworkQueryId
import net.shrine.authentication.AuthenticationResult.Authenticated
import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException}
import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized}
import net.shrine.authorization.QueryAuthorizationService
import net.shrine.broadcaster.BroadcastAndAggregationService
import net.shrine.log.Loggable
+import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.protocol.{AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, QueryInstance, QueryResult, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.qep.audit.QepAuditDb
import net.shrine.qep.dao.AuditDao
import net.shrine.qep.querydb.QepQueryDb
import net.shrine.util.XmlDateHelper
-import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+import scala.util.control.NonFatal
+import scala.xml.NodeSeq
/**
* @author clint
* @since Feb 19, 2014
*/
trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable {
val commonName:String
val auditDao: AuditDao
val authenticator: Authenticator
val authorizationService: QueryAuthorizationService
val includeAggregateResult: Boolean
val broadcastAndAggregationService: BroadcastAndAggregationService
val queryTimeout: Duration
val breakdownTypes: Set[ResultOutputType]
val collectQepAudit:Boolean
protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = {
info(s"doReadResultOutputTypes($request)")
authenticateAndThen(request) { authResult =>
val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes
//TODO: XXX: HACK: Would like to remove the cast
ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp]
}
}
protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
authenticateAndThen(request) { authResult =>
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
authenticateAndThen(request) { authResult =>
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor")
//store the query in the qep's database
doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast,authResult)
}
}
protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doReadQueryDefinition($request,$shouldBroadcast)")
doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast,authResult)
}
}
protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doReadInstanceResults($request,$shouldBroadcast)")
val networkId = request.shrineNetworkQueryId
//read from the QEP database code here. Only broadcast if some result is in some sketchy state
val resultsFromDb: Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId)
//If any query result was pending
val response = if (resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) {
debug(s"Using qep cached results for query $networkId")
AggregatedReadInstanceResultsResponse(networkId, resultsFromDb).asInstanceOf[BaseResp]
}
else {
debug(s"Requesting results for $networkId from network")
val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast,authResult)
//put the new results in the database if we got what we wanted
response match {
case arirr: AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId, r))
case _ => //do nothing
}
response
}
response
}
}
protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doReadQueryInstances($request,$shouldBroadcast)")
val now = XmlDateHelper.now
val networkQueryId = request.networkQueryId
val username = request.authn.username
val groupId = request.projectId
//NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like
//to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine
//can understand, while meeting the conversational requirements of the legacy web client.
val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now)
//TODO: XXX: HACK: Would like to remove the cast
//NB: Munge in username from authentication result
ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp]
}
}
protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = {
authenticateAndThen(request){ authResult =>
info(s"doReadPreviousQueries($request,$shouldBroadcast)")
- //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine)
+ //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine 1.24)
//pull queries from the local database.
QepQueryDb.db.selectPreviousQueries(request)
}
}
protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doRenameQuery($request,$shouldBroadcast)")
QepQueryDb.db.renamePreviousQuery(request)
doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
info(s"doDeleteQuery($request,$shouldBroadcast)")
QepQueryDb.db.markDeleted(request)
doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast,authResult)
}
}
protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ =>
info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)")
//TODO: XXX: HACK: Would like to remove the cast
authorizationService.readApprovedEntries(request) match {
case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp]
case Right(validResponse) => validResponse.asInstanceOf[BaseResp]
}
}
import broadcastAndAggregationService.sendAndAggregate
protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean, authResult:Authenticated): BaseResp = {
debug(s"doBroadcastQuery($request) authResult is $authResult")
//NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another
//NB: Only audit RunQueryRequests
//When making BroadcastMessages
val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false))
def queryHub( authorizedRequest: RunQueryRequest): Unit = {
- //todo here's the part that sends the request to the hub - tracking SHRINE-2140
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
- //todo update comment with SHRINE-2149
- //Future[BaseShrineResponse] that this code fires off to the hub, then records the response in the database
- sendAndAggregate(networkAuthn,authorizedRequest,aggregator,shouldBroadcast).transform (
- { hubResponse => hubResponse match {
- //todo here's the part that puts results in the QEP cache database - tracking SHRINE-2140
- //todo once queries arrive at the QEP via a queue, no need to put them into the database. They can just fall on the floor
+ info(s"Sending RunQueryRequest ${authorizedRequest.networkQueryId} to the Hub")
+ //Future[Unit] that this code fires off to the hub, then handles the BaseShrineResponse
+ sendAndAggregate(networkAuthn,authorizedRequest,aggregator,shouldBroadcast).transform ( { hubResponse:BaseShrineResponse =>
+ debug(s"Received $hubResponse for ${authorizedRequest.networkQueryId}")
+ hubResponse match {
case aggregated: AggregatedRunQueryResponse =>
+ info(s"Received ${aggregated.statusTypeName} for ${authorizedRequest.networkQueryId}")
blocking {
+ //here's the part that puts results in the QEP cache database
+ //todo once queries arrive at the QEP via a queue, no need to put them into the database. They can just fall on the floor
+ //todo remove with SHRINE-2149
+ //todo record the query's state in a way that will stop polling in 1.23. See SHRINE-2148
aggregated.results.foreach(QepQueryDb.db.insertQueryResult(authorizedRequest.networkQueryId, _))
+ debug(s"Recorded ${authorizedRequest.networkQueryId} aggregated response in the QEP database")
}
- //todo here's the AggregatedRunQueryResponse returning - tracking SHRINE-2140
- //todo record the query's state in a way that will stop polling in 1.23. See SHRINE-2148
- case _ =>
- debug(s"Unanticipated response type $hubResponse")
+ case _ => IncorrectResponseFromHub(hubResponse,authorizedRequest.networkQueryId)
}
- hubResponse
}
- , throwable => //todo log the throwable
+ , throwable => {
+ throwable match {
+ case NonFatal(t) => ExceptionWhileHubRanQuery(t, authorizedRequest.networkQueryId)
+ case _ => //Let the infrastructure handle fatal exceptions
+ }
throwable
+ }
)
}
request match {
case runQueryRequest: RunQueryRequest =>
// inject modified, authorized runQueryRequest
//although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code
auditAuthorizeAndThen(runQueryRequest) { authorizedRequest =>
debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest")
// tuck the ACT audit metrics data into a database here
if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName)
QepQueryDb.db.insertQepQuery(authorizedRequest)
queryHub(authorizedRequest)
val response = AggregatedRunQueryResponse(
queryId = runQueryRequest.networkQueryId,
createDate = XmlDateHelper.now,
userId = networkAuthn.username,
groupId = networkAuthn.domain,
requestXml = runQueryRequest.queryDefinition,
queryInstanceId = runQueryRequest.networkQueryId,
results = Seq.empty,
statusTypeName = "RECEIVED_BY_QEP" //todo figure out the right statuses for 1.23. See SHRINE-2148
)
response.asInstanceOf[BaseResp]
}
case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast)
}
}
private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = {
info(s"doSynchronousQuery($request) started")
val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp]
info(s"doSynchronousQuery($request) completed with response $response")
response
}
private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult)
protected def waitFor[R](futureResponse: Future[R]): R = {
XmlDateHelper.time("Waiting for aggregated results")(debug(_)) {
Await.result(futureResponse, queryTimeout)
}
}
private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = {
auditTransactionally(request) {
debug(s"auditAuthorizeAndThen($request) with $authorizationService")
val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match {
case na: NotAuthorized => throw na.toException
case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2))
}
body(authorizedRequest)
}
}
private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = {
try { body } finally {
auditDao.addAuditEntry(
request.projectId,
request.authn.domain,
request.authn.username,
request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still?
request.topicId)
}
}
//todo move auth code with SHRINE-1322
import AuthenticationResult._
private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = {
val authResult = authenticator.authenticate(request.authn)
authResult match {
case a: Authenticated => f(a)
case na:NotAuthenticated => throw NotAuthenticatedException(na)
}
}
+}
+
+case class ExceptionWhileHubRanQuery(t: Throwable,networkQueryId: NetworkQueryId) extends AbstractProblem(ProblemSources.Qep) {
+
+ override val throwable = Some(t)
+
+ override def summary: String = s"${t.getClass.getSimpleName} encountered in an http call to run $networkQueryId query at the hub."
+
+ override def description: String = "The QEP generated an exception while making an http call to run a query at the hub."
+
+ override def detailsXml: NodeSeq = NodeSeq.fromSeq(
+ networkQueryId is {networkQueryId}
+ {throwableDetail.getOrElse("")}
+ )
+
+}
+
+case class IncorrectResponseFromHub(hubResponse:BaseShrineResponse,networkQueryId: NetworkQueryId) extends AbstractProblem(ProblemSources.Qep) {
+
+ override def summary: String = s"The hub responded to query $networkQueryId with a ${hubResponse.getClass.getSimpleName}, not a ${classOf[AggregatedRunQueryResponse].getSimpleName}"
+
+ override def description: String = s"The hub responded with something other than a ${classOf[AggregatedRunQueryResponse].getSimpleName} to the QEP's run query request."
+
+ override def detailsXml: NodeSeq = NodeSeq.fromSeq(
+
+ networkQueryId is {networkQueryId}
+ hubResponse is {hubResponse}
+ )
}
\ No newline at end of file