diff --git a/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala b/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala index c8cd62073..9e066d675 100644 --- a/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala +++ b/integration/src/test/scala/net/shrine/integration/NetworkSimulationTest.scala @@ -1,338 +1,337 @@ package net.shrine.integration import net.shrine.log.Loggable import scala.concurrent.Future import scala.concurrent.duration.DurationInt import org.junit.Test import net.shrine.util.ShouldMatchersForJUnit import net.shrine.adapter.AdapterMap import net.shrine.adapter.DeleteQueryAdapter import net.shrine.adapter.client.AdapterClient import net.shrine.adapter.dao.squeryl.AbstractSquerylAdapterTest import net.shrine.adapter.service.AdapterRequestHandler import net.shrine.adapter.service.AdapterService import net.shrine.broadcaster.AdapterClientBroadcaster import net.shrine.broadcaster.NodeHandle import net.shrine.crypto.DefaultSignerVerifier import net.shrine.crypto.TestKeystore import net.shrine.protocol.{HiveCredentials, AuthenticationInfo, BroadcastMessage, Credential, DeleteQueryRequest, DeleteQueryResponse, NodeId, Result, RunQueryRequest, CertId, RequestType, FlagQueryRequest, FlagQueryResponse, RawCrcRunQueryResponse, ResultOutputType, QueryResult, RunQueryResponse, AggregatedRunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse, DefaultBreakdownResultOutputTypes} import net.shrine.qep.QepService import net.shrine.broadcaster.SigningBroadcastAndAggregationService import net.shrine.broadcaster.InJvmBroadcasterClient import net.shrine.adapter.FlagQueryAdapter import net.shrine.protocol.query.Term import net.shrine.adapter.RunQueryAdapter import net.shrine.client.Poster import net.shrine.client.HttpClient import net.shrine.client.HttpResponse import net.shrine.adapter.translators.QueryDefinitionTranslator import net.shrine.adapter.translators.ExpressionTranslator import net.shrine.util.XmlDateHelper import net.shrine.adapter.ReadQueryResultAdapter import net.shrine.protocol.query.QueryDefinition import net.shrine.adapter.UnFlagQueryAdapter import net.shrine.crypto.SigningCertStrategy /** * @author clint * @since Nov 27, 2013 * * An in-JVM simulation of a Shrine network with one hub and 4 doanstream adapters. * * The hub and adapters are wired up with mock AdapterClients that do in-JVM communication via method calls * instead of remotely. * * The adapters are configured to respond with valid results for DeleteQueryRequests * only. Other requests could be handled, but that would not provide benefit to offset the effort of wiring * up more and more-complex Adapters. * * The test network is queried, and the final result, as well as the state of each adapter, is inspected to * ensure that the right messages were sent between elements of the system. * */ final class NetworkSimulationTest extends AbstractSquerylAdapterTest with ShouldMatchersForJUnit { -/* + private val certCollection = TestKeystore.certCollection private lazy val myCertId: CertId = certCollection.myCertId.get private lazy val signerVerifier = new DefaultSignerVerifier(certCollection) private val domain = "test-domain" private val username = "test-username" private val password = "test-password" import NetworkSimulationTest._ import scala.concurrent.duration._ private def deleteQueryAdapter: DeleteQueryAdapter = new DeleteQueryAdapter(dao) private def flagQueryAdapter: FlagQueryAdapter = new FlagQueryAdapter(dao) private def unFlagQueryAdapter: UnFlagQueryAdapter = new UnFlagQueryAdapter(dao) private def mockPoster = Poster("http://example.com", new HttpClient { override def post(input: String, url: String): HttpResponse = ??? }) private val hiveCredentials = HiveCredentials("d", "u", "pwd", "pid") private def queuesQueriesRunQueryAdapter: RunQueryAdapter = { val translator = new QueryDefinitionTranslator(new ExpressionTranslator(Map("n1" -> Set("l1")))) RunQueryAdapter( mockPoster, dao, hiveCredentials, translator, 10000, doObfuscation = false, runQueriesImmediately = false, DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false ) } private def immediatelyRunsQueriesRunQueryAdapter(setSize: Long): RunQueryAdapter = { val mockCrcPoster = Poster("http://example.com", new HttpClient { override def post(input: String, url: String): HttpResponse = { val req = RunQueryRequest.fromI2b2String(DefaultBreakdownResultOutputTypes.toSet)(input).get val now = XmlDateHelper.now val queryResult = QueryResult(1L, 42L, Some(ResultOutputType.PATIENT_COUNT_XML), setSize, Some(now), Some(now), Some("desc"), QueryResult.StatusType.Finished, Some("status")) val mockCrcXml = RawCrcRunQueryResponse(req.networkQueryId, XmlDateHelper.now, req.authn.username, req.projectId, req.queryDefinition, 42L, Map(ResultOutputType.PATIENT_COUNT_XML -> Seq(queryResult))).toI2b2String HttpResponse.ok(mockCrcXml) } }) queuesQueriesRunQueryAdapter.copy(poster = mockCrcPoster, runQueriesImmediately = true) } private def readQueryResultAdapter(setSize: Long): ReadQueryResultAdapter = { new ReadQueryResultAdapter( mockPoster, hiveCredentials, dao, doObfuscation = false, DefaultBreakdownResultOutputTypes.toSet, collectAdapterAudit = false ) } private lazy val adaptersByNodeId: Seq[(NodeId, MockAdapterRequestHandler)] = { import NodeName._ import RequestType.{ MasterDeleteRequest => MasterDeleteRequestRT, FlagQueryRequest => FlagQueryRequestRT, QueryDefinitionRequest => RunQueryRT, GetQueryResult => ReadQueryResultRT, UnFlagQueryRequest => UnFlagQueryRequestRT } (for { (childName, setSize) <- Seq((A, 1L), (B, 2L), (C, 3L), (D, 4L)) } yield { val nodeId = NodeId(childName.name) val maxSignatureAge = 1.hour val adapterMap = AdapterMap(Map( MasterDeleteRequestRT -> deleteQueryAdapter, FlagQueryRequestRT -> flagQueryAdapter, UnFlagQueryRequestRT -> unFlagQueryAdapter, RunQueryRT -> queuesQueriesRunQueryAdapter, ReadQueryResultRT -> readQueryResultAdapter(setSize))) nodeId -> MockAdapterRequestHandler(new AdapterService(nodeId, signerVerifier, maxSignatureAge, adapterMap)) }) } private lazy val shrineService: QepService = { val destinations: Set[NodeHandle] = { (for { (nodeId, adapterRequestHandler) <- adaptersByNodeId } yield { NodeHandle(nodeId, MockAdapterClient(nodeId, adapterRequestHandler)) }).toSet } QepService( "example.com", MockAuditDao, MockAuthenticator, MockQueryAuthorizationService, true, SigningBroadcastAndAggregationService(InJvmBroadcasterClient(AdapterClientBroadcaster(destinations, MockHubDao)), signerVerifier, SigningCertStrategy.Attach), 1.hour, DefaultBreakdownResultOutputTypes.toSet, false) } @Test def testSimulatedNetwork = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val masterId = 12345L import scala.concurrent.duration._ val req = DeleteQueryRequest("some-project-id", 1.second, authn, masterId) val resp = shrineService.deleteQuery(req, true) for { (nodeId, mockAdapter) <- adaptersByNodeId } { mockAdapter.lastMessage.networkAuthn.domain should equal(authn.domain) mockAdapter.lastMessage.networkAuthn.username should equal(authn.username) mockAdapter.lastMessage.request should equal(req) mockAdapter.lastResult.response should equal(DeleteQueryResponse(masterId)) } resp should equal(DeleteQueryResponse(masterId)) } @Test def testQueueQuery = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val topicId = "askldjlkas" val topicName = "Topic Name" val queryName = "lsadj3028940" import scala.concurrent.duration._ val runQueryReq = RunQueryRequest("some-project-id", 1.second, authn, 12345L, Some(topicId), Some(topicName), Set(ResultOutputType.PATIENT_COUNT_XML), QueryDefinition(queryName, Term("n1"))) val aggregatedRunQueryResp = shrineService.runQuery(runQueryReq, true).asInstanceOf[AggregatedRunQueryResponse] var broadcastMessageId: Option[Long] = None //Broadcast the original run query request; all nodes should queue the query for { (nodeId, mockAdapter) <- adaptersByNodeId } { broadcastMessageId = Option(mockAdapter.lastMessage.requestId) mockAdapter.lastMessage.networkAuthn.domain should equal(authn.domain) mockAdapter.lastMessage.networkAuthn.username should equal(authn.username) val lastReq = mockAdapter.lastMessage.request.asInstanceOf[RunQueryRequest] lastReq.authn should equal(runQueryReq.authn) lastReq.requestType should equal(runQueryReq.requestType) lastReq.waitTime should equal(runQueryReq.waitTime) //todo what to do with this check? lastReq.networkQueryId should equal(mockAdapter.lastMessage.requestId) lastReq.outputTypes should equal(runQueryReq.outputTypes) lastReq.projectId should equal(runQueryReq.projectId) lastReq.queryDefinition should equal(runQueryReq.queryDefinition) lastReq.topicId should equal(runQueryReq.topicId) val runQueryResp = mockAdapter.lastResult.response.asInstanceOf[RunQueryResponse] runQueryResp.queryId should equal(-1L) runQueryResp.singleNodeResult.statusType should equal(QueryResult.StatusType.Held) runQueryResp.singleNodeResult.setSize should equal(-1L) } aggregatedRunQueryResp.queryId should equal(broadcastMessageId.get) aggregatedRunQueryResp.results.map(_.setSize) should equal(Seq(-1L, -1L, -1L, -1L, -4L)) } @Test def testFlagQuery = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val masterId = 12345L import scala.concurrent.duration._ val networkQueryId = 9999L val name = "some query" val expr = Term("foo") val fooQuery = QueryDefinition(name,expr) dao.insertQuery(masterId.toString, networkQueryId, authn, fooQuery, isFlagged = false, hasBeenRun = true, flagMessage = None) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(false) dao.findQueryByNetworkId(networkQueryId).get.hasBeenRun should be(true) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(None) val req = FlagQueryRequest("some-project-id", 1.second, authn, networkQueryId, Some("foo")) val resp = shrineService.flagQuery(req, true) resp should equal(FlagQueryResponse) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(true) dao.findQueryByNetworkId(networkQueryId).get.hasBeenRun should be(true) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(Some("foo")) } @Test def testUnFlagQuery = afterCreatingTables { val authn = AuthenticationInfo(domain, username, Credential(password, false)) val masterId = 12345L import scala.concurrent.duration._ val networkQueryId = 9999L val flagMsg = Some("foo") val name = "some query" val expr = Term("foo") val fooQuery = QueryDefinition(name,expr) dao.insertQuery(masterId.toString, networkQueryId, authn, fooQuery, isFlagged = true, hasBeenRun = true, flagMessage = flagMsg) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(true) dao.findQueryByNetworkId(networkQueryId).get.hasBeenRun should be(true) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(flagMsg) val req = UnFlagQueryRequest("some-project-id", 1.second, authn, networkQueryId) val resp = shrineService.unFlagQuery(req, true) resp should equal(UnFlagQueryResponse) dao.findQueryByNetworkId(networkQueryId).get.isFlagged should be(false) dao.findQueryByNetworkId(networkQueryId).get.hasBeenRun should be(true) dao.findQueryByNetworkId(networkQueryId).get.flagMessage should be(None) } } object NetworkSimulationTest { private final case class MockAdapterClient(nodeId: NodeId, adapter: AdapterRequestHandler) extends AdapterClient with Loggable { import scala.concurrent.ExecutionContext.Implicits.global override def query(message: BroadcastMessage): Future[Result] = Future.successful { debug(s"Invoking Adapter $nodeId with $message") val result = adapter.handleRequest(message) debug(s"Got result from $nodeId: $result") result } } private final case class MockAdapterRequestHandler(delegate: AdapterRequestHandler) extends AdapterRequestHandler { @volatile var lastMessage: BroadcastMessage = _ @volatile var lastResult: Result = _ override def handleRequest(request: BroadcastMessage): Result = { lastMessage = request val result = delegate.handleRequest(request) lastResult = result result } } - */ } diff --git a/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala b/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala index b09839d8a..d6baf2068 100644 --- a/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala +++ b/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala @@ -1,91 +1,90 @@ package net.shrine.integration import net.shrine.protocol.{DeleteQueryRequest, DeleteQueryResponse, RequestType, Result} import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author clint * @since Jan 8, 2014 * * An end-to-end JAX-RS test that fires up a Hub and two spokes, makes a query, * and verifies that the correct requests were broadcast to the spokes, and * that the correct responses were received and aggregated at the hub. * * DeleteQueryResponses are used because they have very few fields and are easy * to construct and verify. It might be nice in the future to use * RunQuery{Request,Response}, but that was more trouble than it was worth for * a first pass. * * NB: The hub runs on port 9997, and the two spokes run on ports 9998 and 9999. */ final class OneHubTwoSpokesJaxrsTest extends AbstractHubAndSpokesTest with ShouldMatchersForJUnit { thisTest => -/* + @Test def testBroadcastDeleteQueryShrine: Unit = { doTestBroadcastDeleteQuery(shrineHubComponent) } @Test def testBroadcastDeleteQueryI2b2: Unit = { doTestBroadcastDeleteQuery(i2b2HubComponent) } lazy val shrineHubComponent = Hubs.Shrine(thisTest, port = 9997) lazy val i2b2HubComponent = Hubs.I2b2(thisTest, port = 9996) private def doTestBroadcastDeleteQuery[H <: AnyRef](hubComponent: AbstractHubComponent[H]): Unit = { val masterId = 123456L val projectId = "some-project-id" val client = hubComponent.clientFor(projectId, networkAuthn) //Broadcast a message val resp = client.deleteQuery(masterId, true) //Make sure we got the right response resp.queryId should equal(masterId) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[DeleteQueryRequest] req.networkQueryId should equal(masterId) req.projectId should equal(projectId) req.requestType should equal(RequestType.MasterDeleteRequest) req.authn should equal(networkAuthn) } //Make sure we got the right responses at the hub val multiplexer = hubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, DeleteQueryResponse(masterId)) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } @Before override def setUp() { super.setUp() shrineHubComponent.JerseyTest.setUp() i2b2HubComponent.JerseyTest.setUp() } @After override def tearDown() { shrineHubComponent.JerseyTest.tearDown() i2b2HubComponent.JerseyTest.tearDown() super.tearDown() } - */ } \ No newline at end of file diff --git a/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala b/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala index d61168cb3..1d6455386 100644 --- a/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala +++ b/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala @@ -1,249 +1,248 @@ package net.shrine.integration import net.shrine.adapter.client.RemoteAdapterClient import net.shrine.adapter.service.JerseyTestComponent import net.shrine.broadcaster.{AdapterClientBroadcaster, NodeHandle, PosterBroadcasterClient} import net.shrine.broadcaster.service.{BroadcasterMultiplexerRequestHandler, BroadcasterMultiplexerResource, BroadcasterMultiplexerService} import net.shrine.protocol.query.{Constrained, Modifiers, Or, QueryDefinition, Term, ValueConstraint} import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, NodeId, RequestType, Result, ResultOutputType, RunQueryRequest, RunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse} import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author clint * @since Mar 6, 2014 */ final class OneQepOneHubTwoSpokesJaxrsTest extends AbstractHubAndSpokesTest with ShouldMatchersForJUnit { thisTest => -/* + @Test def testBroadcastDeleteQueryShrine(): Unit = doTestBroadcastDeleteQuery(shrineQueryEntryPointComponent) @Test def testBroadcastDeleteQueryI2b2(): Unit = doTestBroadcastDeleteQuery(i2b2QueryEntryPointComponent) @Test def testBroadcastFlagQueryShrine(): Unit = doTestBroadcastFlagQuery(shrineQueryEntryPointComponent) @Test def testBroadcastFlagQueryI2b2(): Unit = doTestBroadcastFlagQuery(i2b2QueryEntryPointComponent) @Test def testBroadcastUnFlagQueryShrine(): Unit = doTestBroadcastUnFlagQuery(shrineQueryEntryPointComponent) @Test def testBroadcastUnFlagQueryI2b2(): Unit = doTestBroadcastUnFlagQuery(i2b2QueryEntryPointComponent) @Test def testBroadcastRunQueryShrine(): Unit = doTestBroadcastRunQuery(shrineQueryEntryPointComponent) @Test def testBroadcastRunQueryI2b2(): Unit = doTestBroadcastRunQuery(i2b2QueryEntryPointComponent) private def doTestBroadcastDeleteQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val masterId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) //Broadcast a message val resp = client.deleteQuery(masterId, true) //Make sure we got the right response resp.queryId should equal(masterId) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[DeleteQueryRequest] req.networkQueryId should equal(masterId) req.projectId should equal(projectId) req.requestType should equal(RequestType.MasterDeleteRequest) req.authn should equal(networkAuthn) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, DeleteQueryResponse(masterId)) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } private def doTestBroadcastRunQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val masterId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) //Include a modified term, to ensure they're parsed properly val queryDefinition = QueryDefinition("foo", Or(Term("x"), Constrained(Term("y"), Modifiers("some-modifier", "ap", "k"), ValueConstraint("foo", Some("bar"), "baz", "nuh")))) //Broadcast a message val resp = client.runQuery("some-topic-id", Set(ResultOutputType.PATIENT_COUNT_XML), queryDefinition, true) resp.results.size should equal(spokes.size) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[RunQueryRequest] req.projectId should equal(projectId) req.requestType should equal(RequestType.QueryDefinitionRequest) req.authn should equal(networkAuthn) req.queryDefinition should equal(queryDefinition) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get multiplexer.resultsSoFar.collect { case Result(_, _, payload) => payload.getClass } should equal((1 to spokes.size).map(_ => classOf[RunQueryResponse])) val expectedResponders = spokes.map(_.nodeId).toSet multiplexer.resultsSoFar.map(_.origin).toSet should equal(expectedResponders) } private def doTestBroadcastFlagQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val networkQueryId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) val message = "flag message" //Broadcast a message val resp = client.flagQuery(networkQueryId, Some(message), true) //Make sure we got the right response resp should be(FlagQueryResponse) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[FlagQueryRequest] req.networkQueryId should equal(networkQueryId) req.projectId should equal(projectId) req.requestType should equal(RequestType.FlagQueryRequest) req.authn should equal(networkAuthn) req.message should be(Some(message)) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, FlagQueryResponse) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } private def doTestBroadcastUnFlagQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val networkQueryId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) //Broadcast a message val resp = client.unFlagQuery(networkQueryId, true) //Make sure we got the right response resp should be(UnFlagQueryResponse) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[UnFlagQueryRequest] req.networkQueryId should equal(networkQueryId) req.projectId should equal(projectId) req.requestType should equal(RequestType.UnFlagQueryRequest) req.authn should equal(networkAuthn) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, UnFlagQueryResponse) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } import scala.concurrent.duration._ lazy val i2b2QueryEntryPointComponent = Hubs.I2b2(thisTest, port = 9995, broadcasterClient = Some(PosterBroadcasterClient(posterFor(HubComponent), DefaultBreakdownResultOutputTypes.toSet))) lazy val shrineQueryEntryPointComponent = Hubs.Shrine(thisTest, port = 9996, broadcasterClient = Some(PosterBroadcasterClient(posterFor(HubComponent), DefaultBreakdownResultOutputTypes.toSet))) object HubComponent extends JerseyTestComponent[BroadcasterMultiplexerRequestHandler] { override val basePath = "broadcaster/broadcast" override val port = 9997 override def resourceClass(handler: BroadcasterMultiplexerRequestHandler) = BroadcasterMultiplexerResource(handler) lazy val broadcaster: InspectableDelegatingBroadcaster = { val destinations: Set[NodeHandle] = spokes.map { spoke => val client = RemoteAdapterClient(NodeId.Unknown,posterFor(spoke), DefaultBreakdownResultOutputTypes.toSet) NodeHandle(spoke.nodeId, client) } InspectableDelegatingBroadcaster(AdapterClientBroadcaster(destinations, MockHubDao)) } override lazy val makeHandler: BroadcasterMultiplexerRequestHandler = { BroadcasterMultiplexerService(broadcaster, 1.hour) } } @Before override def setUp() { super.setUp() HubComponent.JerseyTest.setUp() shrineQueryEntryPointComponent.JerseyTest.setUp() i2b2QueryEntryPointComponent.JerseyTest.setUp() } @After override def tearDown() { i2b2QueryEntryPointComponent.JerseyTest.tearDown() shrineQueryEntryPointComponent.JerseyTest.tearDown() HubComponent.JerseyTest.tearDown() super.tearDown() } - */ } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala index 7e1bbec6d..fdf59e56d 100644 --- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala +++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala @@ -1,253 +1,250 @@ package net.shrine.qep import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator} import net.shrine.authentication.AuthenticationResult.Authenticated import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.log.Loggable import net.shrine.protocol.{QueryResult, AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, QueryInstance, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.audit.QepAuditDb import net.shrine.qep.dao.AuditDao import net.shrine.qep.queries.QepQueryDb import net.shrine.util.XmlDateHelper import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration /** * @author clint * @since Feb 19, 2014 */ trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") authenticateAndThen(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { authenticateAndThen(request) { authResult => QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast,authResult) } } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { authenticateAndThen(request) { authResult => QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast,authResult) } } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") //store the query in the qep's database doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast,authResult) } } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doReadQueryDefinition($request,$shouldBroadcast)") doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast,authResult) } } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doReadInstanceResults($request,$shouldBroadcast)") val networkId = request.shrineNetworkQueryId //read from the QEP database code here. Only broadcast if some result is in some sketchy state val resultsFromDb: Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId) //If any query result was pending val response = if (resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) { debug(s"Using qep cached results for query $networkId") AggregatedReadInstanceResultsResponse(networkId, resultsFromDb).asInstanceOf[BaseResp] } else { debug(s"Requesting results for $networkId from network") val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast,authResult) //put the new results in the database if we got what we wanted response match { case arirr: AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId, r)) case _ => //do nothing } response } response } } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doReadQueryInstances($request,$shouldBroadcast)") val now = XmlDateHelper.now val networkQueryId = request.networkQueryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = { authenticateAndThen(request){ authResult => info(s"doReadPreviousQueries($request,$shouldBroadcast)") //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine) //pull queries from the local database. QepQueryDb.db.selectPreviousQueries(request) } } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doRenameQuery($request,$shouldBroadcast)") QepQueryDb.db.renamePreviousQuery(request) doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast,authResult) } } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => info(s"doDeleteQuery($request,$shouldBroadcast)") QepQueryDb.db.markDeleted(request) doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast,authResult) } } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ => info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)") //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean, authResult:Authenticated): BaseResp = { debug(s"doBroadcastQuery($request) authResult is $authResult") //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another //When making BroadcastMessages val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) //NB: Only audit RunQueryRequests request match { case runQueryRequest: RunQueryRequest => // inject modified, authorized runQueryRequest //although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code auditAuthorizeAndThen(runQueryRequest) { authorizedRequest => debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") // tuck the ACT audit metrics data into a database here if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) QepQueryDb.db.insertQepQuery(authorizedRequest) val response: BaseResp = doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast) response match { //todo do in one transaction case aggregated:AggregatedRunQueryResponse => aggregated.results.foreach(QepQueryDb.db.insertQueryResult(runQueryRequest.networkQueryId,_)) case _ => debug(s"Unanticipated response type $response") } response } case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) } } private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = { info(s"doSynchronousQuery($request) started") val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] info(s"doSynchronousQuery($request) completed with response $response") response } private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = { auditTransactionally(request) { debug(s"auditAuthorizeAndThen($request) with $authorizationService") val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2)) } body(authorizedRequest) } } private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } + //todo move auth code with SHRINE-1322 import AuthenticationResult._ private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val AuthenticationInfo(domain, username, _) = request.authn val authResult = authenticator.authenticate(request.authn) - //todo remove - throw new NotAuthenticatedException(domain,username,"Broke the code to see what happens",null) - authResult match { case a: Authenticated => f(a) - //todo this exception is never caught. Fix that with SHRINE-1322 case na:NotAuthenticated => throw NotAuthenticatedException(na) } } } \ No newline at end of file diff --git a/qep/service/src/test/scala/net/shrine/qep/I2B2QepServiceTest.scala b/qep/service/src/test/scala/net/shrine/qep/I2B2QepServiceTest.scala index 85ac395c4..ed3912acd 100644 --- a/qep/service/src/test/scala/net/shrine/qep/I2B2QepServiceTest.scala +++ b/qep/service/src/test/scala/net/shrine/qep/I2B2QepServiceTest.scala @@ -1,58 +1,57 @@ package net.shrine.qep import net.shrine.util.ShouldMatchersForJUnit import org.junit.Test import net.shrine.authentication.Authenticator import net.shrine.protocol.AuthenticationInfo import net.shrine.authentication.AuthenticationResult import net.shrine.protocol.ReadResultOutputTypesRequest import net.shrine.protocol.Credential import net.shrine.protocol.ReadResultOutputTypesResponse import net.shrine.protocol.ResultOutputType import net.shrine.protocol.DefaultBreakdownResultOutputTypes /** * @author clint * @since Oct 23, 2014 */ final class I2B2QepServiceTest extends ShouldMatchersForJUnit { private val knownUsername = "some-user" private val unknownUsername = "some-unknown-user" import scala.concurrent.duration._ -/* + @Test def testReadResultOutputTypes(): Unit = { val authenticator: Authenticator = new Authenticator { override def authenticate(authn: AuthenticationInfo): AuthenticationResult = { if (authn.username == knownUsername) { AuthenticationResult.Authenticated(authn.domain, authn.username) } else { AuthenticationResult.NotAuthenticated(authn.domain, authn.username, "blarg") } } } val breakdownResultOutputTypes = DefaultBreakdownResultOutputTypes.toSet val service = I2b2QepService("example.com",null, authenticator, null, includeAggregateResult = true, null, 1.day, breakdownResultOutputTypes,false) { val req = ReadResultOutputTypesRequest("project-id", 1.minute, AuthenticationInfo("d", knownUsername, Credential("foo", isToken = false))) val resp = service.readResultOutputTypes(req) resp.asInstanceOf[ReadResultOutputTypesResponse].outputTypes should equal(ResultOutputType.nonErrorTypes ++ breakdownResultOutputTypes) } { val req = ReadResultOutputTypesRequest("project-id", 1.minute, AuthenticationInfo("d", unknownUsername, Credential("foo", isToken = false))) intercept[Exception] { service.readResultOutputTypes(req) } } } - */ } \ No newline at end of file diff --git a/qep/service/src/test/scala/net/shrine/qep/QepServiceTest.scala b/qep/service/src/test/scala/net/shrine/qep/QepServiceTest.scala index b6ca05394..7f11748e8 100644 --- a/qep/service/src/test/scala/net/shrine/qep/QepServiceTest.scala +++ b/qep/service/src/test/scala/net/shrine/qep/QepServiceTest.scala @@ -1,209 +1,206 @@ package net.shrine.qep import org.junit.Test import org.scalatest.mock.EasyMockSugar import net.shrine.authorization.QueryAuthorizationService import net.shrine.qep.dao.AbstractAuditDaoTest import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.protocol.ReadApprovedQueryTopicsRequest import net.shrine.protocol.ReadApprovedQueryTopicsResponse import net.shrine.protocol.ReadQueryInstancesRequest import net.shrine.protocol.ReadQueryInstancesResponse import net.shrine.protocol.RunQueryRequest import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term import net.shrine.authorization.AuthorizationResult import net.shrine.authentication.Authenticator import net.shrine.authentication.AuthenticationResult import net.shrine.authentication.NotAuthenticatedException import net.shrine.protocol.ErrorResponse /** * @author Bill Simons * @author Clint Gilbert * @since 3/30/11 * @see http://cbmi.med.harvard.edu * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @see http://www.gnu.org/licenses/lgpl.html */ final class QepServiceTest extends AbstractAuditDaoTest with EasyMockSugar { import scala.concurrent.duration._ -/* @Test def testReadQueryInstances() { val projectId = "foo" val queryId = 123L val authn = AuthenticationInfo("some-domain", "some-username", Credential("blarg", isToken = false)) val req = ReadQueryInstancesRequest(projectId, 1.millisecond, authn, queryId) val service = QepService("example.com",null, AllowsAllAuthenticator, null, includeAggregateResult = true, null, null, Set.empty,collectQepAudit = false) val response = service.readQueryInstances(req).asInstanceOf[ReadQueryInstancesResponse] response should not be (null) response.groupId should equal(projectId) response.queryMasterId should equal(queryId) response.userId should equal(authn.username) val Seq(instance) = response.queryInstances instance.startDate should not be (null) instance.endDate should not be (null) instance.startDate should equal(instance.endDate) instance.groupId should equal(projectId) instance.queryInstanceId should equal(queryId.toString) instance.queryMasterId should equal(queryId.toString) instance.userId should equal(authn.username) } - */ private val authn = AuthenticationInfo("some-domain", "some-user", Credential("some-password", isToken = false)) private val projectId = "projectId" private val queryDef = QueryDefinition("yo", Term("foo")) private val request = RunQueryRequest(projectId, 1.millisecond, authn, Some("topicId"), Some("Topic Name"), Set.empty, queryDef) @Test def testRunQueryAggregatorFor() { def doTestRunQueryAggregatorFor(addAggregatedResult: Boolean) { val service = QepService("example.com",null, null, null, addAggregatedResult, null, null, Set.empty,collectQepAudit = false) val aggregator = service.runQueryAggregatorFor(request) aggregator should not be (null) aggregator.queryId should be(-1L) aggregator.groupId should be(projectId) aggregator.userId should be(authn.username) aggregator.requestQueryDefinition should be(queryDef) aggregator.addAggregatedResult should be(addAggregatedResult) } doTestRunQueryAggregatorFor(true) doTestRunQueryAggregatorFor(false) } @Test def testAuditTransactionally() = afterMakingTables { def doTestAuditTransactionally(shouldThrow: Boolean) { val service = QepService("example.com",auditDao, null, null, includeAggregateResult = true, null, null, Set.empty,collectQepAudit = false) if (shouldThrow) { intercept[Exception] { service.auditTransactionally(request)(throw new Exception) } } else { val x = 1 val actual = service.auditTransactionally(request)(x) actual should be(x) } //We should have recorded an audit entry no matter what val Seq(entry) = auditDao.findRecentEntries(1) entry.domain should be(authn.domain) entry.username should be(authn.username) entry.project should be(projectId) entry.queryText should be(Some(queryDef.toI2b2String)) entry.queryTopic should be(request.topicId) entry.time should not be (null) } doTestAuditTransactionally(false) doTestAuditTransactionally(true) } import QepServiceTest._ - /* @Test def testAfterAuthenticating() { def doTestAfterAuthenticating(shouldAuthenticate: Boolean) { val service = QepService("example.com",auditDao, new MockAuthenticator(shouldAuthenticate), new MockAuthService(true), includeAggregateResult = true, null, null, Set.empty,collectQepAudit = false) if (shouldAuthenticate) { var foo = false service.authenticateAndThen(request) { _ => foo = true } foo should be(right = true) } else { intercept[NotAuthenticatedException] { service.authenticateAndThen(request) { _ => () } } } } doTestAfterAuthenticating(true) doTestAfterAuthenticating(false) } -*/ + @Test def testAfterAuditingAndAuthorizing() = afterMakingTables { def doAfterAuditingAndAuthorizing(shouldBeAuthorized: Boolean, shouldThrow: Boolean) { val service = QepService("example.com",auditDao, AllowsAllAuthenticator, new MockAuthService(shouldBeAuthorized), includeAggregateResult = true, null, null, Set.empty,collectQepAudit = false) if (shouldThrow || !shouldBeAuthorized) { intercept[Exception] { service.auditAuthorizeAndThen(request)(request => throw new Exception) } } else { val x = 1 val actual = service.auditAuthorizeAndThen(request)(request => x) actual should be(x) } //We should have recorded an audit entry no matter what val Seq(entry) = auditDao.findRecentEntries(1) entry.domain should be(authn.domain) entry.username should be(authn.username) entry.project should be(projectId) entry.queryText should be(Some(queryDef.toI2b2String)) entry.queryTopic should be(request.topicId) entry.time should not be (null) } doAfterAuditingAndAuthorizing(shouldBeAuthorized = true, shouldThrow = true) doAfterAuditingAndAuthorizing(shouldBeAuthorized = true, shouldThrow = false) doAfterAuditingAndAuthorizing(shouldBeAuthorized = false, shouldThrow = true) doAfterAuditingAndAuthorizing(shouldBeAuthorized = false, shouldThrow = false) } } object QepServiceTest { final class MockAuthenticator(shouldAuthenticate: Boolean) extends Authenticator { override def authenticate(authn: AuthenticationInfo): AuthenticationResult = { if (shouldAuthenticate) { AuthenticationResult.Authenticated(authn.domain, authn.username) } else { AuthenticationResult.NotAuthenticated(authn.domain, authn.username, "blarg") } } } final class MockAuthService(shouldWork: Boolean) extends QueryAuthorizationService { def authorizeRunQueryRequest(request: RunQueryRequest): AuthorizationResult = { if (shouldWork) { val topicIdAndName = (request.topicId,request.topicName) match { case (Some(id),Some(name)) => Some((id,name)) case (None,None) => None } AuthorizationResult.Authorized(topicIdAndName)} else { AuthorizationResult.NotAuthorized("blarg") } } def readApprovedEntries(request: ReadApprovedQueryTopicsRequest): Either[ErrorResponse, ReadApprovedQueryTopicsResponse] = ??? } } \ No newline at end of file