diff --git a/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala b/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala
index 6acb1d44c..f029f838b 100644
--- a/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala
+++ b/adapter/adapter-service/src/main/scala/net/shrine/adapter/DeleteQueryAdapter.scala
@@ -1,27 +1,27 @@
package net.shrine.adapter
import net.shrine.adapter.dao.AdapterDao
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.DeleteQueryRequest
import net.shrine.protocol.DeleteQueryResponse
import net.shrine.protocol.ShrineResponse
/**
* @author Bill Simons
* @date 4/12/11
* @link http://cbmi.med.harvard.edu
* @link http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @link http://www.gnu.org/licenses/lgpl.html
*/
final class DeleteQueryAdapter(dao: AdapterDao) extends Adapter {
override protected[adapter] def processRequest(message: BroadcastMessage): ShrineResponse = {
val request = message.request.asInstanceOf[DeleteQueryRequest]
- dao.deleteQuery(request.queryId)
+ dao.deleteQuery(request.networkQueryId)
- DeleteQueryResponse(request.queryId)
+ DeleteQueryResponse(request.networkQueryId)
}
}
\ No newline at end of file
diff --git a/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala b/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala
index 013681ab7..0d2fc4df3 100644
--- a/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala
+++ b/commons/crypto/src/test/scala/net/shrine/crypto/DefaultSignerVerifierTest.scala
@@ -1,535 +1,535 @@
package net.shrine.crypto
import net.shrine.util.{Base64, ShouldMatchersForJUnit, XmlGcEnrichments}
import org.junit.Test
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.AuthenticationInfo
import net.shrine.protocol.Credential
import net.shrine.protocol.DeleteQueryRequest
import net.shrine.protocol.RunQueryRequest
import net.shrine.protocol.ResultOutputType
import net.shrine.protocol.query.QueryDefinition
import net.shrine.protocol.query.Term
import net.shrine.protocol.query.Modifiers
import net.shrine.protocol.query.Or
import net.shrine.protocol.ReadQueryResultRequest
import net.shrine.protocol.DefaultBreakdownResultOutputTypes
import net.shrine.protocol.query.Constrained
import net.shrine.protocol.query.ValueConstraint
import net.shrine.protocol.CertId
import java.math.BigInteger
import java.security.cert.X509Certificate
import java.security.cert.CertificateFactory
import scala.io.Source
import java.io.ByteArrayInputStream
/**
* @author clint
* @since Nov 27, 2013
*/
final class DefaultSignerVerifierTest extends ShouldMatchersForJUnit {
private val authn = AuthenticationInfo("some-domain", "some-username", Credential("sadkljlajdl", isToken = false))
private val certCollection = TestKeystore.certCollection
private val signerVerifier = new DefaultSignerVerifier(certCollection)
import SigningCertStrategy._
import scala.concurrent.duration._
@Test
def testIssuersMatchBetweenCertsWithIPsInDistinguishedNames(): Unit = {
def readCert(fileName: String): X509Certificate = {
val factory = CertificateFactory.getInstance("X.509")
val source = Source.fromInputStream(getClass.getClassLoader.getResourceAsStream(fileName))
val encodedCertData = try { source.mkString } finally { source.close() }
val byteStream = new ByteArrayInputStream(Base64.fromBase64(encodedCertData))
try { factory.generateCertificate(byteStream).asInstanceOf[X509Certificate] }
finally { byteStream.close() }
}
val ca = readCert("test-caroot.pem")
val alpha = readCert("test-alpha-signed.pem")
val beta = readCert("test-beta-signed.pem")
val gamma = readCert("test-gamma-signed.pem")
def shouldMatch[F](field: X509Certificate => F)(a: X509Certificate, b: X509Certificate) {
field(a) should equal(field(b))
//Use options to handle null fields
Option(field(a)).map(_.hashCode) should equal(Option(field(b)).map(_.hashCode))
}
shouldMatch(_.getIssuerDN)(ca, alpha)
shouldMatch(_.getIssuerDN)(ca, beta)
shouldMatch(_.getIssuerDN)(ca, gamma)
shouldMatch(_.getIssuerX500Principal)(ca, alpha)
shouldMatch(_.getIssuerX500Principal)(ca, beta)
shouldMatch(_.getIssuerX500Principal)(ca, gamma)
shouldMatch(_.getIssuerUniqueID)(ca, alpha)
shouldMatch(_.getIssuerUniqueID)(ca, beta)
shouldMatch(_.getIssuerUniqueID)(ca, gamma)
ca.getSerialNumber should not equal(alpha.getSerialNumber)
ca.getSerialNumber should not equal(beta.getSerialNumber)
ca.getSerialNumber should not equal(gamma.getSerialNumber)
}
@Test
def testSigningAndVerificationQueryDefWithSubQueries(): Unit = {
//A failing case reported by Ben C.
val queryDef = QueryDefinition.fromI2b2 {
(t) (493.90) Asthma(250.00) Diabet@15:32:58
ANY
0
Event 1
STARTDATE
FIRST
LESS
Event 2
STARTDATE
FIRST
GREATEREQUAL
365
DAY
Event 1
EVENT
Event 1
SAMEINSTANCENUM
0
1
100
0
SAMEINSTANCENUM
1
-
6
(493.90) Asthma, unspecified type, unspecified
\\SHRINE\SHRINE\Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.90) Asthma, unspecified type, unspecified\
Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.90) Asthma, unspecified type, unspecified\
ENC
LA
false
-
6
(493.91) Asthma, unspecified type, with status asthmaticus
\\SHRINE\SHRINE\Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.91) Asthma, unspecified type, with status asthmaticus\
Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.91) Asthma, unspecified type, with status asthmaticus\
ENC
LA
false
-
6
(493.92) Asthma, unspecified type, with (acute) exacerbation
\\SHRINE\SHRINE\Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.92) Asthma, unspecified type, with (acute) exacerbation\
Diagnoses\Diseases of the respiratory system (460-519.99)\Chronic obstructive pulmonary disease and allied conditions (490-496.99)\Asthma (493)\Asthma, unspecified (493.9)\(493.92) Asthma, unspecified type, with (acute) exacerbation\
ENC
LA
false
Event 2
EVENT
Event 2
SAMEINSTANCENUM
0
1
100
0
SAMEINSTANCENUM
1
-
6
(250.00) Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled
\\SHRINE\SHRINE\Diagnoses\Endocrine, nutritional and metabolic diseases, and immunity disorders (240-279.99)\Diseases of other endocrine glands (249-259.99)\Diabetes mellitus (250)\Diabetes mellitus without mention of complication (250.0)\(250.00) Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled\
Diagnoses\Endocrine, nutritional and metabolic diseases, and immunity disorders (240-279.99)\Diseases of other endocrine glands (249-259.99)\Diabetes mellitus (250)\Diabetes mellitus without mention of complication (250.0)\(250.00) Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled\
ENC
LA
false
-
6
(530.81) Esophageal reflux
\\SHRINE\SHRINE\Diagnoses\Diseases of the digestive system (520-579.99)\Diseases of esophagus, stomach, and duodenum (530-539.99)\Diseases of esophagus (530)\Other specified disorders of esophagus (530.8)\(530.81) Esophageal reflux\
Diagnoses\Diseases of the digestive system (520-579.99)\Diseases of esophagus, stomach, and duodenum (530-539.99)\Diseases of esophagus (530)\Other specified disorders of esophagus (530.8)\(530.81) Esophageal reflux\
ENC
LA
false
}.get
def shouldVerify(signingCertStrategy: SigningCertStrategy): Unit = {
val resultTypes = DefaultBreakdownResultOutputTypes.toSet + ResultOutputType.PATIENT_COUNT_XML
val unsignedMessage = BroadcastMessage(authn, RunQueryRequest("some-project-id", 12345.milliseconds, authn, Some("topic-id"), Some("Topic Name"), resultTypes, queryDef))
val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy)
signerVerifier.verifySig(signedMessage, 1.hour) should be(right = true)
//NB: Simulate going from one machine to another
val roundTripped = xmlRoundTrip(signedMessage)
roundTripped.signature should equal(signedMessage.signature)
signerVerifier.verifySig(roundTripped, 1.hour) should be(right = true)
}
//shouldVerify(Attach)
shouldVerify(DontAttach)
}
//See https://open.med.harvard.edu/jira/browse/SHRINE-859
@Test
def testSigningAndVerificationQueryNameWithSpaces(): Unit = {
def shouldVerify(queryName: String, signingCertStrategy: SigningCertStrategy): Unit = {
val queryDef = QueryDefinition(queryName, Term("""\\PCORNET\PCORI\DEMOGRAPHIC\Age\>= 65 years old\65\"""))
val resultTypes = DefaultBreakdownResultOutputTypes.toSet + ResultOutputType.PATIENT_COUNT_XML
val unsignedMessage = BroadcastMessage(authn, RunQueryRequest("some-project-id", 12345.milliseconds, authn, Some("topic-id"), Some("Topic Name"), resultTypes, queryDef))
val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy)
signerVerifier.verifySig(signedMessage, 1.hour) should be(right = true)
//NB: Simulate going from one machine to another
val roundTripped = xmlRoundTrip(signedMessage)
roundTripped.signature should equal(signedMessage.signature)
signerVerifier.verifySig(roundTripped, 1.hour) should be(right = true)
}
shouldVerify("foo", Attach)
shouldVerify(" foo", Attach)
shouldVerify("foo ", Attach)
shouldVerify("fo o", Attach)
shouldVerify(" 65 years old@13:23:00", Attach)
shouldVerify("foo", DontAttach)
shouldVerify(" foo", DontAttach)
shouldVerify("foo ", DontAttach)
shouldVerify("fo o", DontAttach)
shouldVerify(" 65 years old@13:23:00", DontAttach)
}
@Test
def testSigningAndVerification(): Unit = doTestSigningAndVerification(signerVerifier)
@Test
def testSigningAndVerificationAttachedKnownCertNotSignedByCA(): Unit = {
//Messages will be signed with a key that's in our keystore, but is not signed by a CA
val descriptor = KeyStoreDescriptor(
"shrine.keystore.multiple-private-keys",
"chiptesting",
Some("private-key-2"),
Seq("carra ca"),
KeyStoreType.JKS)
val mySignerVerifier = new DefaultSignerVerifier(KeyStoreCertCollection.fromClassPathResource(descriptor))
val unsignedMessage = BroadcastMessage(authn, DeleteQueryRequest("some-project-id", 12345.milliseconds, authn, 87356L))
val signedMessage = mySignerVerifier.sign(unsignedMessage, Attach)
mySignerVerifier.verifySig(signedMessage, 1.hour) should be(right = false)
}
@Test
def testSigningAndVerificationAttachedUnknownCertNotSignedByCA(): Unit = {
//Messages will be signed with a key that's NOT in our keystore, but is not signed by a CA
val signerDescriptor = KeyStoreDescriptor(
"shrine.keystore.multiple-private-keys",
"chiptesting",
Some("private-key-2"), //This cert is NOT in TestKeystore.certCollection
Seq("carra ca"),
KeyStoreType.JKS)
val signer: Signer = new DefaultSignerVerifier(KeyStoreCertCollection.fromClassPathResource(signerDescriptor))
val verifier: Verifier = new DefaultSignerVerifier(TestKeystore.certCollection)
val unsignedMessage = BroadcastMessage(authn, DeleteQueryRequest("some-project-id", 12345.milliseconds, authn, 87356L))
val signedMessage = signer.sign(unsignedMessage, Attach)
verifier.verifySig(signedMessage, 1.hour) should be(right = false)
}
private def doTestSigningAndVerification(signerVerifier: Signer with Verifier): Unit = {
def doTest(signingCertStrategy: SigningCertStrategy) {
val unsignedMessage = BroadcastMessage(authn, DeleteQueryRequest("some-project-id", 12345.milliseconds, authn, 87356L))
val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy)
(unsignedMessage eq signedMessage) should be(right = false)
unsignedMessage should not equal (signedMessage)
signedMessage.networkAuthn should equal(unsignedMessage.networkAuthn)
signedMessage.request should equal(unsignedMessage.request)
signedMessage.requestId should equal(unsignedMessage.requestId)
unsignedMessage.signature should be(None)
signedMessage.signature.isDefined should be(right = true)
val sig = signedMessage.signature.get
sig.timestamp should not be (null)
sig.signedBy should equal(certCollection.myCertId.get)
sig.value should not be (null)
//The signed message should verify
signerVerifier.verifySig(signedMessage, 1.hour) should be(right = true)
def shouldNotVerify(message: BroadcastMessage) {
signerVerifier.verifySig(xmlRoundTrip(message), 1.hour) should be(right = false)
}
//The unsigned one should not
shouldNotVerify(unsignedMessage)
//Expired sigs shouldn't verify
signerVerifier.verifySig(signedMessage, 0.hours) should be(right = false)
//modifying anything should prevent verification
shouldNotVerify {
- val anotherRequest = signedMessage.request.asInstanceOf[DeleteQueryRequest].copy(queryId = 123L)
+ val anotherRequest = signedMessage.request.asInstanceOf[DeleteQueryRequest].copy(networkQueryId = 123L)
signedMessage.withRequest(anotherRequest)
}
shouldNotVerify {
signedMessage.withRequestId(99999L)
}
shouldNotVerify {
signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(domain = "askldjlakjsd"))
}
shouldNotVerify {
signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(username = "askldjlakjsd"))
}
shouldNotVerify {
signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(credential = signedMessage.networkAuthn.credential.copy(isToken = true)))
}
shouldNotVerify {
signedMessage.copy(networkAuthn = signedMessage.networkAuthn.copy(credential = signedMessage.networkAuthn.credential.copy(value = "oieutorutoirutioerutoireuto")))
}
shouldNotVerify {
val timestamp = signedMessage.signature.get.timestamp
import scala.concurrent.duration._
import XmlGcEnrichments._
val newTimestamp = timestamp + 123.minutes
signedMessage.withSignature(signedMessage.signature.get.copy(timestamp = newTimestamp))
}
shouldNotVerify {
val timestamp = signedMessage.signature.get.timestamp
import scala.concurrent.duration._
import XmlGcEnrichments._
val newTimestamp = timestamp + (-99).minutes
signedMessage.withSignature(signedMessage.signature.get.copy(timestamp = newTimestamp))
}
}
doTest(Attach)
doTest(DontAttach)
}
@Test
def testSigningAndVerificationModifiedTerm(): Unit = {
import scala.concurrent.duration._
def doVerificationTest(signingCertStrategy: SigningCertStrategy, queryDef: QueryDefinition): Unit = {
val req = RunQueryRequest("some-project-id", 12345.milliseconds, authn, Some("topic-id"), Some("Topic Name"), Set(ResultOutputType.PATIENT_COUNT_XML), queryDef)
val unsignedMessage = BroadcastMessage(authn, req)
val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy)
(unsignedMessage eq signedMessage) should be(right = false)
unsignedMessage should not equal (signedMessage)
signedMessage.networkAuthn should equal(unsignedMessage.networkAuthn)
signedMessage.request should equal(unsignedMessage.request)
signedMessage.requestId should equal(unsignedMessage.requestId)
unsignedMessage.signature should be(None)
signedMessage.signature.isDefined should be(right = true)
val sig = signedMessage.signature.get
sig.timestamp should not be (null)
sig.signedBy should equal(certCollection.myCertId.get)
sig.value should not be (null)
//The signed message should verify
signerVerifier.verifySig(xmlRoundTrip(signedMessage), 1.hour) should be(right = true)
}
val t1 = Term("t1")
val t2 = Term("t2")
val t3 = Term("t3")
doVerificationTest(Attach, QueryDefinition("foo", Term("")))
doVerificationTest(Attach, QueryDefinition("foo", Constrained(t1, Some(Modifiers("n", "ap", t2.value)), None)))
doVerificationTest(Attach, QueryDefinition("foo", Or(t1, Constrained(t2, Some(Modifiers("n", "ap", t3.value)), None))))
doVerificationTest(Attach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", Some("bar"), "baz", "Nuh"))))))
doVerificationTest(Attach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", None, "baz", "Nuh"))))))
doVerificationTest(DontAttach, QueryDefinition("foo", Term("")))
doVerificationTest(DontAttach, QueryDefinition("foo", Constrained(t1, Some(Modifiers("n", "ap", t2.value)), None)))
doVerificationTest(DontAttach, QueryDefinition("foo", Or(t1, Constrained(t2, Some(Modifiers("n", "ap", t3.value)), None))))
doVerificationTest(DontAttach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", Some("bar"), "baz", "Nuh"))))))
doVerificationTest(DontAttach, QueryDefinition("foo", Or(t1, Constrained(t2, None, Some(ValueConstraint("foo", None, "baz", "Nuh"))))))
}
@Test
def testSigningAndVerificationReadQueryResultRequest(): Unit = {
def doTest(signingCertStrategy: SigningCertStrategy) {
val localAuthn = AuthenticationInfo("i2b2demo", "shrine", Credential("SessionKey:PX4LlvLrMhybWRQfoobarbaz", isToken = true))
import scala.concurrent.duration._
val unsignedMessage = BroadcastMessage(authn, ReadQueryResultRequest("SHRINE", 180.seconds, localAuthn, 7923919416951966472L))
val signedMessage = signerVerifier.sign(unsignedMessage, signingCertStrategy)
(unsignedMessage eq signedMessage) should be(right = false)
unsignedMessage should not equal (signedMessage)
signedMessage.networkAuthn should equal(unsignedMessage.networkAuthn)
signedMessage.request should equal(unsignedMessage.request)
signedMessage.requestId should equal(unsignedMessage.requestId)
unsignedMessage.signature should be(None)
signedMessage.signature.isDefined should be(right = true)
val sig = signedMessage.signature.get
sig.timestamp should not be (null)
sig.signedBy should equal(certCollection.myCertId.get)
sig.value should not be (null)
import scala.concurrent.duration._
//The signed message should verify
signerVerifier.verifySig(xmlRoundTrip(signedMessage), 1.hour) should be(right = true)
}
doTest(Attach)
doTest(DontAttach)
}
private def getCertByAlias(alias: String) = certCollection.asInstanceOf[KeyStoreCertCollection].getX509Cert(alias).get
@Test
def testIsSignedByTrustedCA(): Unit = {
import signerVerifier.isSignedByTrustedCA
val signedByCa = getCertByAlias("test-cert")
val notSignedByCa = getCertByAlias("spin-t1")
isSignedByTrustedCA(signedByCa).get should be(right = true)
isSignedByTrustedCA(notSignedByCa).isFailure should be(right = true)
//TODO: Test case where isSignedByTrustedCA produces Success(false):
//where CA cert (param's issuer-DN) IS in our keystore, but X509Certificate.verify(PublicKey) returns false
}
@Test
def testObtainAndValidateSigningCert(): Unit = {
import signerVerifier.obtainAndValidateSigningCert
import scala.concurrent.duration._
val unsignedMessage = BroadcastMessage(authn, ReadQueryResultRequest("SHRINE", 180.seconds, authn, 7923919416951966472L))
//attached signing cert signed by known CA
{
val signedMessageWithAttachedSigner = signerVerifier.sign(unsignedMessage, SigningCertStrategy.Attach)
val signerCert = obtainAndValidateSigningCert(signedMessageWithAttachedSigner.signature.get).get
signerCert should equal(getCertByAlias("test-cert"))
}
//Known signer, no attached signing cert
{
val signedMessageWithoutAttachedSigner = signerVerifier.sign(unsignedMessage, SigningCertStrategy.DontAttach)
val signerCert = obtainAndValidateSigningCert(signedMessageWithoutAttachedSigner.signature.get).get
signerCert should equal(getCertByAlias("test-cert"))
}
//No attached cert, unknown signer: obtaining signing cert should fail
{
val signedMessage = signerVerifier.sign(unsignedMessage, SigningCertStrategy.DontAttach)
val unknownSigner = CertId(new BigInteger("-1"))
val signedMessageWithUnknownSigner = signedMessage.copy(signature = Some(signedMessage.signature.get.copy(signedBy = unknownSigner)))
val signerCertAttempt = obtainAndValidateSigningCert(signedMessageWithUnknownSigner.signature.get)
signerCertAttempt.isFailure should be(right = true)
}
}
private def xmlRoundTrip(message: BroadcastMessage): BroadcastMessage = {
val roundTripped = BroadcastMessage.fromXml(message.toXml).get
roundTripped should equal(message)
message
}
}
\ No newline at end of file
diff --git a/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala b/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala
index da2d0c5d4..f5d383425 100644
--- a/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala
+++ b/commons/protocol/src/main/scala/net/shrine/protocol/DeleteQueryRequest.scala
@@ -1,86 +1,85 @@
package net.shrine.protocol
import scala.concurrent.duration.Duration
import scala.util.Try
import scala.xml.NodeSeq
-import net.shrine.serialization.I2b2Unmarshaller
import net.shrine.util.XmlUtil
import net.shrine.util.NodeSeqEnrichments
import net.shrine.serialization.I2b2UnmarshallingHelpers
/**
* @author Bill Simons
- * @date 3/28/11
- * @link http://cbmi.med.harvard.edu
- * @link http://chip.org
+ * @since 3/28/11
+ * @see http://cbmi.med.harvard.edu
+ * @see http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
- * @link http://www.gnu.org/licenses/lgpl.html
+ * @see http://www.gnu.org/licenses/lgpl.html
*
* NB: this is a case class to get a structural equality contract in hashCode and equals, mostly for testing
*/
final case class DeleteQueryRequest(
- override val projectId: String,
- override val waitTime: Duration,
- override val authn: AuthenticationInfo,
- val queryId: Long) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[DeleteQueryRequest] with HandleableShrineRequest with HandleableI2b2Request {
+ override val projectId: String,
+ override val waitTime: Duration,
+ override val authn: AuthenticationInfo,
+ networkQueryId: Long) extends ShrineRequest(projectId, waitTime, authn) with CrcRequest with TranslatableRequest[DeleteQueryRequest] with HandleableShrineRequest with HandleableI2b2Request {
override val requestType = RequestType.MasterDeleteRequest
override def handle(handler: ShrineRequestHandler, shouldBroadcast: Boolean) = handler.deleteQuery(this, shouldBroadcast)
-
+
override def handleI2b2(handler: I2b2RequestHandler, shouldBroadcast: Boolean) = handler.deleteQuery(this, shouldBroadcast)
override def toXml: NodeSeq = XmlUtil.stripWhitespace {
{ headerFragment }
- { queryId }
+ { networkQueryId }
}
-
- def withId(id: Long) = this.copy(queryId = id)
+
+ def withId(id: Long) = this.copy(networkQueryId = id)
override def withAuthn(ai: AuthenticationInfo) = this.copy(authn = ai)
override def withProject(proj: String) = this.copy(projectId = proj)
protected override def i2b2MessageBody = XmlUtil.stripWhitespace {
{ i2b2PsmHeader }
{ authn.username }
- { queryId }
+ { networkQueryId }
}
}
object DeleteQueryRequest extends I2b2XmlUnmarshaller[DeleteQueryRequest] with ShrineXmlUnmarshaller[DeleteQueryRequest] with ShrineRequestUnmarshaller with I2b2UnmarshallingHelpers {
override def fromI2b2(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[DeleteQueryRequest] = {
import NodeSeqEnrichments.Strictness._
for {
projectId <- i2b2ProjectId(xml)
waitTime <- i2b2WaitTime(xml)
authn <- i2b2AuthenticationInfo(xml)
masterId <- (xml withChild "message_body" withChild "request" withChild "query_master_id").map(_.text.toLong)
} yield {
DeleteQueryRequest(projectId, waitTime, authn, masterId)
}
}
override def fromXml(breakdownTypes: Set[ResultOutputType])(xml: NodeSeq): Try[DeleteQueryRequest] = {
import NodeSeqEnrichments.Strictness._
for {
waitTime <- shrineWaitTime(xml)
authn <- shrineAuthenticationInfo(xml)
queryId <- xml.withChild("queryId").map(_.text.toLong)
projectId <- shrineProjectId(xml)
} yield {
DeleteQueryRequest(projectId, waitTime, authn, queryId)
}
}
}
\ No newline at end of file
diff --git a/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala b/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala
index 668c0f1e1..3d64d57e3 100644
--- a/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala
+++ b/commons/protocol/src/test/scala/net/shrine/protocol/DeleteQueryRequestTest.scala
@@ -1,85 +1,85 @@
package net.shrine.protocol
import org.junit.Test
import org.junit.Assert.assertTrue
import xml.Utility
import net.shrine.util.XmlUtil
/**
* @author Bill Simons
* @date 3/28/11
* @link http://cbmi.med.harvard.edu
* @link http://chip.org
*
* NOTICE: This software comes with NO guarantees whatsoever and is
* licensed as Lgpl Open Source
* @link http://www.gnu.org/licenses/lgpl.html
*/
final class DeleteQueryRequestTest extends ShrineRequestValidator {
val queryId = 2422297885846950097L
override def messageBody = XmlUtil.stripWhitespace {
{ authn.username }
0
0
CRC_QRY_deleteQueryMaster
{ authn.username }
{ queryId }
}
val deleteQueryRequest = XmlUtil.stripWhitespace {
{ requestHeaderFragment }
{ queryId }
}
@Test
override def testFromI2b2 {
val translatedRequest = DeleteQueryRequest.fromI2b2(DefaultBreakdownResultOutputTypes.toSet)(request).get
validateRequestWith(translatedRequest) {
- translatedRequest.queryId should equal(queryId)
+ translatedRequest.networkQueryId should equal(queryId)
}
}
@Test
override def testShrineRequestFromI2b2 {
assertTrue(CrcRequest.fromI2b2(DefaultBreakdownResultOutputTypes.toSet)(request).get.isInstanceOf[DeleteQueryRequest])
}
@Test
def testDoubleDispatchingShrineRequestFromI2b2 {
assertTrue(HandleableShrineRequest.fromI2b2(DefaultBreakdownResultOutputTypes.toSet)(request).get.isInstanceOf[DeleteQueryRequest])
}
@Test
override def testToXml {
DeleteQueryRequest(projectId, waitTime, authn, queryId).toXml should equal(deleteQueryRequest)
}
@Test
def testToI2b2 {
DeleteQueryRequest(projectId, waitTime, authn, queryId).toI2b2 should equal(request)
}
@Test
def testFromXml {
val actual = DeleteQueryRequest.fromXml(DefaultBreakdownResultOutputTypes.toSet)(deleteQueryRequest).get
validateRequestWith(actual) {
- actual.queryId should equal(queryId)
+ actual.networkQueryId should equal(queryId)
}
}
@Test
def testShrineRequestFromXml {
ShrineRequest.fromXml(Set.empty)(deleteQueryRequest).get.isInstanceOf[DeleteQueryRequest] should be(true)
}
}
\ No newline at end of file
diff --git a/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala b/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala
index aad2f697d..d6baf2068 100644
--- a/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala
+++ b/integration/src/test/scala/net/shrine/integration/OneHubTwoSpokesJaxrsTest.scala
@@ -1,90 +1,90 @@
package net.shrine.integration
import net.shrine.protocol.{DeleteQueryRequest, DeleteQueryResponse, RequestType, Result}
import net.shrine.util.ShouldMatchersForJUnit
import org.junit.{After, Before, Test}
/**
* @author clint
* @since Jan 8, 2014
*
* An end-to-end JAX-RS test that fires up a Hub and two spokes, makes a query,
* and verifies that the correct requests were broadcast to the spokes, and
* that the correct responses were received and aggregated at the hub.
*
* DeleteQueryResponses are used because they have very few fields and are easy
* to construct and verify. It might be nice in the future to use
* RunQuery{Request,Response}, but that was more trouble than it was worth for
* a first pass.
*
* NB: The hub runs on port 9997, and the two spokes run on ports 9998 and 9999.
*/
final class OneHubTwoSpokesJaxrsTest extends AbstractHubAndSpokesTest with ShouldMatchersForJUnit { thisTest =>
@Test
def testBroadcastDeleteQueryShrine: Unit = {
doTestBroadcastDeleteQuery(shrineHubComponent)
}
@Test
def testBroadcastDeleteQueryI2b2: Unit = {
doTestBroadcastDeleteQuery(i2b2HubComponent)
}
lazy val shrineHubComponent = Hubs.Shrine(thisTest, port = 9997)
lazy val i2b2HubComponent = Hubs.I2b2(thisTest, port = 9996)
private def doTestBroadcastDeleteQuery[H <: AnyRef](hubComponent: AbstractHubComponent[H]): Unit = {
val masterId = 123456L
val projectId = "some-project-id"
val client = hubComponent.clientFor(projectId, networkAuthn)
//Broadcast a message
val resp = client.deleteQuery(masterId, true)
//Make sure we got the right response
resp.queryId should equal(masterId)
//Make sure all the spokes received the right message
spokes.foreach { spoke =>
val lastMessage = spoke.mockHandler.lastMessage.get
lastMessage.networkAuthn.domain should equal(networkAuthn.domain)
lastMessage.networkAuthn.username should equal(networkAuthn.username)
val req = lastMessage.request.asInstanceOf[DeleteQueryRequest]
- req.queryId should equal(masterId)
+ req.networkQueryId should equal(masterId)
req.projectId should equal(projectId)
req.requestType should equal(RequestType.MasterDeleteRequest)
req.authn should equal(networkAuthn)
}
//Make sure we got the right responses at the hub
val multiplexer = hubComponent.broadcaster.lastMultiplexer.get
val expectedResponses = spokes.map { spoke =>
Result(spoke.nodeId, spoke.mockHandler.elapsed, DeleteQueryResponse(masterId))
}.toSet
multiplexer.resultsSoFar.toSet should equal(expectedResponses)
}
@Before
override def setUp() {
super.setUp()
shrineHubComponent.JerseyTest.setUp()
i2b2HubComponent.JerseyTest.setUp()
}
@After
override def tearDown() {
shrineHubComponent.JerseyTest.tearDown()
i2b2HubComponent.JerseyTest.tearDown()
super.tearDown()
}
}
\ No newline at end of file
diff --git a/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala b/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala
index 215b63c9b..54460a951 100644
--- a/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala
+++ b/integration/src/test/scala/net/shrine/integration/OneQepOneHubTwoSpokesJaxrsTest.scala
@@ -1,247 +1,247 @@
package net.shrine.integration
import net.shrine.adapter.client.RemoteAdapterClient
import net.shrine.adapter.service.JerseyTestComponent
import net.shrine.broadcaster.{AdapterClientBroadcaster, NodeHandle, PosterBroadcasterClient}
import net.shrine.broadcaster.service.{BroadcasterMultiplexerRequestHandler, BroadcasterMultiplexerResource, BroadcasterMultiplexerService}
import net.shrine.protocol.query.{Constrained, Modifiers, Or, QueryDefinition, Term, ValueConstraint}
import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, NodeId, RequestType, Result, ResultOutputType, RunQueryRequest, RunQueryResponse, UnFlagQueryRequest, UnFlagQueryResponse}
import net.shrine.util.ShouldMatchersForJUnit
import org.junit.{After, Before, Test}
/**
* @author clint
* @since Mar 6, 2014
*/
final class OneQepOneHubTwoSpokesJaxrsTest extends AbstractHubAndSpokesTest with ShouldMatchersForJUnit { thisTest =>
@Test
def testBroadcastDeleteQueryShrine(): Unit = doTestBroadcastDeleteQuery(shrineQueryEntryPointComponent)
@Test
def testBroadcastDeleteQueryI2b2(): Unit = doTestBroadcastDeleteQuery(i2b2QueryEntryPointComponent)
@Test
def testBroadcastFlagQueryShrine(): Unit = doTestBroadcastFlagQuery(shrineQueryEntryPointComponent)
@Test
def testBroadcastFlagQueryI2b2(): Unit = doTestBroadcastFlagQuery(i2b2QueryEntryPointComponent)
@Test
def testBroadcastUnFlagQueryShrine(): Unit = doTestBroadcastUnFlagQuery(shrineQueryEntryPointComponent)
@Test
def testBroadcastUnFlagQueryI2b2(): Unit = doTestBroadcastUnFlagQuery(i2b2QueryEntryPointComponent)
@Test
def testBroadcastRunQueryShrine(): Unit = doTestBroadcastRunQuery(shrineQueryEntryPointComponent)
@Test
def testBroadcastRunQueryI2b2(): Unit = doTestBroadcastRunQuery(i2b2QueryEntryPointComponent)
private def doTestBroadcastDeleteQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = {
val masterId = 123456L
val projectId = "some-project-id"
val client = queryEntryPointComponent.clientFor(projectId, networkAuthn)
//Broadcast a message
val resp = client.deleteQuery(masterId, true)
//Make sure we got the right response
resp.queryId should equal(masterId)
//Make sure all the spokes received the right message
spokes.foreach { spoke =>
val lastMessage = spoke.mockHandler.lastMessage.get
lastMessage.networkAuthn.domain should equal(networkAuthn.domain)
lastMessage.networkAuthn.username should equal(networkAuthn.username)
val req = lastMessage.request.asInstanceOf[DeleteQueryRequest]
- req.queryId should equal(masterId)
+ req.networkQueryId should equal(masterId)
req.projectId should equal(projectId)
req.requestType should equal(RequestType.MasterDeleteRequest)
req.authn should equal(networkAuthn)
}
//Make sure we got the right responses at the hub
val multiplexer = HubComponent.broadcaster.lastMultiplexer.get
val expectedResponses = spokes.map { spoke =>
Result(spoke.nodeId, spoke.mockHandler.elapsed, DeleteQueryResponse(masterId))
}.toSet
multiplexer.resultsSoFar.toSet should equal(expectedResponses)
}
private def doTestBroadcastRunQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = {
val masterId = 123456L
val projectId = "some-project-id"
val client = queryEntryPointComponent.clientFor(projectId, networkAuthn)
//Include a modified term, to ensure they're parsed properly
val queryDefinition = QueryDefinition("foo", Or(Term("x"), Constrained(Term("y"), Modifiers("some-modifier", "ap", "k"), ValueConstraint("foo", Some("bar"), "baz", "nuh"))))
//Broadcast a message
val resp = client.runQuery("some-topic-id", Set(ResultOutputType.PATIENT_COUNT_XML), queryDefinition, true)
resp.results.size should equal(spokes.size)
//Make sure all the spokes received the right message
spokes.foreach { spoke =>
val lastMessage = spoke.mockHandler.lastMessage.get
lastMessage.networkAuthn.domain should equal(networkAuthn.domain)
lastMessage.networkAuthn.username should equal(networkAuthn.username)
val req = lastMessage.request.asInstanceOf[RunQueryRequest]
req.projectId should equal(projectId)
req.requestType should equal(RequestType.QueryDefinitionRequest)
req.authn should equal(networkAuthn)
req.queryDefinition should equal(queryDefinition)
}
//Make sure we got the right responses at the hub
val multiplexer = HubComponent.broadcaster.lastMultiplexer.get
multiplexer.resultsSoFar.collect { case Result(_, _, payload) => payload.getClass } should equal((1 to spokes.size).map(_ => classOf[RunQueryResponse]))
val expectedResponders = spokes.map(_.nodeId).toSet
multiplexer.resultsSoFar.map(_.origin).toSet should equal(expectedResponders)
}
private def doTestBroadcastFlagQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = {
val networkQueryId = 123456L
val projectId = "some-project-id"
val client = queryEntryPointComponent.clientFor(projectId, networkAuthn)
val message = "flag message"
//Broadcast a message
val resp = client.flagQuery(networkQueryId, Some(message), true)
//Make sure we got the right response
resp should be(FlagQueryResponse)
//Make sure all the spokes received the right message
spokes.foreach { spoke =>
val lastMessage = spoke.mockHandler.lastMessage.get
lastMessage.networkAuthn.domain should equal(networkAuthn.domain)
lastMessage.networkAuthn.username should equal(networkAuthn.username)
val req = lastMessage.request.asInstanceOf[FlagQueryRequest]
req.networkQueryId should equal(networkQueryId)
req.projectId should equal(projectId)
req.requestType should equal(RequestType.FlagQueryRequest)
req.authn should equal(networkAuthn)
req.message should be(Some(message))
}
//Make sure we got the right responses at the hub
val multiplexer = HubComponent.broadcaster.lastMultiplexer.get
val expectedResponses = spokes.map { spoke =>
Result(spoke.nodeId, spoke.mockHandler.elapsed, FlagQueryResponse)
}.toSet
multiplexer.resultsSoFar.toSet should equal(expectedResponses)
}
private def doTestBroadcastUnFlagQuery[H <: AnyRef](queryEntryPointComponent: AbstractHubComponent[H]): Unit = {
val networkQueryId = 123456L
val projectId = "some-project-id"
val client = queryEntryPointComponent.clientFor(projectId, networkAuthn)
//Broadcast a message
val resp = client.unFlagQuery(networkQueryId, true)
//Make sure we got the right response
resp should be(UnFlagQueryResponse)
//Make sure all the spokes received the right message
spokes.foreach { spoke =>
val lastMessage = spoke.mockHandler.lastMessage.get
lastMessage.networkAuthn.domain should equal(networkAuthn.domain)
lastMessage.networkAuthn.username should equal(networkAuthn.username)
val req = lastMessage.request.asInstanceOf[UnFlagQueryRequest]
req.networkQueryId should equal(networkQueryId)
req.projectId should equal(projectId)
req.requestType should equal(RequestType.UnFlagQueryRequest)
req.authn should equal(networkAuthn)
}
//Make sure we got the right responses at the hub
val multiplexer = HubComponent.broadcaster.lastMultiplexer.get
val expectedResponses = spokes.map { spoke =>
Result(spoke.nodeId, spoke.mockHandler.elapsed, UnFlagQueryResponse)
}.toSet
multiplexer.resultsSoFar.toSet should equal(expectedResponses)
}
import scala.concurrent.duration._
lazy val i2b2QueryEntryPointComponent = Hubs.I2b2(thisTest, port = 9995, broadcasterClient = Some(PosterBroadcasterClient(posterFor(HubComponent), DefaultBreakdownResultOutputTypes.toSet)))
lazy val shrineQueryEntryPointComponent = Hubs.Shrine(thisTest, port = 9996, broadcasterClient = Some(PosterBroadcasterClient(posterFor(HubComponent), DefaultBreakdownResultOutputTypes.toSet)))
object HubComponent extends JerseyTestComponent[BroadcasterMultiplexerRequestHandler] {
override val basePath = "broadcaster/broadcast"
override val port = 9997
override def resourceClass(handler: BroadcasterMultiplexerRequestHandler) = BroadcasterMultiplexerResource(handler)
lazy val broadcaster: InspectableDelegatingBroadcaster = {
val destinations: Set[NodeHandle] = spokes.map { spoke =>
val client = RemoteAdapterClient(NodeId.Unknown,posterFor(spoke), DefaultBreakdownResultOutputTypes.toSet)
NodeHandle(spoke.nodeId, client)
}
InspectableDelegatingBroadcaster(AdapterClientBroadcaster(destinations, MockHubDao))
}
override lazy val makeHandler: BroadcasterMultiplexerRequestHandler = {
BroadcasterMultiplexerService(broadcaster, 1.hour)
}
}
@Before
override def setUp() {
super.setUp()
HubComponent.JerseyTest.setUp()
shrineQueryEntryPointComponent.JerseyTest.setUp()
i2b2QueryEntryPointComponent.JerseyTest.setUp()
}
@After
override def tearDown() {
i2b2QueryEntryPointComponent.JerseyTest.tearDown()
shrineQueryEntryPointComponent.JerseyTest.tearDown()
HubComponent.JerseyTest.tearDown()
super.tearDown()
}
}
\ No newline at end of file
diff --git a/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala b/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala
index b46e3c67e..bdee2980e 100644
--- a/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala
+++ b/integration/src/test/scala/net/shrine/integration/mockAdapterRequestHandlers.scala
@@ -1,84 +1,84 @@
package net.shrine.integration
import net.shrine.adapter.service.AdapterRequestHandler
import net.shrine.log.Loggable
import net.shrine.protocol.NodeId
import net.shrine.protocol.BroadcastMessage
import net.shrine.protocol.Result
import net.shrine.protocol.DeleteQueryRequest
import net.shrine.protocol.DeleteQueryResponse
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import net.shrine.protocol.FlagQueryRequest
import net.shrine.protocol.UnFlagQueryRequest
import net.shrine.protocol.FlagQueryResponse
import net.shrine.protocol.UnFlagQueryResponse
import net.shrine.protocol.RunQueryRequest
import net.shrine.protocol.RunQueryResponse
import net.shrine.util.XmlDateHelper
import net.shrine.protocol.QueryResult
/**
* @author clint
* @date Dec 17, 2013
*/
class MockAdapterRequestHandler(val nodeId: NodeId) extends AdapterRequestHandler with Loggable {
val elapsed = 1.second
private[this] val lock = new AnyRef
@volatile private[this] var lastMessageOption: Option[BroadcastMessage] = None
def lastMessage: Option[BroadcastMessage] = lock.synchronized(lastMessageOption)
override def handleRequest(message: BroadcastMessage): Result = {
lock.synchronized {
lastMessageOption = Option(message)
}
message.request match {
- case req: DeleteQueryRequest => Result(nodeId, elapsed, DeleteQueryResponse(req.queryId))
+ case req: DeleteQueryRequest => Result(nodeId, elapsed, DeleteQueryResponse(req.networkQueryId))
case req: FlagQueryRequest => Result(nodeId, elapsed, FlagQueryResponse)
case req: UnFlagQueryRequest => Result(nodeId, elapsed, UnFlagQueryResponse)
case req: RunQueryRequest => Result(nodeId, elapsed, {
val now = XmlDateHelper.now
val queryResult = QueryResult(34782L, 2395723L, req.outputTypes.headOption, 123L, Some(now), Some(now), None, QueryResult.StatusType.Finished, None)
RunQueryResponse(message.requestId, XmlDateHelper.now, req.authn.username, req.authn.domain, req.queryDefinition, 94587L, queryResult)
})
case r => {
error(s"Unexpected request: $r")
???
}
}
}
}
object MockAdapterRequestHandler extends MockAdapterRequestHandler(NodeId("MOCK"))
/**
* @author clint
* @date Dec 17, 2013
*/
object AlwaysThrowsAdapterRequestHandler extends AdapterRequestHandler {
override def handleRequest(message: BroadcastMessage): Result = throw new Exception("blarg") with NoStackTrace
}
/**
* @author clint
* @date Dec 17, 2013
*/
final case class TimesOutAdapterRequestHandler(howLong: Duration) extends AdapterRequestHandler {
val nodeId = NodeId("TIMESOUT")
override def handleRequest(message: BroadcastMessage): Result = message.request match {
case req: DeleteQueryRequest => {
Thread.sleep(howLong.toMillis)
- Result(nodeId, 1.second, DeleteQueryResponse(req.queryId))
+ Result(nodeId, 1.second, DeleteQueryResponse(req.networkQueryId))
}
case _ => ???
}
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
index 5a7f29386..b54f4ba75 100644
--- a/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/AbstractQepService.scala
@@ -1,235 +1,236 @@
package net.shrine.qep
import net.shrine.aggregation.{Aggregator, Aggregators, DeleteQueryAggregator, FlagQueryAggregator, ReadInstanceResultsAggregator, ReadQueryDefinitionAggregator, RenameQueryAggregator, RunQueryAggregator, UnFlagQueryAggregator}
import net.shrine.authentication.{AuthenticationResult, Authenticator, NotAuthenticatedException}
import net.shrine.authorization.AuthorizationResult.{Authorized, NotAuthorized}
import net.shrine.authorization.QueryAuthorizationService
import net.shrine.broadcaster.BroadcastAndAggregationService
import net.shrine.log.Loggable
import net.shrine.protocol.{QueryResult, AggregatedReadInstanceResultsResponse, AggregatedRunQueryResponse, AuthenticationInfo, BaseShrineRequest, BaseShrineResponse, Credential, DeleteQueryRequest, FlagQueryRequest, QueryInstance, ReadApprovedQueryTopicsRequest, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadResultOutputTypesRequest, ReadResultOutputTypesResponse, RenameQueryRequest, ResultOutputType, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.qep.audit.QepAuditDb
import net.shrine.qep.dao.AuditDao
import net.shrine.qep.queries.QepQueryDb
import net.shrine.util.XmlDateHelper
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
/**
* @author clint
* @since Feb 19, 2014
*/
trait AbstractQepService[BaseResp <: BaseShrineResponse] extends Loggable {
val commonName:String
val auditDao: AuditDao
val authenticator: Authenticator
val authorizationService: QueryAuthorizationService
val includeAggregateResult: Boolean
val broadcastAndAggregationService: BroadcastAndAggregationService
val queryTimeout: Duration
val breakdownTypes: Set[ResultOutputType]
val collectQepAudit:Boolean
protected def doReadResultOutputTypes(request: ReadResultOutputTypesRequest): BaseResp = {
info(s"doReadResultOutputTypes($request)")
authenticateAndThen(request) { authResult =>
val resultOutputTypes = ResultOutputType.nonErrorTypes ++ breakdownTypes
//TODO: XXX: HACK: Would like to remove the cast
ReadResultOutputTypesResponse(resultOutputTypes).asInstanceOf[BaseResp]
}
}
protected def doFlagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new FlagQueryAggregator, shouldBroadcast)
}
protected def doUnFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseResp = {
QepQueryDb.db.insertQepQueryFlag(request)
doBroadcastQuery(request, new UnFlagQueryAggregator, shouldBroadcast)
}
protected def doRunQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doRunQuery($request,$shouldBroadcast) with $runQueryAggregatorFor")
//store the query in the qep's database
doBroadcastQuery(request, runQueryAggregatorFor(request), shouldBroadcast)
}
protected def doReadQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadQueryDefinition($request,$shouldBroadcast)")
doBroadcastQuery(request, new ReadQueryDefinitionAggregator, shouldBroadcast)
}
protected def doReadInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadInstanceResults($request,$shouldBroadcast)")
val networkId = request.shrineNetworkQueryId
//read from the QEP database code here. Only broadcast if some result is in some sketchy state
val resultsFromDb:Seq[QueryResult] = QepQueryDb.db.selectMostRecentQepResultsFor(networkId)
//If any query result was pending
if(resultsFromDb.nonEmpty && (!resultsFromDb.exists(!_.statusType.isDone))) {
debug(s"Using qep cached results for query $networkId")
AggregatedReadInstanceResultsResponse(networkId,resultsFromDb).asInstanceOf[BaseResp]
}
else {
debug(s"Using qep cached results for query $networkId")
val response = doBroadcastQuery(request, new ReadInstanceResultsAggregator(networkId, false), shouldBroadcast)
//put the new results in the database if we got what we wanted
response match {
case arirr:AggregatedReadInstanceResultsResponse => arirr.results.foreach(r => QepQueryDb.db.insertQueryResult(networkId,r))
case _ => //do nothing
}
response
}
}
protected def doReadQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseResp = {
info(s"doReadQueryInstances($request,$shouldBroadcast)")
authenticateAndThen(request) { authResult =>
val now = XmlDateHelper.now
val networkQueryId = request.networkQueryId
val username = request.authn.username
val groupId = request.projectId
//NB: Return a dummy response, with a dummy QueryInstance containing the network (Shrine) id of the query we'd like
//to get "instances" for. This allows the legacy web client to formulate a request for query results that Shrine
//can understand, while meeting the conversational requirements of the legacy web client.
val instance = QueryInstance(networkQueryId.toString, networkQueryId.toString, username, groupId, now, now)
//TODO: XXX: HACK: Would like to remove the cast
//NB: Munge in username from authentication result
ReadQueryInstancesResponse(networkQueryId, authResult.username, groupId, Seq(instance)).asInstanceOf[BaseResp]
}
}
protected def doReadPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): ReadPreviousQueriesResponse = {
info(s"doReadPreviousQueries($request,$shouldBroadcast)")
//todo if any results are in one of the pending states go ahead and request them async (has to wait for async Shrine)
//pull queries from the local database.
QepQueryDb.db.selectPreviousQueries(request)
}
protected def doRenameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseResp = {
- //todo handle with local/local
info(s"doRenameQuery($request,$shouldBroadcast)")
+ QepQueryDb.db.renamePreviousQuery(request)
+
doBroadcastQuery(request, new RenameQueryAggregator, shouldBroadcast)
}
protected def doDeleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseResp = {
//todo handle with local/local
info(s"doDeleteQuery($request,$shouldBroadcast)")
doBroadcastQuery(request, new DeleteQueryAggregator, shouldBroadcast)
}
protected def doReadApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseResp = authenticateAndThen(request) { _ =>
info(s"doReadApprovedQueryTopics($request,$shouldBroadcast)")
//TODO: XXX: HACK: Would like to remove the cast
authorizationService.readApprovedEntries(request) match {
case Left(errorResponse) => errorResponse.asInstanceOf[BaseResp]
case Right(validResponse) => validResponse.asInstanceOf[BaseResp]
}
}
import broadcastAndAggregationService.sendAndAggregate
protected def doBroadcastQuery(request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean): BaseResp = {
authenticateAndThen(request) { authResult =>
debug(s"doBroadcastQuery($request) authResult is $authResult")
//NB: Use credentials obtained from Authenticator (oddly, we authenticate with one set of credentials and are "logged in" under (possibly!) another
//When making BroadcastMessages
val networkAuthn = AuthenticationInfo(authResult.domain, authResult.username, Credential("", isToken = false))
//NB: Only audit RunQueryRequests
request match {
case runQueryRequest: RunQueryRequest =>
// inject modified, authorized runQueryRequest
//although it might make more sense to put this whole if block in the aggregator, the RunQueryAggregator lives in the hub, far from this DB code
auditAuthorizeAndThen(runQueryRequest) { authorizedRequest =>
debug(s"doBroadcastQuery authorizedRequest is $authorizedRequest")
// tuck the ACT audit metrics data into a database here
if (collectQepAudit) QepAuditDb.db.insertQepQuery(authorizedRequest,commonName)
QepQueryDb.db.insertQepQuery(authorizedRequest)
val response: BaseResp = doSynchronousQuery(networkAuthn,authorizedRequest,aggregator,shouldBroadcast)
response match {
//todo do in one transaction
case aggregated:AggregatedRunQueryResponse => aggregated.results.foreach(QepQueryDb.db.insertQueryResult(runQueryRequest.networkQueryId,_))
case _ => debug(s"Unanticipated response type $response")
}
response
}
case _ => doSynchronousQuery(networkAuthn,request,aggregator,shouldBroadcast)
}
}
}
private def doSynchronousQuery(networkAuthn: AuthenticationInfo,request: BaseShrineRequest, aggregator: Aggregator, shouldBroadcast: Boolean) = {
info(s"doSynchronousQuery($request) started")
val response = waitFor(sendAndAggregate(networkAuthn, request, aggregator, shouldBroadcast)).asInstanceOf[BaseResp]
info(s"doSynchronousQuery($request) completed with response $response")
response
}
private[qep] val runQueryAggregatorFor: RunQueryRequest => RunQueryAggregator = Aggregators.forRunQueryRequest(includeAggregateResult)
protected def waitFor[R](futureResponse: Future[R]): R = {
XmlDateHelper.time("Waiting for aggregated results")(debug(_)) {
Await.result(futureResponse, queryTimeout)
}
}
private[qep] def auditAuthorizeAndThen[T](request: RunQueryRequest)(body: (RunQueryRequest => T)): T = {
auditTransactionally(request) {
debug(s"auditAuthorizeAndThen($request) with $authorizationService")
val authorizedRequest = authorizationService.authorizeRunQueryRequest(request) match {
case na: NotAuthorized => throw na.toException
case authorized: Authorized => request.copy(topicName = authorized.topicIdAndName.map(x => x._2))
}
body(authorizedRequest)
}
}
private[qep] def auditTransactionally[T](request: RunQueryRequest)(body: => T): T = {
try { body } finally {
auditDao.addAuditEntry(
request.projectId,
request.authn.domain,
request.authn.username,
request.queryDefinition.toI2b2String, //TODO: Use i2b2 format Still?
request.topicId)
}
}
import AuthenticationResult._
private[qep] def authenticateAndThen[T](request: BaseShrineRequest)(f: Authenticated => T): T = {
val AuthenticationInfo(domain, username, _) = request.authn
val authResult = authenticator.authenticate(request.authn)
authResult match {
case a: Authenticated => f(a)
case NotAuthenticated(_, _, reason) => throw new NotAuthenticatedException(s"User $domain:$username could not be authenticated: $reason")
}
}
}
\ No newline at end of file
diff --git a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
index 669cd742d..592a24be1 100644
--- a/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/queries/QepQueryDb.scala
@@ -1,493 +1,525 @@
package net.shrine.qep.queries
import java.sql.SQLException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.problem.ProblemDigest
-import net.shrine.protocol.{I2b2ResultEnvelope, QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest}
+import net.shrine.protocol.{DeleteQueryRequest, RenameQueryRequest, I2b2ResultEnvelope, QueryResult, ResultOutputType, DefaultBreakdownResultOutputTypes, UnFlagQueryRequest, FlagQueryRequest, QueryMaster, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RunQueryRequest}
import net.shrine.qep.QepConfigSource
import net.shrine.slick.TestableDataSourceCreator
import net.shrine.util.XmlDateHelper
import slick.driver.JdbcProfile
import scala.collection.immutable.Iterable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.xml.XML
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
blocking {
Await.result(future, 10 seconds)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
- dbRun(allQepQueryQuery.result)
+ dbRun(mostRecentVisibleQepQueries.result)
}
//todo order
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(request.authn.username,request.authn.domain)
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
//todo order
def selectPreviousQueriesByUserAndDomain(userName: UserName,domain: String):Seq[QepQuery] = {
- dbRun(allQepQueryQuery.filter(_.userName === userName).filter(_.userDomain === domain).result)
+ dbRun(mostRecentVisibleQepQueries.filter(_.userName === userName).filter(_.userDomain === domain).result)
+ }
+
+ def renamePreviousQuery(request:RenameQueryRequest):Unit = {
+
+ val networkQueryId = request.networkQueryId
+ dbRun(
+ for {
+ queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
+ _ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName))
+ } yield queryResults
+ )
+ }
+
+ def deletePreviousQuery(request:DeleteQueryRequest):Unit = {
+
+ val networkQueryId = request.networkQueryId
+ dbRun(
+ for {
+ queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
+ _ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true))
+ } yield queryResults
+ )
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
def insertQepResultRow(qepQueryRow:QueryResultRow) = {
dbRun(allQueryResultRows += qepQueryRow)
}
def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = {
val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node"))
val queryResultRow = QueryResultRow(networkQueryId,result)
val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_))
val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq]
dbRun(
for {
_ <- allQueryResultRows += queryResultRow
_ <- allBreakdownResultsRows ++= breakdowns
_ <- allProblemDigestRows ++= problem
} yield ()
)
}
def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = {
dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result)
}
def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = {
val (queryResults, breakdowns,problems) = dbRun(
for {
queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result
breakdowns <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result
} yield (queryResults, breakdowns, problems)
)
val resultIdsToI2b2ResultEnvelopes: Map[Long, Map[ResultOutputType, I2b2ResultEnvelope]] = breakdowns.groupBy(_.resultId).map(rIdToB => rIdToB._1 -> QepQueryBreakdownResultsRow.resultEnvelopesFrom(rIdToB._2))
def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = {
if(problemSeq.size == 1) problemSeq.head.toProblemDigest
else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq")
}
val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) )
queryResults.map(r => r.toQueryResult(
resultIdsToI2b2ResultEnvelopes.getOrElse(r.resultId,Map.empty),
adapterNodesToProblemDigests.get(r.adapterNode)
))
}
def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = {
dbRun(allBreakdownResultsRows += breakdownResultsRow)
}
def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = {
dbRun(allBreakdownResultsRows.result)
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val db = QepQueryDb(QepQuerySchema.schema,dataSource)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[String]("expression")
def dateCreated = column[Time]("dateCreated")
+ def deleted = column[Boolean]("deleted")
def queryXml = column[String]("queryXml")
+ def changeDate = column[Long]("changeDate")
- def * = (networkId,userName,userDomain,queryName,expression,dateCreated,queryXml) <> (QepQuery.tupled,QepQuery.unapply)
+ def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
+ val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for(
+ queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists
+ ) yield queries
+ val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false)
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
//todo there may be other custom breakdowns in the config. Use that as the source
val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values
val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap
val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap)
implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({
(resultType: ResultOutputType) => queryResultTypesToString(resultType)
},{
(string: String) => stringsToQueryResultTypes(string)
})
implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({
statusType => statusType.name
},{
name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}"))
})
class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") {
def resultId = column[Long]("resultId")
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def instanceId = column[Long]("instanceId")
def adapterNode = column[String]("adapterNode")
def resultType = column[ResultOutputType]("resultType")
def size = column[Long]("size")
def startDate = column[Option[Long]]("startDate")
def endDate = column[Option[Long]]("endDate")
def status = column[QueryResult.StatusType]("status")
def statusMessage = column[Option[String]]("statusMessage")
def changeDate = column[Long]("changeDate")
def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply)
}
val allQueryResultRows = TableQuery[QepQueryResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for(
queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists
) yield queryResultRows
class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def resultId = column[Long]("resultId")
def resultType = column[ResultOutputType]("resultType")
def dataKey = column[String]("dataKey")
def value = column[Long]("value")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply)
}
val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for(
breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists
) yield breakdownResultsRows
/*
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller {
*/
class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def codec = column[String]("codec")
def stamp = column[String]("stamp")
def summary = column[String]("summary")
def description = column[String]("description")
def details = column[String]("details")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply)
}
val allProblemDigestRows = TableQuery[QepResultProblemDigests]
val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for(
problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists
) yield problemDigests
}
object QepQuerySchema {
val allConfig:Config = QepConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfileClassName = config.getString("slickProfileClassName")
val slickProfile:JdbcProfile = QepConfigSource.objectForName(slickProfileClassName)
val schema = QepQuerySchema(slickProfile)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: String,
dateCreated: Time,
- queryXml:String
+ deleted: Boolean,
+ queryXml: String,
+ changeDate: Time
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated),
held = None, //todo if a query is held at the adapter, how will we know? do we care? Question out to Bill and leadership
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
-object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,String) => QepQuery) {
+object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,String,Time,Boolean,String,Time) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.getOrElse("No Expression").toString,
dateCreated = System.currentTimeMillis(),
- queryXml = runQueryRequest.toXmlString
+ deleted = false,
+ queryXml = runQueryRequest.toXmlString,
+ changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}
/*
//todo problemDigest in a separate table
problemDigest: Option[ProblemDigest] = None,
//todo breakdowns in a separate table
breakdowns: Map[ResultOutputType,I2b2ResultEnvelope] = Map.empty
*/
case class QueryResultRow(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:ResultOutputType,
size:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long
) {
def toQueryResult(breakdowns:Map[ResultOutputType,I2b2ResultEnvelope],problemDigest:Option[ProblemDigest]) = QueryResult(
resultId = resultId,
instanceId = instanceId,
resultType = Some(resultType),
setSize = size,
startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
description = Some(adapterNode),
statusType = status,
statusMessage = statusMessage,
breakdowns = breakdowns,
problemDigest = problemDigest
)
}
object QueryResultRow extends ((Long,NetworkQueryId,Long,String,ResultOutputType,Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow)
{
def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = {
new QueryResultRow(
resultId = result.resultId,
networkQueryId = networkQueryId,
instanceId = result.instanceId,
adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."),
resultType = result.resultType.getOrElse(ResultOutputType.PATIENT_COUNT_XML), //todo how is this optional??
size = result.setSize,
startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis),
endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis),
status = result.statusType,
statusMessage = result.statusMessage,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryBreakdownResultsRow(
networkQueryId: NetworkQueryId,
adapterNode:String,
resultId:Long,
resultType: ResultOutputType,
dataKey:String,
value:Long,
changeDate:Long
)
object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){
def breakdownRowsFor(networkQueryId:NetworkQueryId,
adapterNode:String,
resultId:Long,
breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = {
breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis()))
}
def resultEnvelopesFrom(breakdowns:Seq[QepQueryBreakdownResultsRow]): Map[ResultOutputType, I2b2ResultEnvelope] = {
def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
val data = breakdowns.map(b => b.dataKey -> b.value).toMap
I2b2ResultEnvelope(resultType,data)
}
breakdowns.groupBy(_.resultType).map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
}
}
case class QepProblemDigestRow(
networkQueryId: NetworkQueryId,
adapterNode: String,
codec: String,
stampText: String,
summary: String,
description: String,
details: String,
changeDate:Long
){
def toProblemDigest = {
ProblemDigest(
codec,
stampText,
summary,
description,
if(!details.isEmpty) XML.loadString(details)
else
)
}
}
diff --git a/qep/service/src/main/sql/mysql.ddl b/qep/service/src/main/sql/mysql.ddl
index 0fe5952e3..63d4a1a29 100644
--- a/qep/service/src/main/sql/mysql.ddl
+++ b/qep/service/src/main/sql/mysql.ddl
@@ -1,6 +1,6 @@
create table `queriesSent` (`shrineNodeId` TEXT NOT NULL,`userName` TEXT NOT NULL,`networkQueryId` BIGINT NOT NULL,`queryName` TEXT NOT NULL,`queryTopicId` TEXT,`queryTopicName` TEXT,`timeQuerySent` BIGINT NOT NULL);
-create table `previousQueries` (`networkId` BIGINT NOT NULL,`userName` TEXT NOT NULL,`domain` TEXT NOT NULL,`queryName` TEXT NOT NULL,`expression` TEXT NOT NULL,`dateCreated` BIGINT NOT NULL,`queryXml` TEXT NOT NULL);
+create table `previousQueries` (`networkId` BIGINT NOT NULL,`userName` TEXT NOT NULL,`domain` TEXT NOT NULL,`queryName` TEXT NOT NULL,`expression` TEXT NOT NULL,`dateCreated` BIGINT NOT NULL,`deleted` BOOLEAN NOT NULL,`queryXml` TEXT NOT NULL,`changeDate` BIGINT NOT NULL);
create table `queryFlags` (`networkId` BIGINT NOT NULL,`flagged` BOOLEAN NOT NULL,`flagMessage` TEXT NOT NULL,`changeDate` BIGINT NOT NULL);
create table `queryResults` (`resultId` BIGINT NOT NULL,`networkQueryId` BIGINT NOT NULL,`instanceId` BIGINT NOT NULL,`adapterNode` TEXT NOT NULL,`resultType` TEXT NOT NULL,`size` BIGINT NOT NULL,`startDate` BIGINT,`endDate` BIGINT,`status` TEXT NOT NULL,`statusMessage` TEXT,`changeDate` BIGINT NOT NULL);
create table `queryBreakdownResults` (`networkQueryId` BIGINT NOT NULL,`adapterNode` TEXT NOT NULL,`resultId` BIGINT NOT NULL,`resultType` TEXT NOT NULL,`dataKey` TEXT NOT NULL,`value` BIGINT NOT NULL,`changeDate` BIGINT NOT NULL);
create table `queryResultProblemDigests` (`networkQueryId` BIGINT NOT NULL,`adapterNode` TEXT NOT NULL,`codec` TEXT NOT NULL,`stamp` TEXT NOT NULL,`summary` TEXT NOT NULL,`description` TEXT NOT NULL,`details` TEXT NOT NULL,`changeDate` BIGINT NOT NULL);
\ No newline at end of file
diff --git a/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala b/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala
index 34bb77c29..fe69f94f1 100644
--- a/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala
+++ b/qep/service/src/test/scala/net/shrine/qep/ShrineResourceJaxrsTest.scala
@@ -1,535 +1,535 @@
package net.shrine.qep
import com.sun.jersey.api.client.UniformInterfaceException
import net.shrine.client.JerseyShrineClient
import net.shrine.crypto.TrustParam.AcceptAllCerts
import net.shrine.protocol.{AggregatedReadInstanceResultsResponse, AggregatedReadQueryResultResponse, AggregatedReadTranslatedQueryDefinitionResponse, AggregatedRunQueryResponse, ApprovedTopic, AuthenticationInfo, BaseShrineResponse, Credential, DefaultBreakdownResultOutputTypes, DeleteQueryRequest, DeleteQueryResponse, FlagQueryRequest, FlagQueryResponse, QueryResult, ReadApprovedQueryTopicsRequest, ReadApprovedQueryTopicsResponse, ReadInstanceResultsRequest, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, ReadQueryDefinitionRequest, ReadQueryDefinitionResponse, ReadQueryInstancesRequest, ReadQueryInstancesResponse, ReadQueryResultRequest, ReadTranslatedQueryDefinitionRequest, RenameQueryRequest, RenameQueryResponse, RequestType, ResultOutputType, RunQueryRequest, ShrineRequest, ShrineRequestHandler, UnFlagQueryRequest, UnFlagQueryResponse}
import net.shrine.protocol.query.{QueryDefinition, Term}
import net.shrine.util.{AbstractPortSearchingJerseyTest, JerseyAppDescriptor, ShouldMatchersForJUnit, XmlDateHelper}
import org.junit.{After, Before, Test}
/**
*
* @author Clint Gilbert
* @since Sep 14, 2011
*
* @see http://cbmi.med.harvard.edu
*
* This software is licensed under the LGPL
* @see http://www.gnu.org/licenses/lgpl.html
*
* Starts a ShrineResource in an embedded HTTP server, sends requests to it, then verifies that the requests don't fail,
* and that the parameters made it from the client to the ShrineResource successfully. Uses a mock ShrineRequestHandler, so
* it doesn't test that correct values are returned by the ShrineResource.
*/
final class ShrineResourceJaxrsTest extends AbstractPortSearchingJerseyTest with ShouldMatchersForJUnit {
private val projectId = "some-project-id"
private val topicId = "some-topic-id"
private val userId = "some-user-id"
private val authenticationInfo = AuthenticationInfo("some-domain", userId, new Credential("some-val", false))
private val shrineClient = new JerseyShrineClient(resource.getURI.toString, projectId, authenticationInfo, DefaultBreakdownResultOutputTypes.toSet, AcceptAllCerts)
/**
* We invoked the no-arg superclass constructor, so we must override configure() to provide an AppDescriptor
* That tells Jersey to instantiate and expose ShrineResource
*/
override def configure = JerseyAppDescriptor.thatCreates(ShrineResource).using(MockShrineRequestHandler)
@Before
override def setUp(): Unit = super.setUp()
@After
override def tearDown(): Unit = super.tearDown()
@Test
def testReadApprovedQueryTopics {
val response = shrineClient.readApprovedQueryTopics(userId)
response should not(be(null))
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.readApprovedQueryTopicsParam
validateCachedParam(param, RequestType.SheriffRequest)
param.userId should equal(userId)
}
@Test
def testReadPreviousQueries = resetMockThen {
val fetchSize = 123
val response = shrineClient.readPreviousQueries(userId, fetchSize)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.readPreviousQueriesParam
validateCachedParam(param, RequestType.UserRequest)
param.fetchSize should equal(fetchSize)
param.userId should equal(userId)
}
@Test
def testReadPreviousQueriesUsernameMismatch = resetMockThen {
intercept[UniformInterfaceException] {
shrineClient.readPreviousQueries("foo", 123)
}
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
}
@Test
def testRunQuery = resetMockThen {
val queryDef = QueryDefinition("foo", Term("nuh"))
def doTestRunQueryResponse(response: AggregatedRunQueryResponse, expectedOutputTypes: Set[ResultOutputType]) {
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.runQueryParam
validateCachedParam(param, RequestType.QueryDefinitionRequest)
param.outputTypes should equal(expectedOutputTypes)
param.queryDefinition should equal(queryDef)
param.topicId should equal(Some(topicId))
}
def doTestRunQuery(outputTypes: Set[ResultOutputType]) {
val responseScalaSet = shrineClient.runQuery(topicId, outputTypes, queryDef)
doTestRunQueryResponse(responseScalaSet, outputTypes)
val responseJavaSet = shrineClient.runQuery(topicId, outputTypes, queryDef)
doTestRunQueryResponse(responseJavaSet, outputTypes)
}
Seq(ResultOutputType.values.toSet,
Set(ResultOutputType.PATIENT_COUNT_XML),
Set(ResultOutputType.PATIENTSET),
Set.empty[ResultOutputType]).foreach(doTestRunQuery)
}
@Test
def testReadQueryInstances = resetMockThen {
val queryId = 123L
val response = shrineClient.readQueryInstances(queryId)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.readQueryInstancesParam
validateCachedParam(param, RequestType.MasterRequest)
param.networkQueryId should equal(queryId)
}
@Test
def testReadInstanceResults = resetMockThen {
val shrineNetworkQueryId = 98765L
val response = shrineClient.readInstanceResults(shrineNetworkQueryId)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.readInstanceResultsParam
validateCachedParam(param, RequestType.InstanceRequest)
param.shrineNetworkQueryId should equal(shrineNetworkQueryId)
}
@Test
def testReadQueryDefinition = resetMockThen {
val queryId = 3789894L
val response = shrineClient.readQueryDefinition(queryId)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.readQueryDefinitionParam
validateCachedParam(param, RequestType.GetRequestXml)
param.queryId should equal(queryId)
}
@Test
def testDeleteQuery = resetMockThen {
val queryId = 3789894L
val response = shrineClient.deleteQuery(queryId)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.deleteQueryParam
validateCachedParam(param, RequestType.MasterDeleteRequest)
- param.queryId should equal(queryId)
+ param.networkQueryId should equal(queryId)
}
@Test
def testRenameQuery = resetMockThen {
val queryId = 3789894L
val queryName = "aslkfhkasfh"
val response = shrineClient.renameQuery(queryId, queryName)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.renameQueryParam
validateCachedParam(param, RequestType.MasterRenameRequest)
param.networkQueryId should equal(queryId)
param.queryName should equal(queryName)
}
@Test
def testReadQueryResult = resetMockThen {
val queryId = 3789894L
val response = shrineClient.readQueryResult(queryId)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
val param = MockShrineRequestHandler.readQueryResultParam
MockShrineRequestHandler.shouldBroadcastParam should be(true)
param should not(be(null))
param.projectId should equal(projectId)
param.authn should equal(authenticationInfo)
param.requestType should equal(RequestType.GetQueryResult)
param.waitTime should equal(ShrineResource.waitTime)
param.queryId should equal(queryId)
}
@Test
def testReadTranslatedQueryDefinition = resetMockThen {
val queryDef = QueryDefinition("foo", Term("network"))
val response = shrineClient.readTranslatedQueryDefinition(queryDef)
response should not(be(null))
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
val param = MockShrineRequestHandler.readTranslatedQueryDefinitionParam
MockShrineRequestHandler.shouldBroadcastParam should be(true)
param should not(be(null))
param.authn should equal(authenticationInfo)
param.requestType should equal(RequestType.ReadTranslatedQueryDefinitionRequest)
param.queryDef should equal(queryDef)
}
@Test
def testFlagQuery: Unit = resetMockThen {
val queryId = 12345L
val message = "laskfhdklsjfhksdf"
val response = shrineClient.flagQuery(queryId, Some(message), true)
response should equal(FlagQueryResponse)
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
MockShrineRequestHandler.readTranslatedQueryDefinitionParam should be(null)
val param = MockShrineRequestHandler.flagQueryRequestParam
MockShrineRequestHandler.shouldBroadcastParam should be(true)
param should not(be(null))
param.authn should equal(authenticationInfo)
param.requestType should equal(RequestType.FlagQueryRequest)
param.networkQueryId should equal(queryId)
param.message should equal(Some(message))
}
@Test
def testUnFlagQuery: Unit = resetMockThen {
val queryId = 12345L
val response = shrineClient.unFlagQuery(queryId, true)
response should equal(UnFlagQueryResponse)
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
MockShrineRequestHandler.readTranslatedQueryDefinitionParam should be(null)
MockShrineRequestHandler.flagQueryRequestParam should be(null)
val param = MockShrineRequestHandler.unFlagQueryRequestParam
MockShrineRequestHandler.shouldBroadcastParam should be(true)
param should not(be(null))
param.authn should equal(authenticationInfo)
param.requestType should equal(RequestType.UnFlagQueryRequest)
param.networkQueryId should equal(queryId)
}
private def validateCachedParam(param: ShrineRequest, expectedRequestType: RequestType) {
MockShrineRequestHandler.shouldBroadcastParam should be(true)
param should not(be(null))
param.projectId should equal(projectId)
param.authn should equal(authenticationInfo)
param.requestType should equal(expectedRequestType)
param.waitTime should equal(ShrineResource.waitTime)
}
private def resetMockThen(body: => Any) {
MockShrineRequestHandler.reset()
//(Healthy?) paranoia
MockShrineRequestHandler.readApprovedQueryTopicsParam should be(null)
MockShrineRequestHandler.readPreviousQueriesParam should be(null)
MockShrineRequestHandler.runQueryParam should be(null)
MockShrineRequestHandler.readQueryInstancesParam should be(null)
MockShrineRequestHandler.readInstanceResultsParam should be(null)
MockShrineRequestHandler.readQueryDefinitionParam should be(null)
MockShrineRequestHandler.deleteQueryParam should be(null)
MockShrineRequestHandler.renameQueryParam should be(null)
MockShrineRequestHandler.readQueryResultParam should be(null)
MockShrineRequestHandler.flagQueryRequestParam should be(null)
MockShrineRequestHandler.unFlagQueryRequestParam should be(null)
MockShrineRequestHandler.readTranslatedQueryDefinitionParam should be(null)
body
}
/**
* Mock ShrineRequestHandler; stores passed parameters for later inspection.
* Private, since this is (basically) the enclosing test class's state
*/
private object MockShrineRequestHandler extends ShrineRequestHandler {
var shouldBroadcastParam = false
var readApprovedQueryTopicsParam: ReadApprovedQueryTopicsRequest = _
var readPreviousQueriesParam: ReadPreviousQueriesRequest = _
var runQueryParam: RunQueryRequest = _
var readQueryInstancesParam: ReadQueryInstancesRequest = _
var readInstanceResultsParam: ReadInstanceResultsRequest = _
var readQueryDefinitionParam: ReadQueryDefinitionRequest = _
var deleteQueryParam: DeleteQueryRequest = _
var renameQueryParam: RenameQueryRequest = _
var readQueryResultParam: ReadQueryResultRequest = _
var readTranslatedQueryDefinitionParam: ReadTranslatedQueryDefinitionRequest = _
var flagQueryRequestParam: FlagQueryRequest = _
var unFlagQueryRequestParam: UnFlagQueryRequest = _
def reset() {
shouldBroadcastParam = false
readApprovedQueryTopicsParam = null
readPreviousQueriesParam = null
runQueryParam = null
readQueryInstancesParam = null
readInstanceResultsParam = null
readQueryDefinitionParam = null
deleteQueryParam = null
renameQueryParam = null
readQueryResultParam = null
readTranslatedQueryDefinitionParam = null
flagQueryRequestParam = null
unFlagQueryRequestParam = null
}
import XmlDateHelper.now
private def setShouldBroadcastAndThen(shouldBroadcast: Boolean)(f: => BaseShrineResponse): BaseShrineResponse = {
try { f } finally {
shouldBroadcastParam = shouldBroadcast
}
}
override def readApprovedQueryTopics(request: ReadApprovedQueryTopicsRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readApprovedQueryTopicsParam = request
ReadApprovedQueryTopicsResponse(Seq(new ApprovedTopic(123L, "some topic")))
}
override def readPreviousQueries(request: ReadPreviousQueriesRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readPreviousQueriesParam = request
ReadPreviousQueriesResponse(Seq.empty)
}
override def readQueryInstances(request: ReadQueryInstancesRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readQueryInstancesParam = request
ReadQueryInstancesResponse(999L, "userId", "groupId", Seq.empty)
}
override def readInstanceResults(request: ReadInstanceResultsRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readInstanceResultsParam = request
AggregatedReadInstanceResultsResponse(1337L, Seq(new QueryResult(123L, 1337L, Some(ResultOutputType.PATIENT_COUNT_XML), 789L, None, None, Some("description"), QueryResult.StatusType.Finished, Some("statusMessage"))))
}
override def readQueryDefinition(request: ReadQueryDefinitionRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readQueryDefinitionParam = request
ReadQueryDefinitionResponse(87456L, "name", "userId", now, "")
}
override def runQuery(request: RunQueryRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
runQueryParam = request
AggregatedRunQueryResponse(123L, now, "userId", "groupId", request.queryDefinition, 456L, Seq.empty)
}
override def deleteQuery(request: DeleteQueryRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
deleteQueryParam = request
DeleteQueryResponse(56834756L)
}
override def renameQuery(request: RenameQueryRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
renameQueryParam = request
RenameQueryResponse(873468L, "some-name")
}
override def readQueryResult(request: ReadQueryResultRequest, shouldBroadcast: Boolean): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readQueryResultParam = request
AggregatedReadQueryResultResponse(1234567890L, Seq.empty)
}
override def readTranslatedQueryDefinition(request: ReadTranslatedQueryDefinitionRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
readTranslatedQueryDefinitionParam = request
AggregatedReadTranslatedQueryDefinitionResponse(Seq.empty)
}
override def flagQuery(request: FlagQueryRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
flagQueryRequestParam = request
FlagQueryResponse
}
override def unFlagQuery(request: UnFlagQueryRequest, shouldBroadcast: Boolean = true): BaseShrineResponse = setShouldBroadcastAndThen(shouldBroadcast) {
unFlagQueryRequestParam = request
UnFlagQueryResponse
}
}
}
\ No newline at end of file
diff --git a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala
index a0e76c7db..a6807a091 100644
--- a/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala
+++ b/qep/service/src/test/scala/net/shrine/qep/queries/QepQueryDbTest.scala
@@ -1,233 +1,237 @@
package net.shrine.qep.queries
import net.shrine.protocol.QueryResult.StatusType
import net.shrine.protocol.{I2b2ResultEnvelope, DefaultBreakdownResultOutputTypes, QueryResult, ResultOutputType}
import net.shrine.util.{XmlDateHelper, ShouldMatchersForJUnit}
import org.junit.{After, Before, Test}
import net.shrine.problem.TestProblem
/**
* @author david
* @since 1/20/16
*/
class QepQueryDbTest extends ShouldMatchersForJUnit {
val qepQuery = QepQuery(
networkId = 1L,
userName = "ben",
userDomain = "testDomain",
queryName = "testQuery",
expression = "testExpression",
dateCreated = System.currentTimeMillis(),
- queryXml = "testXML"
+ deleted = false,
+ queryXml = "testXML",
+ changeDate = System.currentTimeMillis()
)
val secondQepQuery = QepQuery(
networkId = 2L,
userName = "dave",
userDomain = "testDomain",
queryName = "testQuery",
expression = "testExpression",
+ deleted = false,
dateCreated = System.currentTimeMillis(),
- queryXml = "testXML"
+ queryXml = "testXML",
+ changeDate = System.currentTimeMillis()
)
val flag = QepQueryFlag(
networkQueryId = 1L,
flagged = true,
flagMessage = "This query is flagged",
changeDate = System.currentTimeMillis()
)
@Test
def testInsertQepQuery() {
QepQueryDb.db.insertQepQuery(qepQuery)
QepQueryDb.db.insertQepQuery(secondQepQuery)
val results = QepQueryDb.db.selectAllQepQueries
results should equal(Seq(qepQuery,secondQepQuery))
}
@Test
def testSelectQepQueriesForUser() {
QepQueryDb.db.insertQepQuery(qepQuery)
QepQueryDb.db.insertQepQuery(secondQepQuery)
val results = QepQueryDb.db.selectPreviousQueriesByUserAndDomain("ben","testDomain")
results should equal(Seq(qepQuery))
}
@Test
def testSelectQueryFlags() {
val results1 = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(Set(1L,2L))
results1 should equal(Map.empty)
QepQueryDb.db.insertQepQueryFlag(flag)
val results2 = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(Set(1L,2L))
results2 should equal(Map(1L -> flag))
}
val qepResultRowFromExampleCom = QueryResultRow(
resultId = 10L,
networkQueryId = 1L,
instanceId = 100L,
adapterNode = "example.com",
resultType = ResultOutputType.PATIENT_COUNT_XML,
size = 30L,
startDate = Some(System.currentTimeMillis() - 60),
endDate = Some(System.currentTimeMillis() - 30),
status = QueryResult.StatusType.Finished,
statusMessage = None,
changeDate = System.currentTimeMillis()
)
@Test
def testInsertQueryResultRow() {
QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleCom)
val results = QepQueryDb.db.selectMostRecentQepResultRowsFor(1L)
results should equal(Seq(qepResultRowFromExampleCom))
}
val queryResult = QueryResult(
resultId = 20L,
instanceId = 200L,
resultType = Some(ResultOutputType.PATIENT_COUNT_XML),
setSize = 2000L,
startDate = Some(XmlDateHelper.now),
endDate = Some(XmlDateHelper.now),
description = Some("example.com"),
statusType = StatusType.Finished,
statusMessage = None
)
@Test
def testInsertQueryResult(): Unit = {
QepQueryDb.db.insertQueryResult(2L,queryResult)
val results = QepQueryDb.db.selectMostRecentQepResultsFor(2L)
results should equal(Seq(queryResult))
}
val qepResultRowFromExampleComInThePast = QueryResultRow(
resultId = 8L,
networkQueryId = 1L,
instanceId = 100L,
adapterNode = "example.com",
resultType = ResultOutputType.PATIENT_COUNT_XML,
size = 0L,
startDate = qepResultRowFromExampleCom.startDate,
endDate = None,
status = QueryResult.StatusType.Processing,
statusMessage = None,
changeDate = qepResultRowFromExampleCom.changeDate - 40
)
val qepResultRowFromGeneralHospital = QueryResultRow(
resultId = 100L,
networkQueryId = 1L,
instanceId = 100L,
adapterNode = "generalhospital.org",
resultType = ResultOutputType.PATIENT_COUNT_XML,
size = 100L,
startDate = Some(System.currentTimeMillis() - 60),
endDate = Some(System.currentTimeMillis() - 30),
status = QueryResult.StatusType.Finished,
statusMessage = None,
changeDate = System.currentTimeMillis()
)
@Test
def testGetMostRecentResultRows() {
QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleComInThePast)
QepQueryDb.db.insertQepResultRow(qepResultRowFromGeneralHospital)
QepQueryDb.db.insertQepResultRow(qepResultRowFromExampleCom)
val results = QepQueryDb.db.selectMostRecentQepResultRowsFor(1L)
results.to[Set] should equal(Set(qepResultRowFromExampleCom,qepResultRowFromGeneralHospital))
}
val maleRow = QepQueryBreakdownResultsRow(
networkQueryId = 1L,
adapterNode = "example.com",
resultId = 100L,
resultType = DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML,
dataKey = "male",
value = 388,
changeDate = System.currentTimeMillis()
)
val femaleRow = QepQueryBreakdownResultsRow(
networkQueryId = 1L,
adapterNode = "example.com",
resultId = 100L,
resultType = DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML,
dataKey = "female",
value = 390,
changeDate = System.currentTimeMillis()
)
val unknownRow = QepQueryBreakdownResultsRow(
networkQueryId = 1L,
adapterNode = "example.com",
resultId = 100L,
resultType = DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML,
dataKey = "unknown",
value = 4,
changeDate = System.currentTimeMillis()
)
@Test
def testInsertBreakdownRows(): Unit = {
QepQueryDb.db.insertQueryBreakdown(maleRow)
QepQueryDb.db.insertQueryBreakdown(femaleRow)
QepQueryDb.db.insertQueryBreakdown(unknownRow)
val results = QepQueryDb.db.selectAllBreakdownResultsRows
results.to[Set] should equal(Set(maleRow,femaleRow,unknownRow))
}
val breakdowns = Map(DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML -> I2b2ResultEnvelope(DefaultBreakdownResultOutputTypes.PATIENT_GENDER_COUNT_XML,Map("male" -> 3000,"female" -> 4000,"unknown" -> 234)))
@Test
def testInsertQueryResultWithBreakdowns(): Unit = {
val queryResultWithBreakdowns = queryResult.copy(breakdowns = breakdowns)
QepQueryDb.db.insertQueryResult(2L,queryResultWithBreakdowns)
val results = QepQueryDb.db.selectMostRecentQepResultsFor(2L)
results should equal(Seq(queryResultWithBreakdowns))
}
@Test
def testInsertQueryResultWithProblem(): Unit = {
val queryResultWithProblem = queryResult.copy(statusType = StatusType.Error,problemDigest = Some(TestProblem.toDigest))
QepQueryDb.db.insertQueryResult(2L,queryResultWithProblem)
val results = QepQueryDb.db.selectMostRecentQepResultsFor(2L)
results should equal(Seq(queryResultWithProblem))
}
@Before
def beforeEach() = {
QepQueryDb.db.createTables()
}
@After
def afterEach() = {
QepQueryDb.db.dropTables()
}
}