diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala index 6acb1d44c..f029f838b 100644 --- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala +++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala @@ -1,27 +1,27 @@ package net.shrine.adapter import net.shrine.adapter.dao.AdapterDao import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.DeleteQueryRequest import net.shrine.protocol.DeleteQueryResponse import net.shrine.protocol.ShrineResponse /** * @author Bill Simons * @date 4/12/11 * @link http://cbmi.med.harvard.edu * @link http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @link http://www.gnu.org/licenses/lgpl.html */ final class DeleteQueryAdapter(dao: AdapterDao) extends Adapter { override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = { val request = message.request.asInstanceOf[DeleteQueryRequest] - dao.deleteQuery(request.queryId) + dao.deleteQuery(request.networkQueryId) - DeleteQueryResponse(request.queryId) + DeleteQueryResponse(request.networkQueryId) } } \ No newline at end of file diff --git a/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala b/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala index 013681ab7..0d2fc4df3 100644 --- a/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala +++ b/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala @@ -1,535 +1,535 @@ package net.shrine.crypto import net.shrine.util.{Base64, ShouldMatchersForJUnit, XmlGcEnrichments} import org.junit.Test import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.AuthenticationInfo import net.shrine.protocol.Credential import net.shrine.protocol.DeleteQueryRequest import net.shrine.protocol.RunQueryRequest import net.shrine.protocol.ResultOutputType import net.shrine.protocol.query.QueryDefinition import net.shrine.protocol.query.Term import net.shrine.protocol.query.Modifiers import net.shrine.protocol.query.Or import net.shrine.protocol.ReadQueryResultRequest import net.shrine.protocol.DefaultBreakdownResultOutputTypes import net.shrine.protocol.query.Constrained import net.shrine.protocol.query.ValueConstraint import net.shrine.protocol.CertId import java.math.BigInteger import java.security.cert.X509Certificate import java.security.cert.CertificateFactory import scala.io.Source import java.io.ByteArrayInputStream /** * @author clint * @since Nov 27, 2013 */ final class DefaultSignerVerifierTest extends ShouldMatchersForJUnit { private val authn = AuthenticationInfo("some-domain", "some-username", Credential("sadkljlajdl", isToken = false)) private val certCollection = TestKeystore.certCollection private val signerVerifier = new DefaultSignerVerifier(certCollection) import SigningCertStrategy._ import scala.concurrent.duration._ @Test def testIssuersMatchBetweenCertsWithIPsInDistinguishedNames(): Unit = { def readCert(fileName: String): X509Certificate = { val factory = CertificateFactory.getInstance("X.509") val source = Source.fromInputStream(getClass.getClassLoader.getResourceAsStream(fileName)) val encodedCertData = try { source.mkString } finally { source.close() } val byteStream = new ByteArrayInputStream(Base64.fromBase64(encodedCertData)) try { factory.generateCertificate(byteStream).asInstanceOf[X509Certificate] } finally { byteStream.close() } } val ca = readCert("test-caroot.pem") val alpha = readCert("test-alpha-signed.pem") val beta = readCert("test-beta-signed.pem") val gamma = readCert("test-gamma-signed.pem") def shouldMatch[F](field: X509Certificate => F)(a: X509Certificate, b: X509Certificate) { field(a) should equal(field(b)) //Use options to handle null fields Option(field(a)).map(_.hashCode) should equal(Option(field(b)).map(_.hashCode)) } shouldMatch(_.getIssuerDN)(ca, alpha) shouldMatch(_.getIssuerDN)(ca, beta) shouldMatch(_.getIssuerDN)(ca, gamma) shouldMatch(_.getIssuerX500Principal)(ca, alpha) shouldMatch(_.getIssuerX500Principal)(ca, beta) shouldMatch(_.getIssuerX500Principal)(ca, gamma) shouldMatch(_.getIssuerUniqueID)(ca, alpha) shouldMatch(_.getIssuerUniqueID)(ca, beta) shouldMatch(_.getIssuerUniqueID)(ca, gamma) ca.getSerialNumber should not equal(alpha.getSerialNumber) ca.getSerialNumber should not equal(beta.getSerialNumber) ca.getSerialNumber should not equal(gamma.getSerialNumber) } @Test def testSigningAndVerificationQueryDefWithSubQueries(): Unit = { //A failing case reported by Ben C. val queryDef = QueryDefinition.fromI2b2 { (t) (493.90) Asthma(250.00) Diabet@15:32:58 ANY 0 Event 1 STARTDATE FIRST LESS Event 2 STARTDATE FIRST GREATEREQUAL 365 DAY Event 1 EVENT Event 1 SAMEINSTANCENUM 0 1 100 0 SAMEINSTANCENUM 1 6 (493.90) Asthma, unspecified type, unspecified \\SHRINE\SHRINE\Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.90) Asthma, unspecified type, unspecified\ Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.90) Asthma, unspecified type, unspecified\ ENC LA false 6 (493.91) Asthma, unspecified type, with status asthmaticus \\SHRINE\SHRINE\Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.91) Asthma, unspecified type, with status asthmaticus\ Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.91) Asthma, unspecified type, with status asthmaticus\ ENC LA false 6 (493.92) Asthma, unspecified type, with (acute) exacerbation \\SHRINE\SHRINE\Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.92) Asthma, unspecified type, with (acute) exacerbation\ Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.92) Asthma, unspecified type, with (acute) exacerbation\ ENC LA false Event 2 EVENT Event 2 SAMEINSTANCENUM 0 1 100 0 SAMEINSTANCENUM 1 6 (250.00) Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled \\SHRINE\SHRINE\Diagnoses\Endocrine, nutritional and metabolic diseases, and immunity disorders (240-279.99)\Diseases of other endocrine glands (249-259.99)\Diabetes mellitus (250)\Diabetes mellitus without mention of complication (250.0)\(250.00) Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled\ Diagnoses\Endocrine, nutritional and metabolic diseases, and immunity disorders (240-279.99)\Diseases of other endocrine glands (249-259.99)\Diabetes mellitus (250)\Diabetes mellitus without mention of complication (250.0)\(250.00) Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled\ ENC LA false 6 (530.81) Esophageal reflux \\SHRINE\SHRINE\Diagnoses\Diseases of the digestive system (520-579.99)\Diseases of esophagus, stomach, and duodenum (530-539.99)\Diseases of esophagus (530)\Other specified disorders of esophagus (530.8)\(530.81) Esophageal reflux\ Diagnoses\Diseases of the digestive system (520-579.99)\Diseases of esophagus, stomach, and duodenum (530-539.99)\Diseases of esophagus (530)\Other specified disorders of esophagus (530.8)\(530.81) Esophageal reflux\ ENC LA false }.get def shouldVerify(signingCertStrategy: SigningCertStrategy): Unit = { val resultTypes = DefaultBreakdownResultOutputTypes.toSet + ResultOutputType.PATIENT_COUNT_XML val unsignedMessage = BroadcastMessage(authn, RunQueryRequest("some-project-id", 12345.milliseconds, authn, Some("topic-id"), Some("Topic Name"), resultTypes, queryDef)) val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy) signerVerifier.verifySig(signedMessage, 1.hour) should be(right = true) //NB: Simulate going from one machine to another val roundTripped = xmlRoundTrip(signedMessage) roundTripped.signature should equal(signedMessage.signature) signerVerifier.verifySig(roundTripped, 1.hour) should be(right = true) } //shouldVerify(Attach) shouldVerify(DontAttach) } //See https://open.med.harvard.edu/jira/browse/SHRINE-859 @Test def testSigningAndVerificationQueryNameWithSpaces(): Unit = { def shouldVerify(queryName: String, signingCertStrategy: SigningCertStrategy): Unit = { val queryDef = QueryDefinition(queryName, Term("""\\PCORNET\PCORI\DEMOGRAPHIC\Age\>= 65 years old\65\""")) val resultTypes = DefaultBreakdownResultOutputTypes.toSet + ResultOutputType.PATIENT_COUNT_XML val unsignedMessage = BroadcastMessage(authn, RunQueryRequest("some-project-id", 12345.milliseconds, authn, Some("topic-id"), Some("Topic Name"), resultTypes, queryDef)) val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy) signerVerifier.verifySig(signedMessage, 1.hour) should be(right = true) //NB: Simulate going from one machine to another val roundTripped = xmlRoundTrip(signedMessage) roundTripped.signature should equal(signedMessage.signature) signerVerifier.verifySig(roundTripped, 1.hour) should be(right = true) } shouldVerify("foo", Attach) shouldVerify(" foo", Attach) shouldVerify("foo ", Attach) shouldVerify("fo o", Attach) shouldVerify(" 65 years old@13:23:00", Attach) shouldVerify("foo", DontAttach) shouldVerify(" foo", DontAttach) shouldVerify("foo ", DontAttach) shouldVerify("fo o", DontAttach) shouldVerify(" 65 years old@13:23:00", DontAttach) } @Test def testSigningAndVerification(): Unit = doTestSigningAndVerification(signerVerifier) @Test def testSigningAndVerificationAttachedKnownCertNotSignedByCA(): Unit = { //Messages will be signed with a key that's in our keystore, but is not signed by a CA val descriptor = KeyStoreDescriptor( "shrine.keystore.multiple-private-keys", "chiptesting", Some("private-key-2"), Seq("carra ca"), KeyStoreType.JKS) val mySignerVerifier = new DefaultSignerVerifier(KeyStoreCertCollection.fromClassPathResource(descriptor)) val unsignedMessage = BroadcastMessage(authn, DeleteQueryRequest("some-project-id", 12345.milliseconds, authn, 87356L)) val signedMessage = mySignerVerifier.sign(unsignedMessage, Attach) mySignerVerifier.verifySig(signedMessage, 1.hour) should be(right = false) } @Test def testSigningAndVerificationAttachedUnknownCertNotSignedByCA(): Unit = { //Messages will be signed with a key that's NOT in our keystore, but is not signed by a CA val signerDescriptor = KeyStoreDescriptor( "shrine.keystore.multiple-private-keys", "chiptesting", Some("private-key-2"), //This cert is NOT in TestKeystore.certCollection Seq("carra ca"), KeyStoreType.JKS) val signer: Signer = new DefaultSignerVerifier(KeyStoreCertCollection.fromClassPathResource(signerDescriptor)) val verifier: Verifier = new DefaultSignerVerifier(TestKeystore.certCollection) val unsignedMessage = BroadcastMessage(authn, DeleteQueryRequest("some-project-id", 12345.milliseconds, authn, 87356L)) val signedMessage = signer.sign(unsignedMessage, Attach) verifier.verifySig(signedMessage, 1.hour) should be(right = false) } private def doTestSigningAndVerification(signerVerifier: Signer with Verifier): Unit = { def doTest(signingCertStrategy: SigningCertStrategy) { val unsignedMessage = BroadcastMessage(authn, DeleteQueryRequest("some-project-id", 12345.milliseconds, authn, 87356L)) val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy) (unsignedMessage eq signedMessage) should be(right = false) unsignedMessage should not equal (signedMessage) signedMessage.networkAuthn should equal(unsignedMessage.networkAuthn) signedMessage.request should equal(unsignedMessage.request) signedMessage.requestId should equal(unsignedMessage.requestId) unsignedMessage.signature should be(None) signedMessage.signature.isDefined should be(right = true) val sig = signedMessage.signature.get sig.timestamp should not be (null) sig.signedBy should equal(certCollection.myCertId.get) sig.value should not be (null) //The signed message should verify signerVerifier.verifySig(signedMessage, 1.hour) should be(right = true) def shouldNotVerify(message: BroadcastMessage) { signerVerifier.verifySig(xmlRoundTrip(message), 1.hour) should be(right = false) } //The unsigned one should not shouldNotVerify(unsignedMessage) //Expired sigs shouldn't verify signerVerifier.verifySig(signedMessage, 0.hours) should be(right = false) //modifying anything should prevent verification shouldNotVerify { - val anotherRequest = signedMessage.request.asInstanceOf[DeleteQueryRequest].copy(queryId = 123L) + val anotherRequest = signedMessage.request.asInstanceOf[DeleteQueryRequest].copy(networkQueryId = 123L) signedMessage.withRequest(anotherRequest) } shouldNotVerify { signedMessage.withRequestId(99999L) } shouldNotVerify { signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(domain = "askldjlakjsd")) } shouldNotVerify { signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(username = "askldjlakjsd")) } shouldNotVerify { signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(credential = signedMessage.networkAuthn.credential.copy(isToken = true))) } shouldNotVerify { signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(credential = signedMessage.networkAuthn.credential.copy(value = "oieutorutoirutioerutoireuto"))) } shouldNotVerify { val timestamp = signedMessage.signature.get.timestamp import scala.concurrent.duration._ import XmlGcEnrichments._ val newTimestamp = timestamp + 123.minutes signedMessage.withSignature(signedMessage.signature.get.copy(timestamp = newTimestamp)) } shouldNotVerify { val timestamp = signedMessage.signature.get.timestamp import scala.concurrent.duration._ import XmlGcEnrichments._ val newTimestamp = timestamp + (-99).minutes signedMessage.withSignature(signedMessage.signature.get.copy(timestamp = newTimestamp)) } } doTest(Attach) doTest(DontAttach) } @Test def testSigningAndVerificationModifiedTerm(): Unit = { import scala.concurrent.duration._ def doVerificationTest(signingCertStrategy: SigningCertStrategy, queryDef: QueryDefinition): Unit = { val req = RunQueryRequest("some-project-id", 12345.milliseconds, authn, Some("topic-id"), Some("Topic Name"), Set(ResultOutputType.PATIENT_COUNT_XML), queryDef) val unsignedMessage = BroadcastMessage(authn, req) val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy) (unsignedMessage eq signedMessage) should be(right = false) unsignedMessage should not equal (signedMessage) signedMessage.networkAuthn should equal(unsignedMessage.networkAuthn) signedMessage.request should equal(unsignedMessage.request) signedMessage.requestId should equal(unsignedMessage.requestId) unsignedMessage.signature should be(None) signedMessage.signature.isDefined should be(right = true) val sig = signedMessage.signature.get sig.timestamp should not be (null) sig.signedBy should equal(certCollection.myCertId.get) sig.value should not be (null) //The signed message should verify signerVerifier.verifySig(xmlRoundTrip(signedMessage), 1.hour) should be(right = true) } val t1 = Term("t1") val t2 = Term("t2") val t3 = Term("t3") doVerificationTest(Attach, QueryDefinition("foo", Term(""))) doVerificationTest(Attach, QueryDefinition("foo", Constrained(t1, Some(Modifiers("n", "ap", t2.value)), None))) doVerificationTest(Attach, QueryDefinition("foo", Or(t1, Constrained(t2, Some(Modifiers("n", "ap", t3.value)), None)))) doVerificationTest(Attach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", Some("bar"), "baz", "Nuh")))))) doVerificationTest(Attach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", None, "baz", "Nuh")))))) doVerificationTest(DontAttach, QueryDefinition("foo", Term(""))) doVerificationTest(DontAttach, QueryDefinition("foo", Constrained(t1, Some(Modifiers("n", "ap", t2.value)), None))) doVerificationTest(DontAttach, QueryDefinition("foo", Or(t1, Constrained(t2, Some(Modifiers("n", "ap", t3.value)), None)))) doVerificationTest(DontAttach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", Some("bar"), "baz", "Nuh")))))) doVerificationTest(DontAttach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", None, "baz", "Nuh")))))) } @Test def testSigningAndVerificationReadQueryResultRequest(): Unit = { def doTest(signingCertStrategy: SigningCertStrategy) { val localAuthn = AuthenticationInfo("i2b2demo", "shrine", Credential("SessionKey:PX4LlvLrMhybWRQfoobarbaz", isToken = true)) import scala.concurrent.duration._ val unsignedMessage = BroadcastMessage(authn, ReadQueryResultRequest("SHRINE", 180.seconds, localAuthn, 7923919416951966472L)) val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy) (unsignedMessage eq signedMessage) should be(right = false) unsignedMessage should not equal (signedMessage) signedMessage.networkAuthn should equal(unsignedMessage.networkAuthn) signedMessage.request should equal(unsignedMessage.request) signedMessage.requestId should equal(unsignedMessage.requestId) unsignedMessage.signature should be(None) signedMessage.signature.isDefined should be(right = true) val sig = signedMessage.signature.get sig.timestamp should not be (null) sig.signedBy should equal(certCollection.myCertId.get) sig.value should not be (null) import scala.concurrent.duration._ //The signed message should verify signerVerifier.verifySig(xmlRoundTrip(signedMessage), 1.hour) should be(right = true) } doTest(Attach) doTest(DontAttach) } private def getCertByAlias(alias: String) = certCollection.asInstanceOf[KeyStoreCertCollection].getX509Cert(alias).get @Test def testIsSignedByTrustedCA(): Unit = { import signerVerifier.isSignedByTrustedCA val signedByCa = getCertByAlias("test-cert") val notSignedByCa = getCertByAlias("spin-t1") isSignedByTrustedCA(signedByCa).get should be(right = true) isSignedByTrustedCA(notSignedByCa).isFailure should be(right = true) //TODO: Test case where isSignedByTrustedCA produces Success(false): //where CA cert (param's issuer-DN) IS in our keystore, but X509Certificate.verify(PublicKey) returns false } @Test def testObtainAndValidateSigningCert(): Unit = { import signerVerifier.obtainAndValidateSigningCert import scala.concurrent.duration._ val unsignedMessage = BroadcastMessage(authn, ReadQueryResultRequest("SHRINE", 180.seconds, authn, 7923919416951966472L)) //attached signing cert signed by known CA { val signedMessageWithAttachedSigner = signerVerifier.sign(unsignedMessage, SigningCertStrategy.Attach) val signerCert = obtainAndValidateSigningCert(signedMessageWithAttachedSigner.signature.get).get signerCert should equal(getCertByAlias("test-cert")) } //Known signer, no attached signing cert { val signedMessageWithoutAttachedSigner = signerVerifier.sign(unsignedMessage, SigningCertStrategy.DontAttach) val signerCert = obtainAndValidateSigningCert(signedMessageWithoutAttachedSigner.signature.get).get signerCert should equal(getCertByAlias("test-cert")) } //No attached cert, unknown signer: obtaining signing cert should fail { val signedMessage = signerVerifier.sign(unsignedMessage, SigningCertStrategy.DontAttach) val unknownSigner = CertId(new BigInteger("-1")) val signedMessageWithUnknownSigner = signedMessage.copy(signature = Some(signedMessage.signature.get.copy(signedBy = unknownSigner))) val signerCertAttempt = obtainAndValidateSigningCert(signedMessageWithUnknownSigner.signature.get) signerCertAttempt.isFailure should be(right = true) } } private def xmlRoundTrip(message: BroadcastMessage): BroadcastMessage = { val roundTripped = BroadcastMessage.fromXml(message.toXml).get roundTripped should equal(message) message } } \ No newline at end of file diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala index da2d0c5d4..f5d383425 100644 --- a/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala +++ b/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala @@ -1,86 +1,85 @@ package net.shrine.protocol import scala.concurrent.duration.Duration import scala.util.Try import scala.xml.NodeSeq -import net.shrine.serialization.I2b2Unmarshaller import net.shrine.util.XmlUtil import net.shrine.util.NodeSeqEnrichments import net.shrine.serialization.I2b2UnmarshallingHelpers /** * @author Bill Simons - * @date 3/28/11 - * @link http://cbmi.med.harvard.edu - * @link http://chip.org + * @since 3/28/11 + * @see http://cbmi.med.harvard.edu + * @see http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source - * @link http://www.gnu.org/licenses/lgpl.html + * @see http://www.gnu.org/licenses/lgpl.html * * NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing */ final case class DeleteQueryRequest( - override val projectId: String, - override val waitTime: Duration, - override val authn: AuthenticationInfo, - val queryId: Long) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[DeleteQueryRequest] with HandleableShrineRequest with HandleableI2b2Request { + override val projectId: String, + override val waitTime: Duration, + override val authn: AuthenticationInfo, + networkQueryId: Long) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[DeleteQueryRequest] with HandleableShrineRequest with HandleableI2b2Request { override val requestType = RequestType.MasterDeleteRequest override def handle(handler: ShrineRequestHandler, shouldBroadcast: Boolean) = handler.deleteQuery(this, shouldBroadcast) - + override def handleI2b2(handler: I2b2RequestHandler, shouldBroadcast: Boolean) = handler.deleteQuery(this, shouldBroadcast) override def toXml: NodeSeq = XmlUtil.stripWhitespace { { headerFragment } - { queryId } + { networkQueryId } } - - def withId(id: Long) = this.copy(queryId = id) + + def withId(id: Long) = this.copy(networkQueryId = id) override def withAuthn(ai: AuthenticationInfo) = this.copy(authn = ai) override def withProject(proj: String) = this.copy(projectId = proj) protected override def i2b2MessageBody = XmlUtil.stripWhitespace { { i2b2PsmHeader } { authn.username } - { queryId } + { networkQueryId } } } object DeleteQueryRequest extends I2b2XmlUnmarshaller[DeleteQueryRequest] with ShrineXmlUnmarshaller[DeleteQueryRequest] with ShrineRequestUnmarshaller with I2b2UnmarshallingHelpers { override def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[DeleteQueryRequest] = { import NodeSeqEnrichments.Strictness._ for { projectId <- i2b2ProjectId(xml) waitTime <- i2b2WaitTime(xml) authn <- i2b2AuthenticationInfo(xml) masterId <- (xml withChild "message_body" withChild "request" withChild "query_master_id").map(_.text.toLong) } yield { DeleteQueryRequest(projectId, waitTime, authn, masterId) } } override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[DeleteQueryRequest] = { import NodeSeqEnrichments.Strictness._ for { waitTime <- shrineWaitTime(xml) authn <- shrineAuthenticationInfo(xml) queryId <- xml.withChild("queryId").map(_.text.toLong) projectId <- shrineProjectId(xml) } yield { DeleteQueryRequest(projectId, waitTime, authn, queryId) } } } \ No newline at end of file diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala index 668c0f1e1..3d64d57e3 100644 --- a/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala +++ b/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala @@ -1,85 +1,85 @@ package net.shrine.protocol import org.junit.Test import org.junit.Assert.assertTrue import xml.Utility import net.shrine.util.XmlUtil /** * @author Bill Simons * @date 3/28/11 * @link http://cbmi.med.harvard.edu * @link http://chip.org *

* NOTICE: This software comes with NO guarantees whatsoever and is * licensed as Lgpl Open Source * @link http://www.gnu.org/licenses/lgpl.html */ final class DeleteQueryRequestTest extends ShrineRequestValidator { val queryId = 2422297885846950097L override def messageBody = XmlUtil.stripWhitespace { { authn.username } 0 0 CRC_QRY_deleteQueryMaster { authn.username } { queryId } } val deleteQueryRequest = XmlUtil.stripWhitespace { { requestHeaderFragment } { queryId } } @Test override def testFromI2b2 { val translatedRequest = DeleteQueryRequest.fromI2b2(DefaultBreakdownResultOutputTypes.toSet)(request).get validateRequestWith(translatedRequest) { - translatedRequest.queryId should equal(queryId) + translatedRequest.networkQueryId should equal(queryId) } } @Test override def testShrineRequestFromI2b2 { assertTrue(CrcRequest.fromI2b2(DefaultBreakdownResultOutputTypes.toSet)(request).get.isInstanceOf[DeleteQueryRequest]) } @Test def testDoubleDispatchingShrineRequestFromI2b2 { assertTrue(HandleableShrineRequest.fromI2b2(DefaultBreakdownResultOutputTypes.toSet)(request).get.isInstanceOf[DeleteQueryRequest]) } @Test override def testToXml { DeleteQueryRequest(projectId, waitTime, authn, queryId).toXml should equal(deleteQueryRequest) } @Test def testToI2b2 { DeleteQueryRequest(projectId, waitTime, authn, queryId).toI2b2 should equal(request) } @Test def testFromXml { val actual = DeleteQueryRequest.fromXml(DefaultBreakdownResultOutputTypes.toSet)(deleteQueryRequest).get validateRequestWith(actual) { - actual.queryId should equal(queryId) + actual.networkQueryId should equal(queryId) } } @Test def testShrineRequestFromXml { ShrineRequest.fromXml(Set.empty)(deleteQueryRequest).get.isInstanceOf[DeleteQueryRequest] should be(true) } } \ No newline at end of file diff --git a/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala b/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala index aad2f697d..d6baf2068 100644 --- a/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala +++ b/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala @@ -1,90 +1,90 @@ package net.shrine.integration import net.shrine.protocol.{DeleteQueryRequest, DeleteQueryResponse, RequestType, Result} import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author clint * @since Jan 8, 2014 * * An end-to-end JAX-RS test that fires up a Hub and two spokes, makes a query, * and verifies that the correct requests were broadcast to the spokes, and * that the correct responses were received and aggregated at the hub. * * DeleteQueryResponses are used because they have very few fields and are easy * to construct and verify. It might be nice in the future to use * RunQuery{Request,Response}, but that was more trouble than it was worth for * a first pass. * * NB: The hub runs on port 9997, and the two spokes run on ports 9998 and 9999. */ final class OneHubTwoSpokesJaxrsTest extends AbstractHubAndSpokesTest with ShouldMatchersForJUnit { thisTest => @Test def testBroadcastDeleteQueryShrine: Unit = { doTestBroadcastDeleteQuery(shrineHubComponent) } @Test def testBroadcastDeleteQueryI2b2: Unit = { doTestBroadcastDeleteQuery(i2b2HubComponent) } lazy val shrineHubComponent = Hubs.Shrine(thisTest, port = 9997) lazy val i2b2HubComponent = Hubs.I2b2(thisTest, port = 9996) private def doTestBroadcastDeleteQuery[H <: AnyRef](hubComponent: AbstractHubComponent[H]): Unit = { val masterId = 123456L val projectId = "some-project-id" val client = hubComponent.clientFor(projectId, networkAuthn) //Broadcast a message val resp = client.deleteQuery(masterId, true) //Make sure we got the right response resp.queryId should equal(masterId) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[DeleteQueryRequest] - req.queryId should equal(masterId) + req.networkQueryId should equal(masterId) req.projectId should equal(projectId) req.requestType should equal(RequestType.MasterDeleteRequest) req.authn should equal(networkAuthn) } //Make sure we got the right responses at the hub val multiplexer = hubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, DeleteQueryResponse(masterId)) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } @Before override def setUp() { super.setUp() shrineHubComponent.JerseyTest.setUp() i2b2HubComponent.JerseyTest.setUp() } @After override def tearDown() { shrineHubComponent.JerseyTest.tearDown() i2b2HubComponent.JerseyTest.tearDown() super.tearDown() } } \ No newline at end of file diff --git a/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala b/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala index 215b63c9b..54460a951 100644 --- a/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala +++ b/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala @@ -1,247 +1,247 @@ package net.shrine.integration import net.shrine.adapter.client.RemoteAdapterClient import net.shrine.adapter.service.JerseyTestComponent import net.shrine.broadcaster.{AdapterClientBroadcaster, NodeHandle, PosterBroadcasterClient} import net.shrine.broadcaster.service.{BroadcasterMultiplexerRequestHandler, BroadcasterMultiplexerResource, BroadcasterMultiplexerService} import net.shrine.protocol.query.{Constrained, Modifiers, Or, QueryDefinition, Term, ValueConstraint} import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, NodeId, RequestType, Result, ResultOutputType, RunQueryRequest, RunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse} import net.shrine.util.ShouldMatchersForJUnit import org.junit.{After, Before, Test} /** * @author clint * @since Mar 6, 2014 */ final class OneQepOneHubTwoSpokesJaxrsTest extends AbstractHubAndSpokesTest with ShouldMatchersForJUnit { thisTest => @Test def testBroadcastDeleteQueryShrine(): Unit = doTestBroadcastDeleteQuery(shrineQueryEntryPointComponent) @Test def testBroadcastDeleteQueryI2b2(): Unit = doTestBroadcastDeleteQuery(i2b2QueryEntryPointComponent) @Test def testBroadcastFlagQueryShrine(): Unit = doTestBroadcastFlagQuery(shrineQueryEntryPointComponent) @Test def testBroadcastFlagQueryI2b2(): Unit = doTestBroadcastFlagQuery(i2b2QueryEntryPointComponent) @Test def testBroadcastUnFlagQueryShrine(): Unit = doTestBroadcastUnFlagQuery(shrineQueryEntryPointComponent) @Test def testBroadcastUnFlagQueryI2b2(): Unit = doTestBroadcastUnFlagQuery(i2b2QueryEntryPointComponent) @Test def testBroadcastRunQueryShrine(): Unit = doTestBroadcastRunQuery(shrineQueryEntryPointComponent) @Test def testBroadcastRunQueryI2b2(): Unit = doTestBroadcastRunQuery(i2b2QueryEntryPointComponent) private def doTestBroadcastDeleteQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val masterId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) //Broadcast a message val resp = client.deleteQuery(masterId, true) //Make sure we got the right response resp.queryId should equal(masterId) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[DeleteQueryRequest] - req.queryId should equal(masterId) + req.networkQueryId should equal(masterId) req.projectId should equal(projectId) req.requestType should equal(RequestType.MasterDeleteRequest) req.authn should equal(networkAuthn) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, DeleteQueryResponse(masterId)) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } private def doTestBroadcastRunQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val masterId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) //Include a modified term, to ensure they're parsed properly val queryDefinition = QueryDefinition("foo", Or(Term("x"), Constrained(Term("y"), Modifiers("some-modifier", "ap", "k"), ValueConstraint("foo", Some("bar"), "baz", "nuh")))) //Broadcast a message val resp = client.runQuery("some-topic-id", Set(ResultOutputType.PATIENT_COUNT_XML), queryDefinition, true) resp.results.size should equal(spokes.size) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[RunQueryRequest] req.projectId should equal(projectId) req.requestType should equal(RequestType.QueryDefinitionRequest) req.authn should equal(networkAuthn) req.queryDefinition should equal(queryDefinition) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get multiplexer.resultsSoFar.collect { case Result(_, _, payload) => payload.getClass } should equal((1 to spokes.size).map(_ => classOf[RunQueryResponse])) val expectedResponders = spokes.map(_.nodeId).toSet multiplexer.resultsSoFar.map(_.origin).toSet should equal(expectedResponders) } private def doTestBroadcastFlagQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val networkQueryId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) val message = "flag message" //Broadcast a message val resp = client.flagQuery(networkQueryId, Some(message), true) //Make sure we got the right response resp should be(FlagQueryResponse) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[FlagQueryRequest] req.networkQueryId should equal(networkQueryId) req.projectId should equal(projectId) req.requestType should equal(RequestType.FlagQueryRequest) req.authn should equal(networkAuthn) req.message should be(Some(message)) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, FlagQueryResponse) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } private def doTestBroadcastUnFlagQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = { val networkQueryId = 123456L val projectId = "some-project-id" val client = queryEntryPointComponent.clientFor(projectId, networkAuthn) //Broadcast a message val resp = client.unFlagQuery(networkQueryId, true) //Make sure we got the right response resp should be(UnFlagQueryResponse) //Make sure all the spokes received the right message spokes.foreach { spoke => val lastMessage = spoke.mockHandler.lastMessage.get lastMessage.networkAuthn.domain should equal(networkAuthn.domain) lastMessage.networkAuthn.username should equal(networkAuthn.username) val req = lastMessage.request.asInstanceOf[UnFlagQueryRequest] req.networkQueryId should equal(networkQueryId) req.projectId should equal(projectId) req.requestType should equal(RequestType.UnFlagQueryRequest) req.authn should equal(networkAuthn) } //Make sure we got the right responses at the hub val multiplexer = HubComponent.broadcaster.lastMultiplexer.get val expectedResponses = spokes.map { spoke => Result(spoke.nodeId, spoke.mockHandler.elapsed, UnFlagQueryResponse) }.toSet multiplexer.resultsSoFar.toSet should equal(expectedResponses) } import scala.concurrent.duration._ lazy val i2b2QueryEntryPointComponent = Hubs.I2b2(thisTest, port = 9995, broadcasterClient = Some(PosterBroadcasterClient(posterFor(HubComponent), DefaultBreakdownResultOutputTypes.toSet))) lazy val shrineQueryEntryPointComponent = Hubs.Shrine(thisTest, port = 9996, broadcasterClient = Some(PosterBroadcasterClient(posterFor(HubComponent), DefaultBreakdownResultOutputTypes.toSet))) object HubComponent extends JerseyTestComponent[BroadcasterMultiplexerRequestHandler] { override val basePath = "broadcaster/broadcast" override val port = 9997 override def resourceClass(handler: BroadcasterMultiplexerRequestHandler) = BroadcasterMultiplexerResource(handler) lazy val broadcaster: InspectableDelegatingBroadcaster = { val destinations: Set[NodeHandle] = spokes.map { spoke => val client = RemoteAdapterClient(NodeId.Unknown,posterFor(spoke), DefaultBreakdownResultOutputTypes.toSet) NodeHandle(spoke.nodeId, client) } InspectableDelegatingBroadcaster(AdapterClientBroadcaster(destinations, MockHubDao)) } override lazy val makeHandler: BroadcasterMultiplexerRequestHandler = { BroadcasterMultiplexerService(broadcaster, 1.hour) } } @Before override def setUp() { super.setUp() HubComponent.JerseyTest.setUp() shrineQueryEntryPointComponent.JerseyTest.setUp() i2b2QueryEntryPointComponent.JerseyTest.setUp() } @After override def tearDown() { i2b2QueryEntryPointComponent.JerseyTest.tearDown() shrineQueryEntryPointComponent.JerseyTest.tearDown() HubComponent.JerseyTest.tearDown() super.tearDown() } } \ No newline at end of file diff --git a/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala b/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala index b46e3c67e..bdee2980e 100644 --- a/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala +++ b/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala @@ -1,84 +1,84 @@ package net.shrine.integration import net.shrine.adapter.service.AdapterRequestHandler import net.shrine.log.Loggable import net.shrine.protocol.NodeId import net.shrine.protocol.BroadcastMessage import net.shrine.protocol.Result import net.shrine.protocol.DeleteQueryRequest import net.shrine.protocol.DeleteQueryResponse import scala.concurrent.duration._ import scala.util.control.NoStackTrace import net.shrine.protocol.FlagQueryRequest import net.shrine.protocol.UnFlagQueryRequest import net.shrine.protocol.FlagQueryResponse import net.shrine.protocol.UnFlagQueryResponse import net.shrine.protocol.RunQueryRequest import net.shrine.protocol.RunQueryResponse import net.shrine.util.XmlDateHelper import net.shrine.protocol.QueryResult /** * @author clint * @date Dec 17, 2013 */ class MockAdapterRequestHandler(val nodeId: NodeId) extends AdapterRequestHandler with Loggable { val elapsed = 1.second private[this] val lock = new AnyRef @volatile private[this] var lastMessageOption: Option[BroadcastMessage] = None def lastMessage: Option[BroadcastMessage] = lock.synchronized(lastMessageOption) override def handleRequest(message: BroadcastMessage): Result = { lock.synchronized { lastMessageOption = Option(message) } message.request match { - case req: DeleteQueryRequest => Result(nodeId, elapsed, DeleteQueryResponse(req.queryId)) + case req: DeleteQueryRequest => Result(nodeId, elapsed, DeleteQueryResponse(req.networkQueryId)) case req: FlagQueryRequest => Result(nodeId, elapsed, FlagQueryResponse) case req: UnFlagQueryRequest => Result(nodeId, elapsed, UnFlagQueryResponse) case req: RunQueryRequest => Result(nodeId, elapsed, { val now = XmlDateHelper.now val queryResult = QueryResult(34782L, 2395723L, req.outputTypes.headOption, 123L, Some(now), Some(now), None, QueryResult.StatusType.Finished, None) RunQueryResponse(message.requestId, XmlDateHelper.now, req.authn.username, req.authn.domain, req.queryDefinition, 94587L, queryResult) }) case r => { error(s"Unexpected request: $r") ??? } } } } object MockAdapterRequestHandler extends MockAdapterRequestHandler(NodeId("MOCK")) /** * @author clint * @date Dec 17, 2013 */ object AlwaysThrowsAdapterRequestHandler extends AdapterRequestHandler { override def handleRequest(message: BroadcastMessage): Result = throw new Exception("blarg") with NoStackTrace } /** * @author clint * @date Dec 17, 2013 */ final case class TimesOutAdapterRequestHandler(howLong: Duration) extends AdapterRequestHandler { val nodeId = NodeId("TIMESOUT") override def handleRequest(message: BroadcastMessage): Result = message.request match { case req: DeleteQueryRequest => { Thread.sleep(howLong.toMillis) - Result(nodeId, 1.second, DeleteQueryResponse(req.queryId)) + Result(nodeId, 1.second, DeleteQueryResponse(req.networkQueryId)) } case _ => ??? } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala index 5a7f29386..b54f4ba75 100644 --- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala +++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala @@ -1,235 +1,236 @@ package net.shrine.qep import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator} import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException} import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized} import net.shrine.authorization.QueryAuthorizationService import net.shrine.broadcaster.BroadcastAndAggregationService import net.shrine.log.Loggable import net.shrine.protocol.{QueryResult, AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, QueryInstance, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest} import net.shrine.qep.audit.QepAuditDb import net.shrine.qep.dao.AuditDao import net.shrine.qep.queries.QepQueryDb import net.shrine.util.XmlDateHelper import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration /** * @author clint * @since Feb 19, 2014 */ trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable { val commonName:String val auditDao: AuditDao val authenticator: Authenticator val authorizationService: QueryAuthorizationService val includeAggregateResult: Boolean val broadcastAndAggregationService: BroadcastAndAggregationService val queryTimeout: Duration val breakdownTypes: Set[ResultOutputType] val collectQepAudit:Boolean protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = { info(s"doReadResultOutputTypes($request)") authenticateAndThen(request) { authResult => val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes //TODO: XXX: HACK: Would like to remove the cast ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp] } } protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast) } protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = { QepQueryDb.db.insertQepQueryFlag(request) doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast) } protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor") //store the query in the qep's database doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast) } protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadQueryDefinition($request,$shouldBroadcast)") doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast) } protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadInstanceResults($request,$shouldBroadcast)") val networkId = request.shrineNetworkQueryId //read from the QEP database code here. Only broadcast if some result is in some sketchy state val resultsFromDb:Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId) //If any query result was pending if(resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) { debug(s"Using qep cached results for query $networkId") AggregatedReadInstanceResultsResponse(networkId,resultsFromDb).asInstanceOf[BaseResp] } else { debug(s"Using qep cached results for query $networkId") val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast) //put the new results in the database if we got what we wanted response match { case arirr:AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId,r)) case _ => //do nothing } response } } protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = { info(s"doReadQueryInstances($request,$shouldBroadcast)") authenticateAndThen(request) { authResult => val now = XmlDateHelper.now val networkQueryId = request.networkQueryId val username = request.authn.username val groupId = request.projectId //NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like //to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine //can understand, while meeting the conversational requirements of the legacy web client. val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now) //TODO: XXX: HACK: Would like to remove the cast //NB: Munge in username from authentication result ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp] } } protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = { info(s"doReadPreviousQueries($request,$shouldBroadcast)") //todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine) //pull queries from the local database. QepQueryDb.db.selectPreviousQueries(request) } protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = { - //todo handle with local/local info(s"doRenameQuery($request,$shouldBroadcast)") + QepQueryDb.db.renamePreviousQuery(request) + doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast) } protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = { //todo handle with local/local info(s"doDeleteQuery($request,$shouldBroadcast)") doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast) } protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ => info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)") //TODO: XXX: HACK: Would like to remove the cast authorizationService.readApprovedEntries(request) match { case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp] case Right(validResponse) => validResponse.asInstanceOf[BaseResp] } } import broadcastAndAggregationService.sendAndAggregate protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = { authenticateAndThen(request) { authResult => debug(s"doBroadcastQuery($request) authResult is $authResult") //NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another //When making BroadcastMessages val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false)) //NB: Only audit RunQueryRequests request match { case runQueryRequest: RunQueryRequest => // inject modified, authorized runQueryRequest //although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code auditAuthorizeAndThen(runQueryRequest) { authorizedRequest => debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest") // tuck the ACT audit metrics data into a database here if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName) QepQueryDb.db.insertQepQuery(authorizedRequest) val response: BaseResp = doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast) response match { //todo do in one transaction case aggregated:AggregatedRunQueryResponse => aggregated.results.foreach(QepQueryDb.db.insertQueryResult(runQueryRequest.networkQueryId,_)) case _ => debug(s"Unanticipated response type $response") } response } case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast) } } } private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = { info(s"doSynchronousQuery($request) started") val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp] info(s"doSynchronousQuery($request) completed with response $response") response } private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult) protected def waitFor[R](futureResponse: Future[R]): R = { XmlDateHelper.time("Waiting for aggregated results")(debug(_)) { Await.result(futureResponse, queryTimeout) } } private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = { auditTransactionally(request) { debug(s"auditAuthorizeAndThen($request) with $authorizationService") val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match { case na: NotAuthorized => throw na.toException case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2)) } body(authorizedRequest) } } private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = { try { body } finally { auditDao.addAuditEntry( request.projectId, request.authn.domain, request.authn.username, request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still? request.topicId) } } import AuthenticationResult._ private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = { val AuthenticationInfo(domain, username, _) = request.authn val authResult = authenticator.authenticate(request.authn) authResult match { case a: Authenticated => f(a) case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason") } } } \ No newline at end of file diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala index 669cd742d..592a24be1 100644 --- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala +++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala @@ -1,493 +1,525 @@ package net.shrine.qep.queries import java.sql.SQLException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName} import net.shrine.log.Loggable import net.shrine.problem.ProblemDigest -import net.shrine.protocol.{I2b2ResultEnvelope, QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest} +import net.shrine.protocol.{DeleteQueryRequest, RenameQueryRequest, I2b2ResultEnvelope, QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest} import net.shrine.qep.QepConfigSource import net.shrine.slick.TestableDataSourceCreator import net.shrine.util.XmlDateHelper import slick.driver.JdbcProfile import scala.collection.immutable.Iterable import scala.concurrent.duration.{Duration, DurationInt} import scala.concurrent.{Await, Future, blocking} import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.xml.XML /** * DB code for the QEP's query instances and query results. * * @author david * @since 1/19/16 */ case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable { import schemaDef._ import jdbcProfile.api._ val database = Database.forDataSource(dataSource) def createTables() = schemaDef.createTables(database) def dropTables() = schemaDef.dropTables(database) def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = { val future: Future[R] = database.run(action) blocking { Await.result(future, 10 seconds) } } def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = { debug(s"insertQepQuery $runQueryRequest") insertQepQuery(QepQuery(runQueryRequest)) } def insertQepQuery(qepQuery: QepQuery):Unit = { dbRun(allQepQueryQuery += qepQuery) } def selectAllQepQueries:Seq[QepQuery] = { - dbRun(allQepQueryQuery.result) + dbRun(mostRecentVisibleQepQueries.result) } //todo order def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = { val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain) val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set]) val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId))) ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2))) } //todo order def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = { - dbRun(allQepQueryQuery.filter(_.userName === userName).filter(_.userDomain === domain).result) + dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).result) + } + + def renamePreviousQuery(request:RenameQueryRequest):Unit = { + + val networkQueryId = request.networkQueryId + dbRun( + for { + queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result + _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName)) + } yield queryResults + ) + } + + def deletePreviousQuery(request:DeleteQueryRequest):Unit = { + + val networkQueryId = request.networkQueryId + dbRun( + for { + queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result + _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true)) + } yield queryResults + ) } def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(flagQueryRequest)) } def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = { insertQepQueryFlag(QepQueryFlag(unflagQueryRequest)) } def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = { dbRun(allQepQueryFlags += qepQueryFlag) } def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = { val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result) flags.map(x => x.networkQueryId -> x).toMap } def insertQepResultRow(qepQueryRow:QueryResultRow) = { dbRun(allQueryResultRows += qepQueryRow) } def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = { val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node")) val queryResultRow = QueryResultRow(networkQueryId,result) val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_)) val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq] dbRun( for { _ <- allQueryResultRows += queryResultRow _ <- allBreakdownResultsRows ++= breakdowns _ <- allProblemDigestRows ++= problem } yield () ) } def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = { dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result) } def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = { val (queryResults, breakdowns,problems) = dbRun( for { queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result } yield (queryResults, breakdowns, problems) ) val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2)) def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = { if(problemSeq.size == 1) problemSeq.head.toProblemDigest else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq") } val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) ) queryResults.map(r => r.toQueryResult( resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty), adapterNodesToProblemDigests.get(r.adapterNode) )) } def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = { dbRun(allBreakdownResultsRows += breakdownResultsRow) } def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = { dbRun(allBreakdownResultsRows.result) } } object QepQueryDb extends Loggable { val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config) val db = QepQueryDb(QepQuerySchema.schema,dataSource) val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart") if(createTablesOnStart) QepQueryDb.db.createTables() } /** * Separate class to support schema generation without actually connecting to the database. * * @param jdbcProfile Database profile to use for the schema */ case class QepQuerySchema(jdbcProfile: JdbcProfile) extends Loggable { import jdbcProfile.api._ def ddlForAllTables: jdbcProfile.DDL = { allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema } //to get the schema, use the REPL //println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n")) def createTables(database:Database) = { try { val future = database.run(ddlForAllTables.create) Await.result(future,10 seconds) } catch { //I'd prefer to check and create schema only if absent. No way to do that with Oracle. case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x) } } def dropTables(database:Database) = { val future = database.run(ddlForAllTables.drop) //Really wait forever for the cleanup Await.result(future,Duration.Inf) } class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") { def networkId = column[NetworkQueryId]("networkId") def userName = column[UserName]("userName") def userDomain = column[String]("domain") def queryName = column[QueryName]("queryName") def expression = column[String]("expression") def dateCreated = column[Time]("dateCreated") + def deleted = column[Boolean]("deleted") def queryXml = column[String]("queryXml") + def changeDate = column[Long]("changeDate") - def * = (networkId,userName,userDomain,queryName,expression,dateCreated,queryXml) <> (QepQuery.tupled,QepQuery.unapply) + def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply) } val allQepQueryQuery = TableQuery[QepQueries] + val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for( + queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists + ) yield queries + val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false) class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") { def networkId = column[NetworkQueryId]("networkId") def flagged = column[Boolean]("flagged") def flagMessage = column[String]("flagMessage") def changeDate = column[Long]("changeDate") def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply) } val allQepQueryFlags = TableQuery[QepQueryFlags] val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for( queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists ) yield queryFlags //todo there may be other custom breakdowns in the config. Use that as the source val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap) implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({ (resultType: ResultOutputType) => queryResultTypesToString(resultType) },{ (string: String) => stringsToQueryResultTypes(string) }) implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({ statusType => statusType.name },{ name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}")) }) class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") { def resultId = column[Long]("resultId") def networkQueryId = column[NetworkQueryId]("networkQueryId") def instanceId = column[Long]("instanceId") def adapterNode = column[String]("adapterNode") def resultType = column[ResultOutputType]("resultType") def size = column[Long]("size") def startDate = column[Option[Long]]("startDate") def endDate = column[Option[Long]]("endDate") def status = column[QueryResult.StatusType]("status") def statusMessage = column[Option[String]]("statusMessage") def changeDate = column[Long]("changeDate") def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply) } val allQueryResultRows = TableQuery[QepQueryResults] //Most recent query result rows for each queryId from each adapter val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for( queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists ) yield queryResultRows class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def resultId = column[Long]("resultId") def resultType = column[ResultOutputType]("resultType") def dataKey = column[String]("dataKey") def value = column[Long]("value") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply) } val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults] //Most recent query result rows for each queryId from each adapter val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for( breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists ) yield breakdownResultsRows /* case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller { */ class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") { def networkQueryId = column[NetworkQueryId]("networkQueryId") def adapterNode = column[String]("adapterNode") def codec = column[String]("codec") def stamp = column[String]("stamp") def summary = column[String]("summary") def description = column[String]("description") def details = column[String]("details") def changeDate = column[Long]("changeDate") def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply) } val allProblemDigestRows = TableQuery[QepResultProblemDigests] val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for( problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists ) yield problemDigests } object QepQuerySchema { val allConfig:Config = QepConfigSource.config val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database") val slickProfileClassName = config.getString("slickProfileClassName") val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName) val schema = QepQuerySchema(slickProfile) } case class QepQuery( networkId:NetworkQueryId, userName: UserName, userDomain: String, queryName: QueryName, expression: String, dateCreated: Time, - queryXml:String + deleted: Boolean, + queryXml: String, + changeDate: Time ){ def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = { QueryMaster( queryMasterId = networkId.toString, networkQueryId = networkId, name = queryName, userId = userName, groupId = userDomain, createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated), held = None, //todo if a query is held at the adapter, how will we know? do we care? Question out to Bill and leadership flagged = qepQueryFlag.map(_.flagged), flagMessage = qepQueryFlag.map(_.flagMessage) ) } } -object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,String) => QepQuery) { +object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,String,Time) => QepQuery) { def apply(runQueryRequest: RunQueryRequest):QepQuery = { new QepQuery( networkId = runQueryRequest.networkQueryId, userName = runQueryRequest.authn.username, userDomain = runQueryRequest.authn.domain, queryName = runQueryRequest.queryDefinition.name, expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString, dateCreated = System.currentTimeMillis(), - queryXml = runQueryRequest.toXmlString + deleted = false, + queryXml = runQueryRequest.toXmlString, + changeDate = System.currentTimeMillis() ) } } case class QepQueryFlag( networkQueryId: NetworkQueryId, flagged:Boolean, flagMessage:String, changeDate:Long ) object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) { def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = flagQueryRequest.networkQueryId, flagged = true, flagMessage = flagQueryRequest.message.getOrElse(""), changeDate = System.currentTimeMillis() ) } def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = { QepQueryFlag( networkQueryId = unflagQueryRequest.networkQueryId, flagged = false, flagMessage = "", changeDate = System.currentTimeMillis() ) } } /* //todo problemDigest in a separate table problemDigest: Option[ProblemDigest] = None, //todo breakdowns in a separate table breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty */ case class QueryResultRow( resultId:Long, networkQueryId:NetworkQueryId, instanceId:Long, adapterNode:String, resultType:ResultOutputType, size:Long, startDate:Option[Long], endDate:Option[Long], status:QueryResult.StatusType, statusMessage:Option[String], changeDate:Long ) { def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult( resultId = resultId, instanceId = instanceId, resultType = Some(resultType), setSize = size, startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar), endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar), description = Some(adapterNode), statusType = status, statusMessage = statusMessage, breakdowns = breakdowns, problemDigest = problemDigest ) } object QueryResultRow extends ((Long,NetworkQueryId,Long,String,ResultOutputType,Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow) { def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = { new QueryResultRow( resultId = result.resultId, networkQueryId = networkQueryId, instanceId = result.instanceId, adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."), resultType = result.resultType.getOrElse(ResultOutputType.PATIENT_COUNT_XML), //todo how is this optional?? size = result.setSize, startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis), endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis), status = result.statusType, statusMessage = result.statusMessage, changeDate = System.currentTimeMillis() ) } } case class QepQueryBreakdownResultsRow( networkQueryId: NetworkQueryId, adapterNode:String, resultId:Long, resultType: ResultOutputType, dataKey:String, value:Long, changeDate:Long ) object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){ def breakdownRowsFor(networkQueryId:NetworkQueryId, adapterNode:String, resultId:Long, breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = { breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis())) } def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = { def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = { val data = breakdowns.map(b => b.dataKey -> b.value).toMap I2b2ResultEnvelope(resultType,data) } breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2)) } } case class QepProblemDigestRow( networkQueryId: NetworkQueryId, adapterNode: String, codec: String, stampText: String, summary: String, description: String, details: String, changeDate:Long ){ def toProblemDigest = { ProblemDigest( codec, stampText, summary, description, if(!details.isEmpty) XML.loadString(details) else

) } } diff --git a/qep/service/src/main/sql/mysql.ddl b/qep/service/src/main/sql/mysql.ddl index 0fe5952e3..63d4a1a29 100644 --- a/qep/service/src/main/sql/mysql.ddl +++ b/qep/service/src/main/sql/mysql.ddl @@ -1,6 +1,6 @@ create table `queriesSent` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`queryTopicId` TEXT,`queryTopicName` TEXT,`timeQuerySent` BIGINT NOT NULL); -create table `previousQueries` (`networkId` BIGINT NOT NULL,`userName` TEXT NOT NULL,`domain` TEXT NOT NULL,`queryName` TEXT NOT NULL,`expression` TEXT NOT NULL,`dateCreated` BIGINT NOT NULL,`queryXml` TEXT NOT NULL); +create table `previousQueries` (`networkId` BIGINT NOT NULL,`userName` TEXT NOT NULL,`domain` TEXT NOT NULL,`queryName` TEXT NOT NULL,`expression` TEXT NOT NULL,`dateCreated` BIGINT NOT NULL,`deleted` BOOLEAN NOT NULL,`queryXml` TEXT NOT NULL,`changeDate` BIGINT NOT NULL); create table `queryFlags` (`networkId` BIGINT NOT NULL,`flagged` BOOLEAN NOT NULL,`flagMessage` TEXT NOT NULL,`changeDate` BIGINT NOT NULL); create table `queryResults` (`resultId` BIGINT NOT NULL,`networkQueryId` BIGINT NOT NULL,`instanceId` BIGINT NOT NULL,`adapterNode` TEXT NOT NULL,`resultType` TEXT NOT NULL,`size` BIGINT NOT NULL,`startDate` BIGINT,`endDate` BIGINT,`status` TEXT NOT NULL,`statusMessage` TEXT,`changeDate` BIGINT NOT NULL); create table `queryBreakdownResults` (`networkQueryId` BIGINT NOT NULL,`adapterNode` TEXT NOT NULL,`resultId` BIGINT NOT NULL,`resultType` TEXT NOT NULL,`dataKey` TEXT NOT NULL,`value` BIGINT NOT NULL,`changeDate` BIGINT NOT NULL); create table `queryResultProblemDigests` (`networkQueryId` BIGINT NOT NULL,`adapterNode` TEXT NOT NULL,`codec` TEXT NOT NULL,`stamp` TEXT NOT NULL,`summary` TEXT NOT NULL,`description` TEXT NOT NULL,`details` TEXT NOT NULL,`changeDate` BIGINT NOT NULL); \ No newline at end of file diff --git a/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala b/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala index 34bb77c29..fe69f94f1 100644 --- a/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala +++ b/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala @@ -1,535 +1,535 @@ package net.shrine.qep import com.sun.jersey.api.client.UniformInterfaceException import net.shrine.client.JerseyShrineClient import net.shrine.crypto.TrustParam.AcceptAllCerts import net.shrine.protocol.{AggregatedReadInstanceResultsResponse, AggregatedReadQueryResultResponse, AggregatedReadTranslatedQueryDefinitionResponse, AggregatedRunQueryResponse, ApprovedTopic, AuthenticationInfo, BaseShrineResponse, Credential, DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, QueryResult, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryDefinitionResponse, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadQueryResultRequest, ReadTranslatedQueryDefinitionRequest, RenameQueryRequest, RenameQueryResponse, RequestType, ResultOutputType, RunQueryRequest, ShrineRequest, ShrineRequestHandler, UnFlagQueryRequest, UnFlagQueryResponse} import net.shrine.protocol.query.{QueryDefinition, Term} import net.shrine.util.{AbstractPortSearchingJerseyTest, JerseyAppDescriptor, ShouldMatchersForJUnit, XmlDateHelper} import org.junit.{After, Before, Test} /** * * @author Clint Gilbert * @since Sep 14, 2011 * * @see http://cbmi.med.harvard.edu * * This software is licensed under the LGPL * @see http://www.gnu.org/licenses/lgpl.html * * Starts a ShrineResource in an embedded HTTP server, sends requests to it, then verifies that the requests don't fail, * and that the parameters made it from the client to the ShrineResource successfully. Uses a mock ShrineRequestHandler, so * it doesn't test that correct values are returned by the ShrineResource. */ final class ShrineResourceJaxrsTest extends AbstractPortSearchingJerseyTest with ShouldMatchersForJUnit { private val projectId = "some-project-id" private val topicId = "some-topic-id" private val userId = "some-user-id" private val authenticationInfo = AuthenticationInfo("some-domain", userId, new Credential("some-val", false)) private val shrineClient = new JerseyShrineClient(resource.getURI.toString, projectId, authenticationInfo, DefaultBreakdownResultOutputTypes.toSet, AcceptAllCerts) /** * We invoked the no-arg superclass constructor, so we must override configure() to provide an AppDescriptor * That tells Jersey to instantiate and expose ShrineResource */ override def configure = JerseyAppDescriptor.thatCreates(ShrineResource).using(MockShrineRequestHandler) @Before override def setUp(): Unit = super.setUp() @After override def tearDown(): Unit = super.tearDown() @Test def testReadApprovedQueryTopics { val response = shrineClient.readApprovedQueryTopics(userId) response should not(be(null)) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.readApprovedQueryTopicsParam validateCachedParam(param, RequestType.SheriffRequest) param.userId should equal(userId) } @Test def testReadPreviousQueries = resetMockThen { val fetchSize = 123 val response = shrineClient.readPreviousQueries(userId, fetchSize) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.readPreviousQueriesParam validateCachedParam(param, RequestType.UserRequest) param.fetchSize should equal(fetchSize) param.userId should equal(userId) } @Test def testReadPreviousQueriesUsernameMismatch = resetMockThen { intercept[UniformInterfaceException] { shrineClient.readPreviousQueries("foo", 123) } MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) } @Test def testRunQuery = resetMockThen { val queryDef = QueryDefinition("foo", Term("nuh")) def doTestRunQueryResponse(response: AggregatedRunQueryResponse, expectedOutputTypes: Set[ResultOutputType]) { response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.runQueryParam validateCachedParam(param, RequestType.QueryDefinitionRequest) param.outputTypes should equal(expectedOutputTypes) param.queryDefinition should equal(queryDef) param.topicId should equal(Some(topicId)) } def doTestRunQuery(outputTypes: Set[ResultOutputType]) { val responseScalaSet = shrineClient.runQuery(topicId, outputTypes, queryDef) doTestRunQueryResponse(responseScalaSet, outputTypes) val responseJavaSet = shrineClient.runQuery(topicId, outputTypes, queryDef) doTestRunQueryResponse(responseJavaSet, outputTypes) } Seq(ResultOutputType.values.toSet, Set(ResultOutputType.PATIENT_COUNT_XML), Set(ResultOutputType.PATIENTSET), Set.empty[ResultOutputType]).foreach(doTestRunQuery) } @Test def testReadQueryInstances = resetMockThen { val queryId = 123L val response = shrineClient.readQueryInstances(queryId) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.readQueryInstancesParam validateCachedParam(param, RequestType.MasterRequest) param.networkQueryId should equal(queryId) } @Test def testReadInstanceResults = resetMockThen { val shrineNetworkQueryId = 98765L val response = shrineClient.readInstanceResults(shrineNetworkQueryId) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.readInstanceResultsParam validateCachedParam(param, RequestType.InstanceRequest) param.shrineNetworkQueryId should equal(shrineNetworkQueryId) } @Test def testReadQueryDefinition = resetMockThen { val queryId = 3789894L val response = shrineClient.readQueryDefinition(queryId) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.readQueryDefinitionParam validateCachedParam(param, RequestType.GetRequestXml) param.queryId should equal(queryId) } @Test def testDeleteQuery = resetMockThen { val queryId = 3789894L val response = shrineClient.deleteQuery(queryId) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.deleteQueryParam validateCachedParam(param, RequestType.MasterDeleteRequest) - param.queryId should equal(queryId) + param.networkQueryId should equal(queryId) } @Test def testRenameQuery = resetMockThen { val queryId = 3789894L val queryName = "aslkfhkasfh" val response = shrineClient.renameQuery(queryId, queryName) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.renameQueryParam validateCachedParam(param, RequestType.MasterRenameRequest) param.networkQueryId should equal(queryId) param.queryName should equal(queryName) } @Test def testReadQueryResult = resetMockThen { val queryId = 3789894L val response = shrineClient.readQueryResult(queryId) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) val param = MockShrineRequestHandler.readQueryResultParam MockShrineRequestHandler.shouldBroadcastParam should be(true) param should not(be(null)) param.projectId should equal(projectId) param.authn should equal(authenticationInfo) param.requestType should equal(RequestType.GetQueryResult) param.waitTime should equal(ShrineResource.waitTime) param.queryId should equal(queryId) } @Test def testReadTranslatedQueryDefinition = resetMockThen { val queryDef = QueryDefinition("foo", Term("network")) val response = shrineClient.readTranslatedQueryDefinition(queryDef) response should not(be(null)) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) val param = MockShrineRequestHandler.readTranslatedQueryDefinitionParam MockShrineRequestHandler.shouldBroadcastParam should be(true) param should not(be(null)) param.authn should equal(authenticationInfo) param.requestType should equal(RequestType.ReadTranslatedQueryDefinitionRequest) param.queryDef should equal(queryDef) } @Test def testFlagQuery: Unit = resetMockThen { val queryId = 12345L val message = "laskfhdklsjfhksdf" val response = shrineClient.flagQuery(queryId, Some(message), true) response should equal(FlagQueryResponse) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) MockShrineRequestHandler.readTranslatedQueryDefinitionParam should be(null) val param = MockShrineRequestHandler.flagQueryRequestParam MockShrineRequestHandler.shouldBroadcastParam should be(true) param should not(be(null)) param.authn should equal(authenticationInfo) param.requestType should equal(RequestType.FlagQueryRequest) param.networkQueryId should equal(queryId) param.message should equal(Some(message)) } @Test def testUnFlagQuery: Unit = resetMockThen { val queryId = 12345L val response = shrineClient.unFlagQuery(queryId, true) response should equal(UnFlagQueryResponse) MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) MockShrineRequestHandler.readTranslatedQueryDefinitionParam should be(null) MockShrineRequestHandler.flagQueryRequestParam should be(null) val param = MockShrineRequestHandler.unFlagQueryRequestParam MockShrineRequestHandler.shouldBroadcastParam should be(true) param should not(be(null)) param.authn should equal(authenticationInfo) param.requestType should equal(RequestType.UnFlagQueryRequest) param.networkQueryId should equal(queryId) } private def validateCachedParam(param: ShrineRequest, expectedRequestType: RequestType) { MockShrineRequestHandler.shouldBroadcastParam should be(true) param should not(be(null)) param.projectId should equal(projectId) param.authn should equal(authenticationInfo) param.requestType should equal(expectedRequestType) param.waitTime should equal(ShrineResource.waitTime) } private def resetMockThen(body: => Any) { MockShrineRequestHandler.reset() //(Healthy?) paranoia MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null) MockShrineRequestHandler.readPreviousQueriesParam should be(null) MockShrineRequestHandler.runQueryParam should be(null) MockShrineRequestHandler.readQueryInstancesParam should be(null) MockShrineRequestHandler.readInstanceResultsParam should be(null) MockShrineRequestHandler.readQueryDefinitionParam should be(null) MockShrineRequestHandler.deleteQueryParam should be(null) MockShrineRequestHandler.renameQueryParam should be(null) MockShrineRequestHandler.readQueryResultParam should be(null) MockShrineRequestHandler.flagQueryRequestParam should be(null) MockShrineRequestHandler.unFlagQueryRequestParam should be(null) MockShrineRequestHandler.readTranslatedQueryDefinitionParam should be(null) body } /** * Mock ShrineRequestHandler; stores passed parameters for later inspection. * Private, since this is (basically) the enclosing test class's state */ private object MockShrineRequestHandler extends ShrineRequestHandler { var shouldBroadcastParam = false var readApprovedQueryTopicsParam: ReadApprovedQueryTopicsRequest = _ var readPreviousQueriesParam: ReadPreviousQueriesRequest = _ var runQueryParam: RunQueryRequest = _ var readQueryInstancesParam: ReadQueryInstancesRequest = _ var readInstanceResultsParam: ReadInstanceResultsRequest = _ var readQueryDefinitionParam: ReadQueryDefinitionRequest = _ var deleteQueryParam: DeleteQueryRequest = _ var renameQueryParam: RenameQueryRequest = _ var readQueryResultParam: ReadQueryResultRequest = _ var readTranslatedQueryDefinitionParam: ReadTranslatedQueryDefinitionRequest = _ var flagQueryRequestParam: FlagQueryRequest = _ var unFlagQueryRequestParam: UnFlagQueryRequest = _ def reset() { shouldBroadcastParam = false readApprovedQueryTopicsParam = null readPreviousQueriesParam = null runQueryParam = null readQueryInstancesParam = null readInstanceResultsParam = null readQueryDefinitionParam = null deleteQueryParam = null renameQueryParam = null readQueryResultParam = null readTranslatedQueryDefinitionParam = null flagQueryRequestParam = null unFlagQueryRequestParam = null } import XmlDateHelper.now private def setShouldBroadcastAndThen(shouldBroadcast: Boolean)(f: => BaseShrineResponse): BaseShrineResponse = { try { f } finally { shouldBroadcastParam = shouldBroadcast } } override def readApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readApprovedQueryTopicsParam = request ReadApprovedQueryTopicsResponse(Seq(new ApprovedTopic(123L, "some topic"))) } override def readPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readPreviousQueriesParam = request ReadPreviousQueriesResponse(Seq.empty) } override def readQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readQueryInstancesParam = request ReadQueryInstancesResponse(999L, "userId", "groupId", Seq.empty) } override def readInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readInstanceResultsParam = request AggregatedReadInstanceResultsResponse(1337L, Seq(new QueryResult(123L, 1337L, Some(ResultOutputType.PATIENT_COUNT_XML), 789L, None, None, Some("description"), QueryResult.StatusType.Finished, Some("statusMessage")))) } override def readQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readQueryDefinitionParam = request ReadQueryDefinitionResponse(87456L, "name", "userId", now, "") } override def runQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { runQueryParam = request AggregatedRunQueryResponse(123L, now, "userId", "groupId", request.queryDefinition, 456L, Seq.empty) } override def deleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { deleteQueryParam = request DeleteQueryResponse(56834756L) } override def renameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { renameQueryParam = request RenameQueryResponse(873468L, "some-name") } override def readQueryResult(request: ReadQueryResultRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readQueryResultParam = request AggregatedReadQueryResultResponse(1234567890L, Seq.empty) } override def readTranslatedQueryDefinition(request: ReadTranslatedQueryDefinitionRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { readTranslatedQueryDefinitionParam = request AggregatedReadTranslatedQueryDefinitionResponse(Seq.empty) } override def flagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { flagQueryRequestParam = request FlagQueryResponse } override def unFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) { unFlagQueryRequestParam = request UnFlagQueryResponse } } } \ No newline at end of file diff --git a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala index a0e76c7db..a6807a091 100644 --- a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala +++ b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala @@ -1,233 +1,237 @@ package net.shrine.qep.queries import net.shrine.protocol.QueryResult.StatusType import net.shrine.protocol.{I2b2ResultEnvelope, DefaultBreakdownResultOutputTypes, QueryResult, ResultOutputType} import net.shrine.util.{XmlDateHelper, ShouldMatchersForJUnit} import org.junit.{After, Before, Test} import net.shrine.problem.TestProblem /** * @author david * @since 1/20/16 */ class QepQueryDbTest extends ShouldMatchersForJUnit { val qepQuery = QepQuery( networkId = 1L, userName = "ben", userDomain = "testDomain", queryName = "testQuery", expression = "testExpression", dateCreated = System.currentTimeMillis(), - queryXml = "testXML" + deleted = false, + queryXml = "testXML", + changeDate = System.currentTimeMillis() ) val secondQepQuery = QepQuery( networkId = 2L, userName = "dave", userDomain = "testDomain", queryName = "testQuery", expression = "testExpression", + deleted = false, dateCreated = System.currentTimeMillis(), - queryXml = "testXML" + queryXml = "testXML", + changeDate = System.currentTimeMillis() ) val flag = QepQueryFlag( networkQueryId = 1L, flagged = true, flagMessage = "This query is flagged", changeDate = System.currentTimeMillis() ) @Test def testInsertQepQuery() { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepQuery(secondQepQuery) val results = QepQueryDb.db.selectAllQepQueries results should equal(Seq(qepQuery,secondQepQuery)) } @Test def testSelectQepQueriesForUser() { QepQueryDb.db.insertQepQuery(qepQuery) QepQueryDb.db.insertQepQuery(secondQepQuery) val results = QepQueryDb.db.selectPreviousQueriesByUserAndDomain("ben","testDomain") results should equal(Seq(qepQuery)) } @Test def testSelectQueryFlags() { val results1 = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(Set(1L,2L)) results1 should equal(Map.empty) QepQueryDb.db.insertQepQueryFlag(flag) val results2 = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(Set(1L,2L)) results2 should equal(Map(1L -> flag)) } val qepResultRowFromExampleCom = QueryResultRow( resultId = 10L, networkQueryId = 1L, instanceId = 100L, adapterNode = "example.com", resultType = ResultOutputType.PATIENT_COUNT_XML, size = 30L, startDate = Some(System.currentTimeMillis() - 60), endDate = Some(System.currentTimeMillis() - 30), status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = System.currentTimeMillis() ) @Test def testInsertQueryResultRow() { QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleCom) val results = QepQueryDb.db.selectMostRecentQepResultRowsFor(1L) results should equal(Seq(qepResultRowFromExampleCom)) } val queryResult = QueryResult( resultId = 20L, instanceId = 200L, resultType = Some(ResultOutputType.PATIENT_COUNT_XML), setSize = 2000L, startDate = Some(XmlDateHelper.now), endDate = Some(XmlDateHelper.now), description = Some("example.com"), statusType = StatusType.Finished, statusMessage = None ) @Test def testInsertQueryResult(): Unit = { QepQueryDb.db.insertQueryResult(2L,queryResult) val results = QepQueryDb.db.selectMostRecentQepResultsFor(2L) results should equal(Seq(queryResult)) } val qepResultRowFromExampleComInThePast = QueryResultRow( resultId = 8L, networkQueryId = 1L, instanceId = 100L, adapterNode = "example.com", resultType = ResultOutputType.PATIENT_COUNT_XML, size = 0L, startDate = qepResultRowFromExampleCom.startDate, endDate = None, status = QueryResult.StatusType.Processing, statusMessage = None, changeDate = qepResultRowFromExampleCom.changeDate - 40 ) val qepResultRowFromGeneralHospital = QueryResultRow( resultId = 100L, networkQueryId = 1L, instanceId = 100L, adapterNode = "generalhospital.org", resultType = ResultOutputType.PATIENT_COUNT_XML, size = 100L, startDate = Some(System.currentTimeMillis() - 60), endDate = Some(System.currentTimeMillis() - 30), status = QueryResult.StatusType.Finished, statusMessage = None, changeDate = System.currentTimeMillis() ) @Test def testGetMostRecentResultRows() { QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleComInThePast) QepQueryDb.db.insertQepResultRow(qepResultRowFromGeneralHospital) QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleCom) val results = QepQueryDb.db.selectMostRecentQepResultRowsFor(1L) results.to[Set] should equal(Set(qepResultRowFromExampleCom,qepResultRowFromGeneralHospital)) } val maleRow = QepQueryBreakdownResultsRow( networkQueryId = 1L, adapterNode = "example.com", resultId = 100L, resultType = DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML, dataKey = "male", value = 388, changeDate = System.currentTimeMillis() ) val femaleRow = QepQueryBreakdownResultsRow( networkQueryId = 1L, adapterNode = "example.com", resultId = 100L, resultType = DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML, dataKey = "female", value = 390, changeDate = System.currentTimeMillis() ) val unknownRow = QepQueryBreakdownResultsRow( networkQueryId = 1L, adapterNode = "example.com", resultId = 100L, resultType = DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML, dataKey = "unknown", value = 4, changeDate = System.currentTimeMillis() ) @Test def testInsertBreakdownRows(): Unit = { QepQueryDb.db.insertQueryBreakdown(maleRow) QepQueryDb.db.insertQueryBreakdown(femaleRow) QepQueryDb.db.insertQueryBreakdown(unknownRow) val results = QepQueryDb.db.selectAllBreakdownResultsRows results.to[Set] should equal(Set(maleRow,femaleRow,unknownRow)) } val breakdowns = Map(DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML,Map("male" -> 3000,"female" -> 4000,"unknown" -> 234))) @Test def testInsertQueryResultWithBreakdowns(): Unit = { val queryResultWithBreakdowns = queryResult.copy(breakdowns = breakdowns) QepQueryDb.db.insertQueryResult(2L,queryResultWithBreakdowns) val results = QepQueryDb.db.selectMostRecentQepResultsFor(2L) results should equal(Seq(queryResultWithBreakdowns)) } @Test def testInsertQueryResultWithProblem(): Unit = { val queryResultWithProblem = queryResult.copy(statusType = StatusType.Error,problemDigest = Some(TestProblem.toDigest)) QepQueryDb.db.insertQueryResult(2L,queryResultWithProblem) val results = QepQueryDb.db.selectMostRecentQepResultsFor(2L) results should equal(Seq(queryResultWithProblem)) } @Before def beforeEach() = { QepQueryDb.db.createTables() } @After def afterEach() = { QepQueryDb.db.dropTables() } }