diff --git a/apps/shrine-app/src/main/scala/net/shrine/status/StatusJaxrs.scala b/apps/shrine-app/src/main/scala/net/shrine/status/StatusJaxrs.scala index 77aef670a..812ff5175 100644 --- a/apps/shrine-app/src/main/scala/net/shrine/status/StatusJaxrs.scala +++ b/apps/shrine-app/src/main/scala/net/shrine/status/StatusJaxrs.scala @@ -1,270 +1,291 @@ package net.shrine.status import javax.ws.rs.{GET, Path, Produces, WebApplicationException} import javax.ws.rs.core.{MediaType, Response} import com.sun.jersey.spi.container.{ContainerRequest, ContainerRequestFilter} import com.typesafe.config.{Config => TsConfig} import net.shrine.authorization.StewardQueryAuthorizationService import net.shrine.broadcaster.{Broadcaster, NodeHandle} import net.shrine.wiring.ShrineOrchestrator import org.json4s.{DefaultFormats, Formats} import org.json4s.native.Serialization import net.shrine.log.Loggable import scala.collection.JavaConverters._ import scala.collection.immutable.{Seq, Map, Set} import net.shrine.config.ConfigExtensions import net.shrine.crypto.SigningCertStrategy import net.shrine.protocol.query.{OccuranceLimited, QueryDefinition, Term} import net.shrine.protocol.{AuthenticationInfo, BroadcastMessage, Credential, Failure, Result, ResultOutputType, RunQueryRequest, Timeout} import net.shrine.util.Versions import scala.concurrent.Await import scala.util.Try /** * A subservice that shares internal state of the shrine servlet. * * @author david * @since 12/2/15 */ @Path("/internalstatus") @Produces(Array(MediaType.APPLICATION_JSON)) case class StatusJaxrs(shrineConfig:TsConfig) extends Loggable { implicit def json4sFormats: Formats = DefaultFormats @GET @Path("version") def version: String = { val version = Version("changeMe") val versionString = Serialization.write(version) versionString } @GET @Path("config") def config: String = { //todo probably better to reach out and grab the config from ManuallyWiredShrineJaxrsResources once it is a singleton Serialization.write(Json4sConfig(shrineConfig)) } @GET @Path("summary") def summary: String = { val summary = Summary() Serialization.write(summary) } @GET @Path("optionalParts") def optionalParts: String = { val optionalParts = OptionalParts() Serialization.write(optionalParts) } @GET @Path("hub") def hub: String = { val hub = Hub() Serialization.write(hub) } @GET @Path("adapter") def adapter: String = { val adapter = Adapter() Serialization.write(adapter) } } case class DownstreamNode(name:String, url:String) +case class Qep( + maxQueryWaitTimeMillis:Long, + create:Boolean, + attachSigningCert:Boolean, + authorizationType:String, + includeAggregateResults:Boolean, + authenticationType:String + ) + +object Qep{ + val key = "shrine.queryEntryPoint." + def apply():Qep = new Qep( + maxQueryWaitTimeMillis = ShrineOrchestrator.queryEntryPointComponents.fold(0L)(_.i2b2Service.queryTimeout.toMicros), + create = ShrineOrchestrator.queryEntryPointComponents.isDefined, + attachSigningCert = ShrineOrchestrator.queryEntryPointComponents.fold(false)(_.i2b2Service.broadcastAndAggregationService.attachSigningCert), + authorizationType = ShrineOrchestrator.queryEntryPointComponents.fold("")(_.i2b2Service.authorizationService.getClass.getSimpleName), + includeAggregateResults = ShrineOrchestrator.queryEntryPointComponents.fold(false)(_.i2b2Service.includeAggregateResult), + authenticationType = ShrineOrchestrator.queryEntryPointComponents.fold("")(_.i2b2Service.authenticator.getClass.getName) + ) +} + object DownstreamNodes { def get():Seq[DownstreamNode] = { ShrineOrchestrator.hubComponents.fold(Seq.empty[DownstreamNode])(_.broadcastDestinations.map(DownstreamNode(_)).to[Seq]) } } object DownstreamNode { def apply(nodeHandle: NodeHandle): DownstreamNode = new DownstreamNode( nodeHandle.nodeId.name, nodeHandle.client.url.map(_.toString).getOrElse("not applicable")) } case class Adapter(crcEndpointUrl:String, setSizeObfuscation:Boolean, adapterLockoutAttemptsThreshold:Int, adapterMappingsFilename:String) object Adapter{ def apply():Adapter = { val crcEndpointUrl = ShrineOrchestrator.adapterComponents.fold("")(_.i2b2AdminService.crcUrl) val setSizeObfuscation = ShrineOrchestrator.adapterComponents.fold(false)(_.i2b2AdminService.obfuscate) val adapterLockoutAttemptsThreshold = ShrineOrchestrator.adapterComponents.fold(0)(_.i2b2AdminService.adapterLockoutAttemptsThreshold) val adapterMappingsFileName = ShrineOrchestrator.adapterComponents.fold("")(_.adapterMappings.source) Adapter(crcEndpointUrl, setSizeObfuscation, adapterLockoutAttemptsThreshold, adapterMappingsFileName) } } case class Hub(shouldQuerySelf:Boolean, //todo don't use this field any more. Drop it when possible create:Boolean, downstreamNodes:Seq[DownstreamNode]) object Hub{ def apply():Hub = { val shouldQuerySelf = false val create = ShrineOrchestrator.hubComponents.isDefined val downstreamNodes = DownstreamNodes.get() Hub(shouldQuerySelf, create, downstreamNodes) } } case class OptionalParts(isHub:Boolean, stewardEnabled:Boolean, shouldQuerySelf:Boolean, //todo don't use this field any more. Drop it when possible downstreamNodes:Seq[DownstreamNode]) object OptionalParts { def apply(): OptionalParts = { OptionalParts( ShrineOrchestrator.hubComponents.isDefined, ShrineOrchestrator.queryEntryPointComponents.fold(false)(_.shrineService.authorizationService.isInstanceOf[StewardQueryAuthorizationService]), false, DownstreamNodes.get() ) } } case class Summary( isHub:Boolean, shrineVersion:String, shrineBuildDate:String, ontologyVersion:String, ontologyTerm:String, adapterOk:Boolean, keystoreOk:Boolean, hubOk:Boolean, qepOk:Boolean ) object Summary { val term = Term(ShrineOrchestrator.shrineConfig.getString("networkStatusQuery")) def runQueryRequest: BroadcastMessage = { val domain = "happy" val username = "happy" val networkAuthn = AuthenticationInfo(domain, username, Credential("", isToken = false)) val queryDefinition = QueryDefinition("TestQuery", OccuranceLimited(1, term)) import scala.concurrent.duration._ val req = RunQueryRequest( "happyProject", 3.minutes, networkAuthn, None, None, Set(ResultOutputType.PATIENT_COUNT_XML), queryDefinition) ShrineOrchestrator.signerVerifier.sign(BroadcastMessage(req.networkQueryId, networkAuthn, req), SigningCertStrategy.Attach) } def apply(): Summary = { val message = runQueryRequest val adapterOk: Boolean = ShrineOrchestrator.adapterService.fold(true) { adapterService => import scala.concurrent.duration._ val start = System.currentTimeMillis val resultAttempt: Try[Result] = Try(adapterService.handleRequest(message)) val end = System.currentTimeMillis val elapsed = (end - start).milliseconds resultAttempt match { case scala.util.Failure(cause) => false case scala.util.Success(response) => true } } val hubOk = ShrineOrchestrator.hubComponents.fold(true){ hubComponents => val maxQueryWaitTime = hubComponents.broadcasterMultiplexerService.maxQueryWaitTime val broadcaster: Broadcaster = hubComponents.broadcasterMultiplexerService.broadcaster val message = runQueryRequest val multiplexer = broadcaster.broadcast(message) val responses = Await.result(multiplexer.responses, maxQueryWaitTime).toSeq val failures = responses.collect { case f: Failure => f } val timeouts = responses.collect { case t: Timeout => t } val validResults = responses.collect { case r: Result => r } failures.isEmpty && timeouts.isEmpty && (validResults.size == broadcaster.destinations.size ) } Summary( isHub = ShrineOrchestrator.hubComponents.isDefined, shrineVersion = Versions.version, shrineBuildDate = Versions.buildDate, ontologyVersion = ShrineOrchestrator.ontologyMetadata.ontologyVersion, ontologyTerm = term.value, adapterOk = adapterOk, keystoreOk = true, //todo something for this hubOk = hubOk, qepOk = true //todo something for this ) } } case class Version(version:String) //todo SortedMap when possible case class Json4sConfig(keyValues:Map[String,String]) object Json4sConfig{ def isPassword(key:String):Boolean = { if(key.toLowerCase.contains("password")) true else false } def apply(config:TsConfig):Json4sConfig = { val entries: Set[(String, String)] = config.entrySet.asScala.to[Set].map(x => (x.getKey,x.getValue.render())).filterNot(x => isPassword(x._1)) val sortedMap: Map[String, String] = entries.toMap Json4sConfig(sortedMap) } } class PermittedHostOnly extends ContainerRequestFilter { //todo generalize for happy, too //todo for tomcat 8 see https://jersey.java.net/documentation/latest/filters-and-interceptors.html for a cleaner version //shell code from http://stackoverflow.com/questions/17143514/how-to-add-custom-response-and-abort-request-in-jersey-1-11-filters //how to apply in http://stackoverflow.com/questions/4358213/how-does-one-intercept-a-request-during-the-jersey-lifecycle override def filter(requestContext: ContainerRequest): ContainerRequest = { val hostOfOrigin = requestContext.getBaseUri.getHost val shrineConfig:TsConfig = ShrineOrchestrator.config val permittedHostOfOrigin:String = shrineConfig.getOption("shrine.status.permittedHostOfOrigin",_.getString).getOrElse("localhost") val path = requestContext.getPath //happy and internalstatus API calls must come from the same host as tomcat is running on (hopefully the dashboard servlet). // todo access to the happy service permitted for SHRINE 1.21 per SHRINE-1366 // restrict access to happy service when database work resumes as part of SHRINE- // if ((path.contains("happy") || path.contains("internalstatus")) && (hostOfOrigin != permittedHostOfOrigin)) { if (path.contains("internalstatus") && (hostOfOrigin != permittedHostOfOrigin)) { val response = Response.status(Response.Status.UNAUTHORIZED).entity(s"Only available from $permittedHostOfOrigin, not $hostOfOrigin, controlled by shrine.status.permittedHostOfOrigin in shrine.conf").build() throw new WebApplicationException(response) } else requestContext } } \ No newline at end of file diff --git a/commons/config/src/main/scala/net/shrine/config/Keys.scala b/commons/config/src/main/scala/net/shrine/config/Keys.scala index 53ecf932f..eae99130a 100644 --- a/commons/config/src/main/scala/net/shrine/config/Keys.scala +++ b/commons/config/src/main/scala/net/shrine/config/Keys.scala @@ -1,32 +1,30 @@ package net.shrine.config /** * @author clint * @since Jan 17, 2014 * * Keys for Shrine */ //todo distribute to where they are used once the rest of config is cleaned up object Keys { val crcEndpoint = "crcEndpoint" val shrineSteward = "shrineSteward" val ontProjectId = "ontProjectId" val crcProjectId = "crcProjectId" val setSizeObfuscation = "setSizeObfuscation" val isAdapter = "isAdapter" val isBroadcaster = "isBroadcaster" - val includeAggregateResults = "includeAggregateResults" val adapterLockoutAttemptsThreshold = "adapterLockoutAttemptsThreshold" val adapterMappingsFileName = "adapterMappingsFileName" val adapterMappingsFileType = "adapterMappingsFileType" val downstreamNodes = "downstreamNodes" val maxSignatureAge = "maxSignatureAge" val adapter = "adapter" val hub = "hub" val queryEntryPoint = "queryEntryPoint" //todo remove once it's not used anymore val broadcasterIsLocal = "broadcasterIsLocal" val broadcasterServiceEndpoint = "broadcasterServiceEndpoint" val immediatelyRunIncomingQueries = "immediatelyRunIncomingQueries" val authenticationType = "authenticationType" - val attachSigningCert = "attachSigningCert" } diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BroadcastAndAggregationService.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BroadcastAndAggregationService.scala index 6125eff50..9fe94cb84 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BroadcastAndAggregationService.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/BroadcastAndAggregationService.scala @@ -1,49 +1,53 @@ package net.shrine.broadcaster import net.shrine.log.Loggable import net.shrine.protocol.ShrineRequest import scala.concurrent.Future import net.shrine.protocol.ShrineResponse import net.shrine.aggregation.Aggregator import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.RunQueryRequest import net.shrine.aggregation.RunQueryAggregator import net.shrine.aggregation.ReadQueryResultAggregator import net.shrine.protocol.BaseShrineResponse import net.shrine.protocol.BaseShrineRequest import net.shrine.protocol.AuthenticationInfo /** * @author clint * @since Mar 13, 2013 */ trait BroadcastAndAggregationService extends Loggable { def sendAndAggregate(message: BroadcastMessage, aggregator: Aggregator, shouldBroadcast: Boolean): Future[BaseShrineResponse] def sendAndAggregate(networkAuthn: AuthenticationInfo, request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): Future[BaseShrineResponse] = { val (queryId, requestToSend) = addQueryId(request) val broadcastMessage = BroadcastMessage(queryId.getOrElse(newQueryId), networkAuthn, requestToSend) val aggregatorWithCorrectQueryId = addQueryId(broadcastMessage, aggregator) sendAndAggregate(broadcastMessage, aggregatorWithCorrectQueryId, shouldBroadcast) } protected[broadcaster] def addQueryId(request: BaseShrineRequest): (Option[Long], BaseShrineRequest) = request match { case runQueryReq: RunQueryRequest if runQueryReq.networkQueryId == -1L => { val queryId = newQueryId debug(s"Replaced id ${runQueryReq.networkQueryId} with $queryId") (Some(queryId), runQueryReq.withNetworkQueryId(queryId)) } case _ => (None, request) } protected[broadcaster] def addQueryId(message: BroadcastMessage, aggregator: Aggregator): Aggregator = aggregator match { case runQueryAggregator: RunQueryAggregator => runQueryAggregator.withQueryId(message.requestId) case readQueryResultAggregator: ReadQueryResultAggregator => readQueryResultAggregator.withShrineNetworkQueryId(message.requestId) case _ => aggregator } private def newQueryId = BroadcastMessage.Ids.next + + + def attachSigningCert:Boolean + } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/HubBroadcastAndAggregationService.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/HubBroadcastAndAggregationService.scala index aebce5ff4..f1e38d77b 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/HubBroadcastAndAggregationService.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/HubBroadcastAndAggregationService.scala @@ -1,24 +1,26 @@ package net.shrine.broadcaster import net.shrine.log.Loggable import scala.concurrent.Future import scala.concurrent.duration.Duration import net.shrine.aggregation.Aggregator import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.ErrorResponse import net.shrine.protocol.Failure import net.shrine.protocol.Result import net.shrine.protocol.ShrineResponse import net.shrine.protocol.SingleNodeResult import net.shrine.protocol.Timeout import net.shrine.util.XmlDateHelper import net.shrine.crypto.Signer import net.shrine.protocol.BaseShrineResponse import net.shrine.broadcaster.dao.HubDao /** * @author clint * @date Nov 15, 2013 */ -final class HubBroadcastAndAggregationService(broadcasterClient: BroadcasterClient) extends AbstractBroadcastAndAggregationService(broadcasterClient) \ No newline at end of file +final class HubBroadcastAndAggregationService(broadcasterClient: BroadcasterClient) extends AbstractBroadcastAndAggregationService(broadcasterClient) { + override def attachSigningCert: Boolean = false +} \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationService.scala b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationService.scala index 4a5727cc0..02b26dc4a 100644 --- a/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationService.scala +++ b/hub/broadcaster-aggregator/src/main/scala/net/shrine/broadcaster/SigningBroadcastAndAggregationService.scala @@ -1,52 +1,54 @@ package net.shrine.broadcaster import com.typesafe.config.Config import net.shrine.broadcaster.dao.HubDao import net.shrine.client.{EndpointConfig, Poster} import net.shrine.config.ConfigExtensions import net.shrine.crypto.{DefaultSignerVerifier, KeyStoreCertCollection, Signer, SigningCertStrategy} import net.shrine.protocol.ResultOutputType /** * @author clint * @since Feb 28, 2014 */ final case class SigningBroadcastAndAggregationService(broadcasterClient: BroadcasterClient, signer: Signer, signingCertStrategy: SigningCertStrategy) extends AbstractBroadcastAndAggregationService(broadcasterClient, - signer.sign(_, signingCertStrategy)) + signer.sign(_, signingCertStrategy)) { + override def attachSigningCert: Boolean = signingCertStrategy == SigningCertStrategy.Attach +} object SigningBroadcastAndAggregationService { def apply(qepConfig:Config, shrineCertCollection: KeyStoreCertCollection, breakdownTypes: Set[ResultOutputType], //todo I'm surprised you need this to support a remote hub. Figure out why. Remove if possible broadcastDestinations: Option[Set[NodeHandle]], //todo remove when you use loopback for a local hub hubDao: HubDao //todo remove when you use loopback for a local hub ):SigningBroadcastAndAggregationService = { val signerVerifier: DefaultSignerVerifier = new DefaultSignerVerifier(shrineCertCollection) val broadcasterClient: BroadcasterClient = { //todo don't bother with a distinction between local and remote QEPs. Just use loopback. val remoteHubEndpoint = qepConfig.getOptionConfigured("broadcasterServiceEndpoint", EndpointConfig(_)) remoteHubEndpoint.fold{ require(broadcastDestinations.isDefined, s"The QEP's config implied a local hub (no broadcasterServiceEndpoint), but either no downstream nodes were configured, the hub was not configured, or the hub's configuration specified not to create it.") val broadcaster: AdapterClientBroadcaster = AdapterClientBroadcaster(broadcastDestinations.get, hubDao) val broadcastClient:BroadcasterClient = InJvmBroadcasterClient(broadcaster) broadcastClient }{ hubEndpointConfig => PosterBroadcasterClient(Poster(shrineCertCollection,hubEndpointConfig), breakdownTypes) } } //todo ditch the option and use reference.conf val attachSigningCerts: Boolean = qepConfig.getOption("attachSigningCert", _.getBoolean).getOrElse(false) val signingCertStrategy:SigningCertStrategy = if(attachSigningCerts) SigningCertStrategy.Attach else SigningCertStrategy.DontAttach new SigningBroadcastAndAggregationService(broadcasterClient, signerVerifier, signingCertStrategy) } } \ No newline at end of file diff --git a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/BroadcastAndAggregationServiceTest.scala b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/BroadcastAndAggregationServiceTest.scala index a6678546c..c47254cb2 100644 --- a/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/BroadcastAndAggregationServiceTest.scala +++ b/hub/broadcaster-aggregator/src/test/scala/net/shrine/broadcaster/BroadcastAndAggregationServiceTest.scala @@ -1,174 +1,176 @@ package net.shrine.broadcaster import net.shrine.util.ShouldMatchersForJUnit import org.junit.Test import net.shrine.protocol.DeleteQueryRequest import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.ShrineResponse import net.shrine.aggregation.Aggregator import scala.concurrent.Future import net.shrine.protocol.RunQueryRequest import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term import net.shrine.aggregation.DeleteQueryAggregator import net.shrine.aggregation.RunQueryAggregator import net.shrine.aggregation.ReadQueryResultAggregator /** * @author clint * @since Mar 14, 2013 */ final class BroadcastAndAggregationServiceTest extends ShouldMatchersForJUnit { import BroadcastAndAggregationServiceTest._ private val authn = AuthenticationInfo("some-domain", "some-user", Credential("some-password", isToken = false)) private val queryDef = QueryDefinition("yo", Term("foo")) import scala.concurrent.duration._ @Test def testSendAndAggregateShrineRequest() { val service = new TestBroadcastAndAggregationService { val req = DeleteQueryRequest("projectId", 1.millisecond, authn, 123L) val aggregator = new DeleteQueryAggregator val networkAuthn = AuthenticationInfo("d", "u", Credential("p", isToken = false)) service.sendAndAggregate(networkAuthn, req, aggregator, shouldBroadcast = true) service.args.shouldBroadcast should be(Some(true)) service.sendAndAggregate(networkAuthn, req, aggregator, shouldBroadcast = false) service.args.shouldBroadcast should be(Some(false)) service.args.aggregator should be(aggregator) service.args.message.networkAuthn should be(networkAuthn) service.args.message.request should be(req) (service.args.message.requestId > 0) should be(right = true) } { val invalidQueryId = -1L val req = RunQueryRequest("projectId", 1.millisecond, authn, invalidQueryId, Some("topicId"), Some("Topic Name"), Set.empty, queryDef) val aggregator = new RunQueryAggregator(invalidQueryId, authn.username, authn.domain, queryDef, true) val networkAuthn = AuthenticationInfo("d", "u", Credential("p", isToken = false)) service.sendAndAggregate(networkAuthn, req, aggregator, shouldBroadcast = true) service.args.shouldBroadcast should be(Some(true)) (service.args.message.requestId > 0) should be(right = true) service.args.message.request should not be req service.args.message.request.asInstanceOf[RunQueryRequest].networkQueryId should be(service.args.message.requestId) service.args.message.networkAuthn should be(networkAuthn) service.args.aggregator should not be aggregator service.args.aggregator.asInstanceOf[RunQueryAggregator].queryId should be(service.args.message.requestId) } } @Test def testAddQueryIdAggregator() { val service = new TestBroadcastAndAggregationService { val aggregator = new DeleteQueryAggregator val munged = service.addQueryId(null, aggregator) (munged eq aggregator) should be(right = true) } { val aggregator = new RunQueryAggregator(-1L, authn.username, authn.domain, queryDef, true) val message = BroadcastMessage(999L, authn, null) val munged = service.addQueryId(message, aggregator).asInstanceOf[RunQueryAggregator] munged.queryId should be(message.requestId) munged.userId should equal(aggregator.userId) munged.groupId should equal(aggregator.groupId) munged.requestQueryDefinition should equal(aggregator.requestQueryDefinition) munged.addAggregatedResult should equal(aggregator.addAggregatedResult) } def doTestWithReadQueryResultAggregator(showAggregation: Boolean) { val aggregator = new ReadQueryResultAggregator(-1L, showAggregation) val message = BroadcastMessage(999L, authn, null) val munged = service.addQueryId(message, aggregator).asInstanceOf[ReadQueryResultAggregator] munged should not be aggregator munged.shrineNetworkQueryId should be(message.requestId) munged.showAggregation should be(aggregator.showAggregation) } doTestWithReadQueryResultAggregator(true) doTestWithReadQueryResultAggregator(false) } @Test def testAddQueryIdShrineRequest() { val service = new TestBroadcastAndAggregationService { val req = DeleteQueryRequest("projectId", 1.millisecond, authn, 123L) val (queryIdOption, transformedReq) = service.addQueryId(req) queryIdOption should be(None) transformedReq should be(req) } { val req = RunQueryRequest("projectId", 1.millisecond, authn, -1L, Some("topicId"), Some("Topic Name"), Set.empty, QueryDefinition("yo", Term("foo"))) val (queryIdOption, transformedReq: RunQueryRequest) = service.addQueryId(req) queryIdOption should not be None (queryIdOption.get > 0) should be(right = true) transformedReq.networkQueryId should be(queryIdOption.get) transformedReq.projectId should be(req.projectId) transformedReq.waitTime should be(req.waitTime) transformedReq.authn should be(req.authn) transformedReq.topicId should be(req.topicId) transformedReq.outputTypes should be(req.outputTypes) transformedReq.queryDefinition should be(req.queryDefinition) } } } object BroadcastAndAggregationServiceTest { private final class TestBroadcastAndAggregationService extends BroadcastAndAggregationService { object args { var message: BroadcastMessage = _ var aggregator: Aggregator = _ var shouldBroadcast: Option[Boolean] = None } override def sendAndAggregate(message: BroadcastMessage, aggregator: Aggregator, shouldBroadcast: Boolean): Future[ShrineResponse] = { args.message = message args.aggregator = aggregator args.shouldBroadcast = Some(shouldBroadcast) Future.successful(null) } + + override def attachSigningCert: Boolean = false } } \ No newline at end of file diff --git a/tools/scanner/src/test/scala/net/shrine/utilities/scanner/BroadcastServiceScannerClientTest.scala b/tools/scanner/src/test/scala/net/shrine/utilities/scanner/BroadcastServiceScannerClientTest.scala index e2e083ab7..e4ab6ef3a 100644 --- a/tools/scanner/src/test/scala/net/shrine/utilities/scanner/BroadcastServiceScannerClientTest.scala +++ b/tools/scanner/src/test/scala/net/shrine/utilities/scanner/BroadcastServiceScannerClientTest.scala @@ -1,141 +1,145 @@ package net.shrine.utilities.scanner import net.shrine.util.ShouldMatchersForJUnit import org.junit.Test import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.protocol.BroadcastMessage import net.shrine.aggregation.Aggregator import scala.concurrent.Future import net.shrine.protocol.ShrineResponse import net.shrine.authentication.Authenticator import net.shrine.authentication.AuthenticationResult import net.shrine.protocol.DeleteQueryResponse import net.shrine.protocol.AggregatedRunQueryResponse import net.shrine.util.XmlDateHelper import net.shrine.protocol.query.Term import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.QueryResult import net.shrine.protocol.ResultOutputType import scala.concurrent.Await import scala.concurrent.duration.Duration import net.shrine.authentication.NotAuthenticatedException import net.shrine.protocol.AggregatedReadQueryResultResponse /** * @author clint * @date Jan 14, 2014 */ final class BroadcastServiceScannerClientTest extends ShouldMatchersForJUnit { private val authn = AuthenticationInfo("d", "u", Credential("p", false)) private val projectId = "SHRINE-PROJECT" import BroadcastServiceScannerClientTest._ import scala.concurrent.ExecutionContext.Implicits.{ global => executionContext } @Test def testQuery { val broadcastAndAggregationService = new MockQueryBroadcastAndAggregationService val term = "t" { val clientThatWorks: BroadcastServiceScannerClient = new BroadcastServiceScannerClient(projectId, authn, broadcastAndAggregationService, Authenticators.alwaysWorks, executionContext) {} val result = Await.result(clientThatWorks.query(term), Duration.Inf) result.count should equal(setSize) result.networkQueryId should equal(masterId) result.networkResultId should equal(resultId) result.status should equal(QueryResult.StatusType.Finished) result.term should equal(term) } { val clientThatDoesntAuthenticate: BroadcastServiceScannerClient = new BroadcastServiceScannerClient(projectId, authn, broadcastAndAggregationService, Authenticators.neverWorks, executionContext) {} intercept[NotAuthenticatedException] { clientThatDoesntAuthenticate.query(term) } } } @Test def testRetrieveResults { val broadcastAndAggregationService = new MockRetrieveResultsBroadcastAndAggregationService val term = "t" val termResult = TermResult(masterId, resultId, term, QueryResult.StatusType.Processing, setSize) { val clientThatWorks: BroadcastServiceScannerClient = new BroadcastServiceScannerClient(projectId, authn, broadcastAndAggregationService, Authenticators.alwaysWorks, executionContext) {} val result = Await.result(clientThatWorks.retrieveResults(termResult), Duration.Inf) result.count should equal(setSize) result.networkQueryId should equal(masterId) result.networkResultId should equal(resultId) result.status should equal(QueryResult.StatusType.Finished) result.term should equal(term) } { val clientThatDoesntAuthenticate: BroadcastServiceScannerClient = new BroadcastServiceScannerClient(projectId, authn, broadcastAndAggregationService, Authenticators.neverWorks, executionContext) {} intercept[NotAuthenticatedException] { clientThatDoesntAuthenticate.retrieveResults(termResult) } } } } object BroadcastServiceScannerClientTest { private val masterId = 12345L private val instanceId = 98765L private val resultId = 378256L private val setSize = 123L private final class MockQueryBroadcastAndAggregationService extends BroadcastAndAggregationService { override def sendAndAggregate(message: BroadcastMessage, aggregator: Aggregator, shouldBroadcast: Boolean): Future[ShrineResponse] = Future.successful { AggregatedRunQueryResponse( masterId, XmlDateHelper.now, "u", "d", QueryDefinition("foo", Term("t")), instanceId, Seq( QueryResult( resultId, instanceId, Some(ResultOutputType.PATIENT_COUNT_XML), setSize, Some(XmlDateHelper.now), Some(XmlDateHelper.now), None, QueryResult.StatusType.Finished, None))) } + + override def attachSigningCert: Boolean = false } private final class MockRetrieveResultsBroadcastAndAggregationService extends BroadcastAndAggregationService { override def sendAndAggregate(message: BroadcastMessage, aggregator: Aggregator, shouldBroadcast: Boolean): Future[ShrineResponse] = Future.successful { AggregatedReadQueryResultResponse( masterId, Seq( QueryResult( resultId, instanceId, Some(ResultOutputType.PATIENT_COUNT_XML), setSize, Some(XmlDateHelper.now), Some(XmlDateHelper.now), None, QueryResult.StatusType.Finished, None))) } + + override def attachSigningCert: Boolean = false } } \ No newline at end of file