diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala index 57fbcad79..34a0cafe4 100644 --- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala +++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala @@ -1,358 +1,350 @@ package net.shrine.metadata import java.util.UUID import akka.actor.ActorSystem import net.shrine.audit.{NetworkQueryId, QueryName, Time} import net.shrine.authorization.steward.UserName import net.shrine.i2b2.protocol.pm.User import net.shrine.log.Loggable import net.shrine.problem.ProblemDigest import net.shrine.protocol.ResultOutputType -import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryFlag} +import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryDbChangeNotifier, QepQueryFlag} import rapture.json._ import rapture.json.formatters.humanReadable import rapture.json.jsonBackends.jawn._ import spray.http.{StatusCode, StatusCodes} import spray.routing._ -import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Promise import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Try /** * An API to support the web client's work with queries. * * The current API supplies information about previous running queries. Eventually this will support accessing * information about queries running now and the ability to submit queries. */ //todo move this to the qep/service module trait QepService extends HttpService with Loggable { def system: ActorSystem val qepInfo = """ |The SHRINE query entry point service. | |This API gives a researcher access to queries, and (eventually) the ability to run queries. | """.stripMargin def qepRoute(user: User): Route = pathPrefix("qep") { get { detach(){ queryResult(user) ~ queryResultsTable(user) } } ~ pathEndOrSingleSlash{complete(qepInfo)} ~ respondWithStatus(StatusCodes.NotFound){complete(qepInfo)} } - - //todo extract this bit to a different place - where the QEP messages come back - val unit:Unit = () - //todo for when you handle general json data, expand NetworkQueryId into a thing to be evaulated as part of the scan, and pass in the filter function - val longPollRequestsToComplete:ConcurrentMap[UUID,(NetworkQueryId,Promise[Unit])] = TrieMap.empty - - def triggerDataChangeFor(id:NetworkQueryId) = longPollRequestsToComplete.values.filter(_._1 == id).map(_._2.trySuccess(unit)) - /* Races to complete are OK in spray. They're already happening, in fact. When a request comes in if the request can be fulfilled immediately then do that if not create a promise to fulfil to trigger the complete create a promise to bump that first one on timeout schedule a runnable to bump the timeout promise create a promise to bump that first one if the conditions are right create a promise to bump the conditional one and stuff it in a concurrent map for other parts of the system to find onSuccess remove the conditional promise and cancel the scheduled timeout. */ def queryResult(user:User):Route = path("queryResult" / LongNumber) { queryId: NetworkQueryId => //take optional parameters for version and an awaitTime, but insist on both //If the timeout parameter isn't supplied then the deadline is now so it will reply immediately parameters('afterVersion.as[Long] ? 0L, 'timeoutSeconds.as[Long] ? 0L) { (afterVersion: Long, timeout: Long) => //todo check that the timeout is less than the spray "give up" timeout val requestStartTime = System.currentTimeMillis() val deadline = requestStartTime + (timeout * 1000) detach(){ val troubleOrResultsRow = selectResultsRow(queryId, user) if (shouldRespondNow(deadline, afterVersion, troubleOrResultsRow)) { //bypass all the concurrent/interrupt business. Just reply. completeWithQueryResult(troubleOrResultsRow) } else { // promise used to respond val okToRespond = Promise[Either[(StatusCode,String),ResultsRow]]() //Schedule the timeout val okToRespondTimeout = Promise[Unit]() okToRespondTimeout.future.transform({unit => okToRespond.tryComplete(Try(selectResultsRow(queryId, user))) },{x => x})//todo some logging val timeLeft = (deadline - System.currentTimeMillis()) milliseconds case class TriggerRunnable(networkQueryId: NetworkQueryId,promise: Promise[Unit]) extends Runnable { + val unit:Unit = () override def run(): Unit = promise.trySuccess(unit) } val timeoutCanceller = system.scheduler.scheduleOnce(timeLeft,TriggerRunnable(queryId,okToRespondTimeout)) //Set up for an interrupt from new data val okToRespondIfNewData = Promise[Unit]() okToRespondIfNewData.future.transform({unit => val latestResultsRow = selectResultsRow(queryId, user) if(shouldRespondNow(deadline,afterVersion,latestResultsRow)) { okToRespond.tryComplete(Try(selectResultsRow(queryId, user))) } },{x => x})//todo some logging val requestId = UUID.randomUUID() //put id -> okToRespondIfNewData in a map so that outside processes can trigger it - longPollRequestsToComplete.put(requestId,(queryId,okToRespondIfNewData)) + QepQueryDbChangeNotifier.putLongPollRequest(requestId,queryId,okToRespondIfNewData) onSuccess(okToRespond.future){ latestResultsRow:Either[(StatusCode,String),ResultsRow] => //clean up concurrent bits before responding - longPollRequestsToComplete.remove(requestId) + QepQueryDbChangeNotifier.removeLongPollRequest(requestId) timeoutCanceller.cancel() completeWithQueryResult(latestResultsRow) } } } } } /** * @param deadline time when a response must go * @param afterVersion last timestamp the requester knows about * @param resultsRow either the result row or something is not right * @return true to respond now, false to dither */ def shouldRespondNow(deadline: Long, afterVersion: Long, resultsRow:Either[(StatusCode,String),ResultsRow] ):Boolean = { val currentTime = System.currentTimeMillis() if (currentTime >= deadline) true else resultsRow.fold( {_._1 != StatusCodes.NotFound}, {_.version > afterVersion} ) } def completeWithQueryResult(troubleOrResultsRow:Either[(StatusCode,String),ResultsRow]): Route = { troubleOrResultsRow.fold({ trouble => //something is wrong. Respond now. respondWithStatus(trouble._1) { complete(trouble._2) } }, { queryAndResults => //everything is fine. Respond now. val json: Json = Json(queryAndResults) val formattedJson: String = Json.format(json)(humanReadable()) complete(formattedJson) }) } def selectResultsRow(queryId:NetworkQueryId,user:User):Either[(StatusCode,String),ResultsRow] = { //query once and determine if the latest change > afterVersion val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId) queryOption.map{query: QepQuery => if (user.sameUserAs(query.userName, query.userDomain)) { val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_)) val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_)) val queryCell = QueryCell(query, flag) val queryAndResults = ResultsRow(queryCell, mostRecentQueryResults) Right(queryAndResults) } else Left((StatusCodes.Forbidden,s"Query $queryId belongs to a different user")) }.getOrElse(Left[(StatusCode,String),ResultsRow]((StatusCodes.NotFound,s"No query with id $queryId found"))) } def queryResultsTable(user: User): Route = path("queryResultsTable") { matchQueryParameters(Some(user.username)){ queryParameters:QueryParameters => val queryRowCount: Int = QepQueryDb.db.countPreviousQueriesByUserAndDomain( userName = user.username, domain = user.domain ) val queries: Seq[QepQuery] = QepQueryDb.db.selectPreviousQueriesByUserAndDomain( userName = user.username, domain = user.domain, skip = queryParameters.skipOption, limit = queryParameters.limitOption ) //todo revisit json structure to remove things the front-end doesn't use val adapters: Seq[String] = QepQueryDb.db.selectDistinctAdaptersWithResults val flags: Map[NetworkQueryId, QueryFlag] = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(queries.map(q => q.networkId).to[Set]) .map(q => q._1 -> QueryFlag(q._2)) val queryResults: Seq[ResultsRow] = queries.map(q => ResultsRow( query = QueryCell(q,flags.get(q.networkId)), results = QepQueryDb.db.selectMostRecentFullQueryResultsFor(q.networkId).map(Result(_)))) val table: ResultsTable = ResultsTable(queryRowCount,queryParameters.skipOption.getOrElse(0),adapters,queryResults) val jsonTable: Json = Json(table) val formattedTable: String = Json.format(jsonTable)(humanReadable()) complete(formattedTable) } } def matchQueryParameters(userName: Option[UserName])(parameterRoute: QueryParameters => Route): Route = { parameters('skip.as[Int].?, 'limit.as[Int].?) { (skipOption, limitOption) => val qp = QueryParameters( userName, skipOption, limitOption ) parameterRoute(qp) } } } //todo maybe move to QepQueryDb class case class QueryParameters( researcherIdOption:Option[UserName] = None, skipOption:Option[Int] = None, limitOption:Option[Int] = None ) case class ResultsTable( rowCount:Int, rowOffset:Int, adapters:Seq[String], //todo type for adapter name queryResults:Seq[ResultsRow] ) case class ResultsRow( query:QueryCell, results: Seq[Result], isComplete: Boolean, //a member variable to appear in json version:Long //a time stamp in 1.23, a counting integer in a future release ) object ResultsRow { def apply( query: QueryCell, results: Seq[Result] ): ResultsRow = { val isComplete = if (results.isEmpty) false else results.forall(_.isComplete) val version = (Seq(query.changeDate) ++ results.map(_.changeDate)).max //the latest change date ResultsRow(query, results, isComplete, version) } } case class QueryCell( networkId:String, //easier to support in json, lessens the impact of using a GUID iff we can get there queryName: QueryName, dateCreated: Time, queryXml: String, changeDate: Time, flag:Option[QueryFlag] ) object QueryCell { def apply(qepQuery: QepQuery,flag: Option[QueryFlag]): QueryCell = QueryCell( networkId = qepQuery.networkId.toString, queryName = qepQuery.queryName, dateCreated = qepQuery.dateCreated, queryXml = qepQuery.queryXml, changeDate = qepQuery.changeDate, flag ) } case class QueryFlag( flagged:Boolean, flagMessage:String, changeDate:Long ) object QueryFlag{ def apply(qepQueryFlag: QepQueryFlag): QueryFlag = QueryFlag(qepQueryFlag.flagged, qepQueryFlag.flagMessage, qepQueryFlag.changeDate) } case class Result ( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:Option[ResultOutputType], count:Long, status:String, //todo QueryResult.StatusType, statusMessage:Option[String], changeDate:Long, breakdowns: Seq[BreakdownResultsForType], problemDigest:Option[ProblemDigestForJson] ) { def isComplete = true //todo until I get to SHRINE-2148 } object Result { def apply(fullQueryResult: FullQueryResult): Result = new Result( resultId = fullQueryResult.resultId, networkQueryId = fullQueryResult.networkQueryId, instanceId = fullQueryResult.instanceId, adapterNode = fullQueryResult.adapterNode, resultType = fullQueryResult.resultType, count = fullQueryResult.count, status = fullQueryResult.status.toString, statusMessage = fullQueryResult.statusMessage, changeDate = fullQueryResult.changeDate, breakdowns = fullQueryResult.breakdownTypeToResults.map(tToR => BreakdownResultsForType(fullQueryResult.adapterNode,tToR._1,tToR._2)).to[Seq], problemDigest = fullQueryResult.problemDigest.map(ProblemDigestForJson(_)) ) } //todo replace when you figure out how to json-ize xml in rapture case class ProblemDigestForJson(codec: String, stampText: String, summary: String, description: String, detailsString: String, epoch: Long) object ProblemDigestForJson { def apply(problemDigest: ProblemDigest): ProblemDigestForJson = ProblemDigestForJson( problemDigest.codec, problemDigest.stampText, problemDigest.summary, problemDigest.description, problemDigest.detailsXml.text, problemDigest.epoch) } case class BreakdownResultsForType(resultType:ResultOutputType,results:Seq[BreakdownResult]) object BreakdownResultsForType { def apply(adapterName: String, breakdownType: ResultOutputType, breakdowns: Seq[QepQueryBreakdownResultsRow]): BreakdownResultsForType = { val breakdownResults = breakdowns.filter(_.adapterNode == adapterName).map(row => BreakdownResult(row.dataKey,row.value,row.changeDate)) BreakdownResultsForType(breakdownType,breakdownResults) } } case class BreakdownResult(dataKey:String,value:Long,changeDate:Long) diff --git a/apps/meta-app/src/test/scala/net/shrine/metadata/QepServiceTest.scala b/apps/meta-app/src/test/scala/net/shrine/metadata/QepServiceTest.scala index da6b588c2..6236fb62d 100644 --- a/apps/meta-app/src/test/scala/net/shrine/metadata/QepServiceTest.scala +++ b/apps/meta-app/src/test/scala/net/shrine/metadata/QepServiceTest.scala @@ -1,317 +1,315 @@ package net.shrine.metadata import akka.actor.ActorRefFactory import net.shrine.i2b2.protocol.pm.User import net.shrine.protocol.{Credential, QueryResult, ResultOutputType} -import net.shrine.qep.querydb.{QepQuery, QepQueryDb, QueryResultRow} +import net.shrine.qep.querydb.{QepQuery, QepQueryDb, QepQueryDbChangeNotifier, QueryResultRow} import org.json4s.DefaultFormats import org.scalatest.{BeforeAndAfterEach, Suite} import scala.language.postfixOps //import org.json4s.native.JsonMethods.parse import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import spray.testkit.ScalatestRouteTest /** * @author david * @since 3/30/17 */ @RunWith(classOf[JUnitRunner]) class QepServiceTest extends FlatSpec with ScalatestRouteTest with QepService with TestWithDatabase { override def actorRefFactory: ActorRefFactory = system import scala.concurrent.duration._ implicit val routeTestTimeout = RouteTestTimeout(30.seconds) import spray.http.StatusCodes._ "QepService" should "return an OK and qepInfo for a dead-end route" in { Get(s"/qep") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) assertResult(qepInfo)(result) } } "QepService" should "return a NotFound and qepInfo for a non-existant route" in { Get(s"/qep/flarg") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(NotFound)(status) assertResult(qepInfo)(result) } } val queryReceiveTime = System.currentTimeMillis() val qepQuery = QepQuery( networkId = 1L, userName = "ben", userDomain = "testDomain", queryName = "testQuery", expression = Some("testExpression"), dateCreated = queryReceiveTime, deleted = false, queryXml = "testXML", changeDate = queryReceiveTime ) val qepResultRowFromMgh = QueryResultRow( resultId = 10L, networkQueryId = 1L, instanceId = 100L, adapterNode = "MGH", resultType = Some(ResultOutputType.PATIENT_COUNT_XML), size = 30L, startDate = Some(queryReceiveTime + 10), endDate = Some(queryReceiveTime + 20), status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = queryReceiveTime + 20 ) val qepResultRowFromPartners = QueryResultRow( resultId = 10L, networkQueryId = 1L, instanceId = 100L, adapterNode = "Partners", resultType = Some(ResultOutputType.PATIENT_COUNT_XML), size = 300L, startDate = Some(queryReceiveTime + 10), endDate = Some(queryReceiveTime + 20), status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = queryReceiveTime + 30 ) val qepResultRowFromBch = QueryResultRow( resultId = 10L, networkQueryId = 1L, instanceId = 100L, adapterNode = "BCH", resultType = Some(ResultOutputType.PATIENT_COUNT_XML), size = 3000L, startDate = Some(queryReceiveTime + 10), endDate = Some(queryReceiveTime + 20), status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = queryReceiveTime + 40 ) val qepResultRowFromDfci = QueryResultRow( resultId = 10L, networkQueryId = 1L, instanceId = 100L, adapterNode = "DFCI", resultType = Some(ResultOutputType.PATIENT_COUNT_XML), size = 30000L, startDate = Some(queryReceiveTime + 10), endDate = Some(queryReceiveTime + 20), status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = queryReceiveTime + 50 ) "QepService" should "return an OK and a row of data for a queryResult request" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) Get(s"/qep/queryResult/${qepQuery.networkId}") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) //todo check json result after format is pinned down assertResult(qepInfo)(result) } } "QepService" should "return an OK and a row of data for a queryResult request with the version and timeoutSeconds parameters" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) Get(s"/qep/queryResult/${qepQuery.networkId}?timeoutSeconds=10&afterVersion=${queryReceiveTime - 60}") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) - println(result) - //todo check json result after format is pinned down assertResult(qepInfo)(result) } } "QepService" should "return a NotFound with a bad query id for a queryResult request" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) Get(s"/qep/queryResult/${20L}") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(NotFound)(status) } } "QepService" should "return a Forbidden if the user is the wrong user for the query for a queryResult request" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) Get(s"/qep/queryResult/${qepQuery.networkId}") ~> qepRoute(wrongUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(Forbidden)(status) } } "QepService" should "return an OK and a row of data for a queryResult request with the version and timeoutSeconds parameters if the version hasn't changed, but not until after timeout" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) val start = System.currentTimeMillis() val timeout = 5 Get(s"/qep/queryResult/${qepQuery.networkId}?timeoutSeconds=$timeout&afterVersion=${queryReceiveTime+50}") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) val end = System.currentTimeMillis() assert(end - start >= timeout * 1000,s"The call took ${end - start} but should have taken at least ${timeout*1000}") //todo check json result after format is pinned down assertResult(qepInfo)(result) } } "QepService" should "return an OK and a row of data for a queryResult request with the version and timeoutSeconds parameters before the timeoutSeconds if the version changes while waiting" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) // QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) val start = System.currentTimeMillis() val delay = 2000 object Inserter extends Runnable { override def run(): Unit = { QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) - triggerDataChangeFor(qepResultRowFromBch.networkQueryId) + QepQueryDbChangeNotifier.triggerDataChangeFor(qepResultRowFromBch.networkQueryId) } } system.scheduler.scheduleOnce(delay milliseconds,Inserter) val timeout = 5 Get(s"/qep/queryResult/${qepQuery.networkId}?timeoutSeconds=$timeout&afterVersion=${queryReceiveTime+35}") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) val end = System.currentTimeMillis() assert(end - start >= delay,s"The call took ${end - start} but should have taken at least $delay") assert(end - start < timeout * 1000,s"The call took ${end - start} but should have taken less than $timeout") //todo check json result after format is pinned down assertResult(qepInfo)(result) } } "QepService" should "return an OK and a table of data for a queryResultsTable request" in { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepResultRow(qepResultRowFromBch) QepQueryDb.db.insertQepResultRow(qepResultRowFromMgh) QepQueryDb.db.insertQepResultRow(qepResultRowFromDfci) QepQueryDb.db.insertQepResultRow(qepResultRowFromPartners) Get(s"/qep/queryResultsTable") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) /* println("************") println(result) println("************") */ //todo check json result assertResult(qepInfo)(result) } } /* todo "QepService" should "return an OK and a table of data for a queryResultsTable request with different skip and limit values" in { Get(s"/qep/queryResultsTable?skip=2&limit=2") ~> qepRoute(researcherUser) ~> check { implicit val formats = DefaultFormats val result = body.data.asString assertResult(OK)(status) //todo check json result assertResult(qepInfo)(result) } } */ val researcherUserName = "ben" val researcherFullName = researcherUserName val researcherUser = User( fullName = researcherUserName, username = researcherFullName, domain = "testDomain", credential = new Credential("researcher's password",false), params = Map(), rolesByProject = Map() ) val wrongUser = User( fullName = "Wrong User", username = "wrong", domain = "testDomain", credential = new Credential("researcher's password",false), params = Map(), rolesByProject = Map() ) } trait TestWithDatabase extends BeforeAndAfterEach { this: Suite => override def beforeEach() = { QepQueryDb.db.createTables() } override def afterEach() = { QepQueryDb.db.dropTables() } } diff --git a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDbChangeNotifier.scala b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDbChangeNotifier.scala new file mode 100644 index 000000000..c4bfda7b7 --- /dev/null +++ b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDbChangeNotifier.scala @@ -0,0 +1,31 @@ +package net.shrine.qep.querydb + +import java.util.UUID + +import net.shrine.audit.NetworkQueryId + +import scala.concurrent.Promise + +import scala.collection.concurrent.{TrieMap, Map => ConcurrentMap} + +/** + * + * + * @author david + * @since 8/12/17 + */ +//todo when we do the json data work, this class can grow up to be part of that subproject and leave here +object QepQueryDbChangeNotifier { + + val unit:Unit = () + //todo for when you handle general json data, expand NetworkQueryId into a p: A => Boolean for filter, to be evaulated as part of the scan, and pass in the changed object. Maybe even replace unit with the changed object + val longPollRequestsToComplete:ConcurrentMap[UUID,(NetworkQueryId,Promise[Unit])] = TrieMap.empty + + /* scan all the pending Promises to see if any can be fulfilled */ + def triggerDataChangeFor(queryId:NetworkQueryId) = longPollRequestsToComplete.values.filter(_._1 == queryId).map(_._2.trySuccess(unit)) + + def putLongPollRequest(requstId:UUID,queryId:NetworkQueryId,promise: Promise[Unit]) = longPollRequestsToComplete.put(requstId,(queryId,promise)) + + def removeLongPollRequest(requestId:UUID) = longPollRequestsToComplete.remove(requestId) + +}