If you have questions about your query results or this SHRINE network, contact the Data Steward at your site.
diff --git a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/DashboardService.scala b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/DashboardService.scala
index f5bb3edb1..27cfe0eab 100644
--- a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/DashboardService.scala
+++ b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/DashboardService.scala
@@ -1,565 +1,574 @@
package net.shrine.dashboard
import akka.actor.Actor
import akka.event.Logging
import net.shrine.authentication.UserAuthenticator
import net.shrine.authorization.steward.OutboundUser
import net.shrine.config.ConfigExtensions
import net.shrine.crypto.{BouncyKeyStoreCollection, KeyStoreDescriptorParser, UtilHasher}
import net.shrine.dashboard.httpclient.HttpClientDirectives.{forwardUnmatchedPath, requestUriThenRoute}
import net.shrine.dashboard.jwtauth.ShrineJwtAuthenticator
import net.shrine.i2b2.protocol.pm.User
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources, Problems}
import net.shrine.serialization.NodeSeqSerializer
import net.shrine.source.ConfigSource
import net.shrine.spray._
import net.shrine.status.protocol.{Config => StatusProtocolConfig}
import net.shrine.util.{SingleHubModel, Versions}
import org.json4s.native.JsonMethods.{parse => json4sParse}
import org.json4s.{DefaultFormats, Formats}
import shapeless.HNil
import spray.http.{HttpRequest, HttpResponse, StatusCodes, Uri}
import spray.httpx.Json4sSupport
import spray.routing._
import spray.routing.directives.LogEntry
import scala.collection.immutable.Iterable
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
/**
* Mixes the DashboardService trait with an Akka Actor to provide the actual service.
*/
class DashboardServiceActor extends Actor with DashboardService {
// the HttpService trait defines only one abstract member, which
// connects the services environment to the enclosing actor or test
def actorRefFactory = context
// this actor only runs our route, but you could add
// other things here, like request stream processing
// or timeout handling
def receive = runRoute(route)
}
/**
* A web service that provides the Dashboard endpoints. It is a trait to support testing independent of Akka.
*/
trait DashboardService extends HttpService with Loggable {
val userAuthenticator = UserAuthenticator(ConfigSource.config)
//don't need to do anything special for unauthorized users, but they do need access to a static form.
lazy val route:Route = gruntWatchCorsSupport {
redirectToIndex ~ staticResources ~ versionCheck ~ makeTrouble ~ authenticatedInBrowser ~ authenticatedDashboard ~ post {
// Chicken and egg problem; Can't check status of certs validation between sites if you need valid certs to exchange messages
pathPrefix("status")
pathPrefix("verifySignature")
verifySignature
}
}
/** logs the request method, uri and response at info level */
def logEntryForRequestResponse(req: HttpRequest): Any => Option[LogEntry] = {
case res: HttpResponse => Some(LogEntry(s"\n Request: $req\n Response: $res", Logging.InfoLevel))
case _ => None // other kind of responses
}
/** logs just the request method, uri and response status at info level */
def logEntryForRequest(req: HttpRequest): Any => Option[LogEntry] = {
case res: HttpResponse => Some(LogEntry(s"\n Request: $req\n Response status: ${res.status}", Logging.InfoLevel))
case _ => None // other kind of responses
}
lazy val versionCheck = pathPrefix("version"){
val response: AppVersion = AppVersion()
implicit val formats = response.json4sMarshaller
complete(response)
}
case class AppVersion(
currentVersion:String,
buildDate:String
) extends DefaultJsonSupport
object AppVersion {
def apply(): AppVersion = AppVersion(Versions.version, Versions.buildDate)
}
def authenticatedInBrowser: Route = pathPrefixTest("user"|"admin"|"toDashboard") {
logRequestResponse(logEntryForRequestResponse _) { //logging is controlled by Akka's config, slf4j, and log4j config
reportIfFailedToAuthenticate {
- authenticate(userAuthenticator.basicUserAuthenticator) { user =>
- pathPrefix("user") {
- userRoute(user)
- } ~
- pathPrefix("admin") {
- adminRoute(user)
+ authenticate(userAuthenticator.basicUserAuthenticator) { user =>
+ pathPrefix("user") {
+ userRoute(user)
} ~
- pathPrefix("toDashboard") {
- toDashboardRoute(user)
+ pathPrefix("admin") {
+ adminRoute(user)
+ } ~
+ pathPrefix("toDashboard") {
+ toDashboardRoute(user)
+ }
}
}
- }
}
}
val reportIfFailedToAuthenticate = routeRouteResponse {
case Rejected(List(AuthenticationFailedRejection(_,_))) =>
complete("AuthenticationFailed")
}
def authenticatedDashboard:Route = pathPrefix("fromDashboard") {
logRequestResponse(logEntryForRequestResponse _) { //logging is controlled by Akka's config, slf4j, and log4j config
get { //all remote dashboard calls are gets.
authenticate(ShrineJwtAuthenticator.authenticate) { user =>
info(s"Sucessfully authenticated user `$user`")
adminRoute(user)
}
}
}
}
def makeTrouble = pathPrefix("makeTrouble") {
complete(throw new IllegalStateException("fake trouble"))
}
lazy val redirectToIndex = pathEnd {
redirect("shrine-dashboard/client/index.html", StatusCodes.PermanentRedirect) //todo pick up "shrine-dashboard" programatically
} ~
( path("index.html") | pathSingleSlash) {
redirect("client/index.html", StatusCodes.PermanentRedirect)
}
lazy val staticResources = pathPrefix("client") {
pathEnd {
redirect("client/index.html", StatusCodes.PermanentRedirect)
} ~
pathSingleSlash {
redirect("index.html", StatusCodes.PermanentRedirect)
} ~ {
getFromResourceDirectory("client")
}
}
def userRoute(user:User):Route = get {
pathPrefix("whoami") {
complete(OutboundUser.createFromUser(user))
}
}
//todo check that this an admin.
def adminRoute(user:User):Route = get {
pathPrefix("happy") {
val happyBaseUrl: String = ConfigSource.config.getString("shrine.dashboard.happyBaseUrl")
-
- forwardUnmatchedPath(happyBaseUrl)
+ detach() {
+ forwardUnmatchedPath(happyBaseUrl)
+ }
} ~
pathPrefix("messWithHappyVersion") { //todo is this used?
val happyBaseUrl: String = ConfigSource.config.getString("shrine.dashboard.happyBaseUrl")
def pullClasspathFromConfig(httpResponse:HttpResponse,uri:Uri):Route = {
ctx => {
val result = httpResponse.entity.asString
ctx.complete(s"Got '$result' from $uri")
}
}
-
- requestUriThenRoute(happyBaseUrl+"/version",pullClasspathFromConfig)
+ detach() {
+ requestUriThenRoute(happyBaseUrl + "/version", pullClasspathFromConfig)
+ }
} ~
pathPrefix("ping") { complete("pong") }~
pathPrefix("status") { statusRoute(user) }
}
//Manually test this by running a curl command
//curl -k -w "\n%{response_code}\n" -u dave:kablam "https://shrine-dev1.catalyst:6443/shrine-dashboard/toDashboard/shrine-dev2.catalyst/ping"
/**
* Forward a request from this dashboard to a remote dashboard
*/
def toDashboardRoute(user:User):Route = get {
pathPrefix(Segment) { dnsName =>
import scala.collection.JavaConversions._
// Check that it makes sense to call toDashboard
KeyStoreInfo.keyStoreDescriptor.trustModel match {
case SingleHubModel(false) =>
warn("toDashboard route called on a non-hub node, returning Forbidden")
complete(StatusCodes.Forbidden)
case _ =>
ConfigSource.config.getObject("shrine.hub.downstreamNodes")
.values
.map(cv => Try(new java.net.URL(cv.unwrapped().toString)) match {
case Failure(exception) =>
MalformedURLProblem(exception, cv.unwrapped().toString)
throw exception
case Success(goodUrl) => goodUrl
})
.find(_.getHost == dnsName) match {
case None =>
warn(s"Could not find a downstream node matching the requested host `$dnsName`, returning NotFound")
complete(StatusCodes.NotFound)
case Some(downstreamUrl) =>
val remoteDashboardPathPrefix = downstreamUrl.getPath
.replaceFirst("shrine/rest/adapter/requests", "shrine-dashboard/fromDashboard") // I don't think this needs to be configurable
val port = if (downstreamUrl.getPort == -1)
downstreamUrl.getDefaultPort
else
downstreamUrl.getPort
val baseUrl = s"${downstreamUrl.getProtocol}://$dnsName:$port$remoteDashboardPathPrefix"
info(s"toDashboardRoute: BaseURL: $baseUrl")
- forwardUnmatchedPath(baseUrl,Some(ShrineJwtAuthenticator.createOAuthCredentials(user, dnsName)))
+ detach() {
+ forwardUnmatchedPath(baseUrl, Some(ShrineJwtAuthenticator.createOAuthCredentials(user, dnsName)))
+ }
}
}
}
}
case class MalformedURLProblem(malformattedURLException: Throwable, malformattedURL: String) extends AbstractProblem(ProblemSources.Dashboard) {
override val throwable = Some(malformattedURLException)
override def summary: String = s"Encountered a malformatted url `$malformattedURL` while parsing urls from downstream nodes"
override def description: String = description
}
def statusRoute(user:User):Route = get {
val( adapter , hub , i2b2 , keystore , optionalParts , qep , summary ) =
("adapter", "hub", "i2b2", "keystore", "optionalParts", "qep", "summary")
pathPrefix("classpath") { getClasspath }~
pathPrefix("config") { getConfig }~
pathPrefix("problems") { getProblems }~
pathPrefix(adapter) { getFromSubService(adapter) }~
pathPrefix(hub) { getFromSubService(hub) }~
pathPrefix(i2b2) { getFromSubService(i2b2) }~
pathPrefix(keystore) { getFromSubService(keystore) }~
pathPrefix(optionalParts) { getFromSubService(optionalParts) }~
pathPrefix(qep) { getFromSubService(qep) }~
pathPrefix(summary) { getFromSubService(summary) }
}
val statusBaseUrl = ConfigSource.config.getString("shrine.dashboard.statusBaseUrl")
// TODO: Move this over to Status API?
lazy val verifySignature:Route = {
-
- formField("sha256".as[String].?) { sha256: Option[String] =>
- val response = sha256.map(s => KeyStoreInfo.hasher.handleSig(s))
- implicit val format = ShaResponse.json4sFormats
- response match {
- case None => complete(StatusCodes.BadRequest)
- case Some(sh@ShaResponse(ShaResponse.badFormat, _)) => complete(StatusCodes.BadRequest -> sh)
- case Some(sh@ShaResponse(_, false)) => complete(StatusCodes.NotFound -> sh)
- case Some(sh@ShaResponse(_, true)) => complete(StatusCodes.OK -> sh)
+ formField("sha256".as[String].?) { sha256: Option[String] =>
+ val response = sha256.map(s => KeyStoreInfo.hasher.handleSig(s))
+ implicit val format = ShaResponse.json4sFormats
+ response match {
+ case None => complete(StatusCodes.BadRequest)
+ case Some(sh@ShaResponse(ShaResponse.badFormat, _)) => complete(StatusCodes.BadRequest -> sh)
+ case Some(sh@ShaResponse(_, false)) => complete(StatusCodes.NotFound -> sh)
+ case Some(sh@ShaResponse(_, true)) => complete(StatusCodes.OK -> sh)
}
}
}
lazy val getConfig:Route = {
def completeConfigRoute(httpResponse:HttpResponse,uri:Uri):Route = {
ctx => {
val config = ParsedConfig(httpResponse.entity.asString)
ctx.complete(
ShrineConfig(config)
)
}
}
requestUriThenRoute(statusBaseUrl + "/config", completeConfigRoute)
}
lazy val getClasspath:Route = {
def pullClasspathFromConfig(httpResponse:HttpResponse,uri:Uri):Route = {
ctx => {
val result = httpResponse.entity.asString
val shrineConfig = ShrineConfig(ParsedConfig(result))
ctx.complete(shrineConfig)
}
}
requestUriThenRoute(statusBaseUrl + "/config",pullClasspathFromConfig)
}
def getFromSubService(key: String):Route = {
- requestUriThenRoute(s"$statusBaseUrl/$key")
+ detach() {
+ requestUriThenRoute(s"$statusBaseUrl/$key")
+ }
}
// table based view, can see N problems at a time. Front end sends how many problems that they want
// to skip, and it will take N the 'nearest N' ie with n = 20, 0-19 -> 20, 20-39 -> 20-40
lazy val getProblems:Route = {
def floorMod(x: Int, y: Int) = {
x - (x % y)
}
val db = Problems.DatabaseConnector
// Intellij loudly complains if you use parameters instead of chained parameter calls.
// ¯\_(ツ)_/¯
parameter("offset".as[Int].?(0)) {(offsetPreMod:Int) =>
parameter("n".as[Int].?(20)) {(nPreMax:Int) =>
parameter("epoch".as[Long].?) {(epoch:Option[Long]) =>
val n = Math.max(0, nPreMax)
val moddedOffset = floorMod(Math.max(0, offsetPreMod), n)
- val query = for {
- result <- db.IO.sizeAndProblemDigest(n, moddedOffset)
- } yield (result._2, floorMod(Math.max(0, moddedOffset), n), n, result._1)
-
- val query2 = for {
- dateOffset <- db.IO.findIndexOfDate(epoch.getOrElse(0))
- moddedOffset = floorMod(dateOffset, n)
- result <- db.IO.sizeAndProblemDigest(n, moddedOffset)
- } yield (result._2, moddedOffset, n, result._1)
-
- val queryReal = if (epoch.isEmpty) query else query2
- val tupled = db.runBlocking(queryReal)
- val response: ProblemResponse = ProblemResponse(tupled._1, tupled._2, tupled._3, tupled._4)
- implicit val formats = response.json4sMarshaller
- complete(response)
- }}}
+ detach() {
+ val query = for {
+ result <- db.IO.sizeAndProblemDigest(n, moddedOffset)
+ } yield (result._2, floorMod(Math.max(0, moddedOffset), n), n, result._1)
+
+ val query2 = for {
+ dateOffset <- db.IO.findIndexOfDate(epoch.getOrElse(0))
+ moddedOffset = floorMod(dateOffset, n)
+ result <- db.IO.sizeAndProblemDigest(n, moddedOffset)
+ } yield (result._2, moddedOffset, n, result._1)
+
+ val queryReal = if (epoch.isEmpty) query else query2
+ val tupled = db.runBlocking(queryReal)
+ val response: ProblemResponse = ProblemResponse(tupled._1, tupled._2, tupled._3, tupled._4)
+ implicit val formats = response.json4sMarshaller
+ complete(response)
+ }
+ }
+ }
+ }
}
}
case class ProblemResponse(size: Int, offset: Int, n: Int, problems: Seq[ProblemDigest]) extends Json4sSupport {
override implicit def json4sFormats: Formats = DefaultFormats + new NodeSeqSerializer
}
object KeyStoreInfo {
val config = ConfigSource.config
val keyStoreDescriptor = KeyStoreDescriptorParser(
config.getConfig("shrine.keystore"),
config.getConfigOrEmpty("shrine.hub"),
config.getConfigOrEmpty("shrine.queryEntryPoint"))
val certCollection = BouncyKeyStoreCollection.fromFileRecoverWithClassPath(keyStoreDescriptor)
val hasher = UtilHasher(certCollection)
}
/**
* Centralized parsing logic for map of shrine.conf
* the class literal `T.class` in Java.
*/
//todo most of this info should come directly from the status service in Shrine, not from reading the config
case class ParsedConfig(configMap:Map[String, String]){
private val trueVal = "true"
private val rootKey = "shrine"
def isHub =
getOrElse(rootKey + ".hub.create", "")
.toLowerCase == trueVal
def stewardEnabled =
configMap.keySet
.contains(rootKey + ".queryEntryPoint.shrineSteward")
def shouldQuerySelf =
getOrElse(rootKey + ".hub.shouldQuerySelf", "")
.toLowerCase == trueVal
def fromJsonString(jsonString:String): String = jsonString.split("\"").mkString("")
def get(key:String): Option[String] = configMap.get(key).map(fromJsonString)
def getOrElse(key:String, elseVal:String = ""): String = get(key).getOrElse(elseVal)
}
object ParsedConfig {
def apply(jsonString:String):ParsedConfig = {
implicit def json4sFormats: Formats = DefaultFormats
ParsedConfig(json4sParse(jsonString).extract[StatusProtocolConfig].keyValues)//.filterKeys(_.toLowerCase.startsWith("shrine")))
}
}
case class DownstreamNode(name:String, url:String)
object DownstreamNode {
def create(configMap:Map[String,String]):Iterable[DownstreamNode] = {
for ((k, v) <- configMap.filterKeys(_.toLowerCase.startsWith
("shrine.hub.downstreamnodes")))
yield DownstreamNode(k.split('.').last,v.split("\"").mkString(""))
}
}
//todo replace with the actual config, scrubbed of passwords
case class ShrineConfig(isHub:Boolean,
hub:Hub,
pmEndpoint:Endpoint,
ontEndpoint:Endpoint,
hiveCredentials: HiveCredentials,
adapter: Adapter,
queryEntryPoint:QEP,
networkStatusQuery:String,
configMap:Map[String, String]
) extends DefaultJsonSupport
object ShrineConfig extends DefaultJsonSupport {
def apply(config:ParsedConfig):ShrineConfig = {
val hub = Hub(config)
val isHub = config.isHub
val pmEndpoint = Endpoint("pm",config)
val ontEndpoint = Endpoint("ont",config)
val hiveCredentials = HiveCredentials(config)
val adapter = Adapter(config)
val queryEntryPoint = QEP(config)
val networkStatusQuery = config.configMap("shrine.networkStatusQuery")
ShrineConfig(isHub, hub, pmEndpoint, ontEndpoint, hiveCredentials, adapter, queryEntryPoint, networkStatusQuery, config.configMap)
}
}
case class Endpoint(acceptAllCerts:Boolean, url:String, timeoutSeconds:Int)
object Endpoint{
def apply(endpointType:String,parsedConfig:ParsedConfig):Endpoint = {
val prefix = "shrine." + endpointType.toLowerCase + "Endpoint."
val acceptAllCerts = parsedConfig.configMap.getOrElse(prefix + "acceptAllCerts", "") == "true"
val url = parsedConfig.configMap.getOrElse(prefix + "url","")
val timeoutSeconds = parsedConfig.configMap.getOrElse(prefix + "timeout.seconds", "0").toInt
Endpoint(acceptAllCerts, url, timeoutSeconds)
}
}
case class HiveCredentials(domain:String,
username:String,
password:String,
crcProjectId:String,
ontProjectId:String)
object HiveCredentials{
def apply(parsedConfig:ParsedConfig):HiveCredentials = {
val key = "shrine.hiveCredentials."
val domain = parsedConfig.configMap.getOrElse(key + "domain","")
val username = parsedConfig.configMap.getOrElse(key + "username","")
val password = "REDACTED"
val crcProjectId = parsedConfig.configMap.getOrElse(key + "crcProjectId","")
val ontProjectId = parsedConfig.configMap.getOrElse(key + "ontProjectId","")
HiveCredentials(domain, username, password, crcProjectId, ontProjectId)
}
}
// -- hub only -- //
//todo delete when the Dashboard front end can use the status service's hub method
case class Hub(shouldQuerySelf:Boolean,
create:Boolean,
downstreamNodes:Iterable[DownstreamNode])
object Hub{
def apply(parsedConfig:ParsedConfig):Hub = {
val shouldQuerySelf = parsedConfig.shouldQuerySelf
val create = parsedConfig.isHub
val downstreamNodes = DownstreamNode.create(parsedConfig.configMap)
Hub(shouldQuerySelf, create, downstreamNodes)
}
}
// -- adapter info -- //
case class Adapter(crcEndpointUrl:String, setSizeObfuscation:Boolean, adapterLockoutAttemptsThreshold:Int,
adapterMappingsFilename:String)
object Adapter{
def apply(parsedConfig:ParsedConfig):Adapter = {
val key = "shrine.adapter."
val crcEndpointUrl = parsedConfig.configMap.getOrElse(key + "crcEndpoint.url","")
val setSizeObfuscation = parsedConfig.configMap.getOrElse(key + "setSizeObfuscation","").toLowerCase == "true"
val adapterLockoutAttemptsThreshold = parsedConfig.configMap.getOrElse(key + "adapterLockoutAttemptsThreshold", "0").toInt
val adapterMappingsFileName = parsedConfig.configMap.getOrElse(key + "adapterMappingsFileName","")
Adapter(crcEndpointUrl, setSizeObfuscation, adapterLockoutAttemptsThreshold, adapterMappingsFileName)
}
}
case class Steward(qepUserName:String, stewardBaseUrl:String)
object Steward {
def apply (parsedConfig:ParsedConfig):Steward = {
val key = "shrine.queryEntryPoint.shrineSteward."
val qepUserName = parsedConfig.configMap.getOrElse(key + "qepUserName","")
val stewardBaseUrl = parsedConfig.configMap.getOrElse(key + "stewardBaseUrl","")
Steward(qepUserName, stewardBaseUrl)
}
}
// -- if needed -- //
case class TimeoutInfo (timeUnit:String, description:String)
case class DatabaseInfo(createTablesOnStart:Boolean, dataSourceFrom:String,
jndiDataSourceName:String, slickProfileClassName:String)
case class Audit(database:DatabaseInfo, collectQepAudit:Boolean)
object Audit{
def apply(parsedConfig:ParsedConfig):Audit = {
val key = "shrine.queryEntryPoint.audit."
val createTablesOnStart = parsedConfig.configMap.getOrElse(key + "database.createTablesOnStart","") == "true"
val dataSourceFrom = parsedConfig.configMap.getOrElse(key + "database.dataSourceFrom","")
val jndiDataSourceName = parsedConfig.configMap.getOrElse(key + "database.jndiDataSourceName","")
val slickProfileClassName = parsedConfig.configMap.getOrElse(key + "database.slickProfileClassName","")
val collectQepAudit = parsedConfig.configMap.getOrElse(key + "collectQepAudit","") == "true"
val database = DatabaseInfo(createTablesOnStart, dataSourceFrom, jndiDataSourceName, slickProfileClassName)
Audit(database, collectQepAudit)
}
}
case class QEP(
maxQueryWaitTimeMinutes:Int,
create:Boolean,
attachSigningCert:Boolean,
authorizationType:String,
includeAggregateResults:Boolean,
authenticationType:String,
audit:Audit,
shrineSteward:Steward,
broadcasterServiceEndpointUrl:Option[String]
)
object QEP{
val key = "shrine.queryEntryPoint."
def apply(parsedConfig:ParsedConfig):QEP = QEP(
maxQueryWaitTimeMinutes = parsedConfig.configMap.getOrElse(key + "maxQueryWaitTime.minutes", "0").toInt,
create = parsedConfig.configMap.getOrElse(key + "create","") == "true",
attachSigningCert = parsedConfig.configMap.getOrElse(key + "attachSigningCert","") == "true",
authorizationType = parsedConfig.configMap.getOrElse(key + "authorizationType",""),
includeAggregateResults = parsedConfig.configMap.getOrElse(key + "includeAggregateResults","") == "true",
authenticationType = parsedConfig.configMap.getOrElse(key + "authenticationType", ""),
audit = Audit(parsedConfig),
shrineSteward = Steward(parsedConfig),
broadcasterServiceEndpointUrl = parsedConfig.configMap.get(key + "broadcasterServiceEndpoint.url")
)
}
//adapted from https://gist.github.com/joseraya/176821d856b43b1cfe19
object gruntWatchCorsSupport extends Directive0 with RouteConcatenation {
import spray.http.AllOrigins
import spray.http.HttpHeaders.{`Access-Control-Allow-Headers`, `Access-Control-Allow-Methods`, `Access-Control-Allow-Origin`, `Access-Control-Max-Age`}
import spray.http.HttpMethods.{GET, OPTIONS, POST}
import spray.routing.directives.MethodDirectives.options
import spray.routing.directives.RespondWithDirectives.respondWithHeaders
import spray.routing.directives.RouteDirectives.complete
private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
private val optionsCorsHeaders = List(
`Access-Control-Allow-Headers`("Origin, X-Requested-With, Content-Type, Accept, Accept-Encoding, Accept-Language, Host, Referer, User-Agent, Authorization"),
`Access-Control-Max-Age`(1728000)) //20 days
val gruntWatch:Boolean = ConfigSource.config.getBoolean("shrine.dashboard.gruntWatch")
override def happly(f: (HNil) => Route): Route = {
if(gruntWatch) {
options {
respondWithHeaders(`Access-Control-Allow-Methods`(OPTIONS, GET, POST) :: allowOriginHeader :: optionsCorsHeaders){
complete(StatusCodes.OK)
}
} ~ f(HNil)
}
else f(HNil)
}
}
\ No newline at end of file
diff --git a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala
index d8633d605..7d174f32a 100644
--- a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala
+++ b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/httpclient/HttpClientDirectives.scala
@@ -1,135 +1,130 @@
package net.shrine.dashboard.httpclient
import java.io.InputStream
-import java.security.cert.X509Certificate
-import javax.net.ssl.{SSLContext, X509TrustManager}
+import akka.actor.ActorSystem
+import net.shrine.hornetqclient.HttpClient
import net.shrine.log.Loggable
-import spray.can.Http
-import akka.io.IO
-import akka.actor.{ActorRef, ActorSystem}
-import spray.can.Http.{ConnectionAttemptFailedException, HostConnectorSetup}
+import net.shrine.source.ConfigSource
import spray.http.{HttpCredentials, HttpEntity, HttpHeader, HttpHeaders, HttpRequest, HttpResponse, StatusCodes, Uri}
-import spray.io.ClientSSLEngineProvider
import spray.routing.{RequestContext, Route}
-import akka.pattern.ask
-import net.shrine.hornetqclient.HttpClient
-import net.shrine.source.ConfigSource
-import scala.concurrent.{Await, Future, TimeoutException, blocking}
-import scala.concurrent.duration.DurationInt
+import scala.concurrent.blocking
import scala.language.postfixOps
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
/**
* From https://github.com/bthuillier/spray/commit/d31fc1b5e1415e1b908fe7d1f01f364a727e2593 with extra bits from http://www.cakesolutions.net/teamblogs/http-proxy-with-spray .
* Replace when Spray has its own version.
*
* @author david
* @since 9/14/15
*/
trait HttpClientDirectives extends Loggable {
implicit val system = ActorSystem("dashboardServer",ConfigSource.config)
/**
* Proxy the request to the specified base uri appended with the unmatched path.
*
*/
def forwardUnmatchedPath(baseUri: Uri,maybeCredentials:Option[HttpCredentials] = None): Route = {
def completeWithEntityAsString(httpResponse:HttpResponse,uri:Uri):Route = {
ctx => {
ctx.complete(httpResponse.entity.asString)
}
}
requestWithUnmatchedPath(baseUri,completeWithEntityAsString,maybeCredentials)
}
/**
* Make the request to the specified base uri appended with the unmatched path, then use the returned entity (as a string) to complete the route.
*
*/
def requestWithUnmatchedPath(baseUri:Uri, route:(HttpResponse,Uri) => Route,maybeCredentials:Option[HttpCredentials] = None): Route = {
ctx => {
val resourceUri = baseUri.withPath(baseUri.path.++(ctx.unmatchedPath)).withQuery(ctx.request.uri.query)
- requestUriThenRoute(resourceUri,route,maybeCredentials)(ctx)
+ blocking {
+ requestUriThenRoute(resourceUri,route,maybeCredentials)(ctx)
+ }
}
}
/**
* Just pass the result through
*/
def passThrough(httpResponse: HttpResponse,uri: Uri):Route = ctx => ctx.complete(httpResponse.entity.asString)
/**
* proxy the request to the specified uri with the unmatched path, then use the returned entity (as a string) to complete the route.
*
*/
def requestUriThenRoute(
resourceUri:Uri,
route:(HttpResponse,Uri) => Route = passThrough,
maybeCredentials:Option[HttpCredentials] = None
): Route = {
ctx => {
- val httpResponse = httpResponseForUri(resourceUri,ctx,maybeCredentials)
- info(s"Got $httpResponse for $resourceUri")
+ blocking {
+ val httpResponse = httpResponseForUri(resourceUri, ctx, maybeCredentials)
+ info(s"Got $httpResponse for $resourceUri")
- handleCommonErrorsOrRoute(route)(httpResponse,resourceUri)(ctx)
+ handleCommonErrorsOrRoute(route)(httpResponse, resourceUri)(ctx)
+ }
}
}
private def httpResponseForUri(resourceUri:Uri,ctx: RequestContext,maybeCredentials:Option[HttpCredentials] = None):HttpResponse = {
info(s"Requesting $resourceUri")
if(resourceUri.scheme == "classpath") ClasspathResourceHttpClient.loadFromResource(resourceUri.path.toString())
else {
val basicRequest = HttpRequest(ctx.request.method,resourceUri)
val request = maybeCredentials.fold(basicRequest){ (credentials: HttpCredentials) =>
val headers: List[HttpHeader] = basicRequest.headers :+ HttpHeaders.Authorization(credentials)
basicRequest.copy(headers = headers)
}
HttpClient.webApiCall(request)
}
}
def handleCommonErrorsOrRoute(route:(HttpResponse,Uri) => Route)(httpResponse: HttpResponse,uri:Uri): Route = {
ctx => {
if(httpResponse.status != StatusCodes.OK) {
//todo create and report a problem
val ctxCopy: RequestContext = ctx.withHttpResponseMapped(_.copy(status = httpResponse.status))
ctxCopy.complete(s"$uri replied with $httpResponse")
}
else route(httpResponse,uri)(ctx)
}
}
}
object HttpClientDirectives extends HttpClientDirectives
/**
* For testing, get an HttpResponse for a classpath resource
*/
object ClasspathResourceHttpClient extends Loggable {
def loadFromResource(resourceName:String):HttpResponse = {
blocking {
val cleanResourceName = if (resourceName.startsWith ("/") ) resourceName.drop(1)
else resourceName
val classLoader = getClass.getClassLoader
try {
val is: InputStream = classLoader.getResourceAsStream (cleanResourceName)
val string:String = scala.io.Source.fromInputStream (is).mkString
HttpResponse(entity = HttpEntity(string))
}
catch{
case NonFatal(x) => {
info(s"Could not load $resourceName",x)
HttpResponse(status = StatusCodes.NotFound,entity = HttpEntity(s"Could not load $resourceName due to ${x.getMessage}"))
}
}
}
}
}
\ No newline at end of file
diff --git a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/jwtauth/ShrineJwtAuthenticator.scala b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/jwtauth/ShrineJwtAuthenticator.scala
index 7fa53977e..3a9985c51 100644
--- a/apps/dashboard-app/src/main/scala/net/shrine/dashboard/jwtauth/ShrineJwtAuthenticator.scala
+++ b/apps/dashboard-app/src/main/scala/net/shrine/dashboard/jwtauth/ShrineJwtAuthenticator.scala
@@ -1,220 +1,219 @@
package net.shrine.dashboard.jwtauth
import java.io.ByteArrayInputStream
import java.security.cert.{CertificateFactory, X509Certificate}
import java.security.{Key, Principal, PrivateKey}
import java.util.Date
import io.jsonwebtoken.impl.TextCodec
import io.jsonwebtoken.{ClaimJwtException, Claims, Jws, Jwts, SignatureAlgorithm}
import net.shrine.crypto.{DownStreamCertCollection, HubCertCollection, PeerCertCollection}
import net.shrine.dashboard.KeyStoreInfo
import net.shrine.i2b2.protocol.pm.User
import net.shrine.log.Loggable
import net.shrine.protocol.Credential
import spray.http.HttpHeaders.{Authorization, `WWW-Authenticate`}
import spray.http.{HttpChallenge, HttpHeader, HttpRequest, OAuth2BearerToken}
import spray.routing.AuthenticationFailedRejection
import spray.routing.AuthenticationFailedRejection.{CredentialsMissing, CredentialsRejected}
import spray.routing.authentication._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
/**
* An Authenticator that uses Jwt in a Bearer header to authenticate. See http://jwt.io/introduction/ for what this is all about,
* https://tools.ietf.org/html/rfc7519 for what it might include for claims.
*
* @author david
* @since 12/21/15
*/
object ShrineJwtAuthenticator extends Loggable {
val certCollection = KeyStoreInfo.certCollection
//from https://groups.google.com/forum/#!topic/spray-user/5DBEZUXbjtw
def authenticate(implicit ec: ExecutionContext): ContextAuthenticator[User] = { ctx =>
Future {
val attempt: Try[Authentication[User]] = for {
header:HttpHeader <- extractAuthorizationHeader(ctx.request)
jwtsString:String <- extractJwtsStringAndCheckScheme(header)
jwtsClaims <- extractJwtsClaims(jwtsString)
cert: X509Certificate <- extractAndCheckCert(jwtsClaims)
jwtsBody:Claims <- Try{jwtsClaims.getBody}
jwtsSubject <- failIfNull(jwtsBody.getSubject,MissingRequiredJwtsClaim("subject",cert.getSubjectDN))
jwtsIssuer <- failIfNull(jwtsBody.getSubject,MissingRequiredJwtsClaim("issuer",cert.getSubjectDN))
} yield {
val user = User(
fullName = cert.getSubjectDN.getName,
username = jwtsSubject,
domain = jwtsIssuer,
credential = Credential(jwtsIssuer, isToken = false),
params = Map(),
rolesByProject = Map()
)
Right(user)
}
//todo use a fold() in Scala 2.12
attempt match {
case Success(rightUser) => rightUser
case Failure(x) => x match
{
case anticipated: ShrineJwtException =>
info(s"Failed to authenticate due to ${anticipated.toString}",anticipated)
anticipated.rejection
case fromJwts: ClaimJwtException =>
info(s"Failed to authenticate due to ${fromJwts.toString} while authenticating ${ctx.request}",fromJwts)
rejectedCredentials
/*
case x: CertificateExpiredException => {
//todo will these even be thrown here? Get some identification here
info(s"Cert expired.", x)
rejectedCredentials
}
case x: CertificateNotYetValidException => {
info(s"Cert not yet valid.", x)
rejectedCredentials
}
*/
case unanticipated =>
warn(s"Unanticipated ${unanticipated.toString} while authenticating ${ctx.request}",unanticipated)
rejectedCredentials
}
}
}
}
def createOAuthCredentials(user:User, dnsName: String): OAuth2BearerToken = {
+ val oauthEntry = certCollection match {
+ case HubCertCollection(myEntry, _, _) => myEntry
+ case DownStreamCertCollection(myEntry, _, _) => myEntry // Note: The DownStreamCertCollection will never be accepted
+ case PeerCertCollection(_, _, sites) => sites.find(_.url == dnsName).get.entry.get
+ // This looks really dangerous, but it's an invariant that all remote sites for PeerToPeer collections have entries.
+ }
+ val base64Cert: String = TextCodec.BASE64URL.encode(oauthEntry.cert.getEncoded)
- val oauthEntry = certCollection match {
- case HubCertCollection(myEntry, _, _) => myEntry
- case DownStreamCertCollection(myEntry, _, _) => myEntry // Note: The DownStreamCertCollection will never be accepted
- case PeerCertCollection(_, _, sites) => sites.find(_.url == dnsName).get.entry.get
- // This looks really dangerous, but it's an invariant that all remote sites for PeerToPeer collections have entries.
- }
- val base64Cert:String = TextCodec.BASE64URL.encode(oauthEntry.cert.getEncoded)
-
- val key: PrivateKey = certCollection.myEntry.privateKey.get
- val expiration: Date = new Date(System.currentTimeMillis() + 30 * 1000) //good for 30 seconds
- val jwtsString = Jwts.builder().
+ val key: PrivateKey = certCollection.myEntry.privateKey.get
+ val expiration: Date = new Date(System.currentTimeMillis() + 30 * 1000) //good for 30 seconds
+ val jwtsString = Jwts.builder().
setHeaderParam("kid", base64Cert).
setSubject(s"${user.username} at ${user.domain}").
setIssuer(java.net.InetAddress.getLocalHost.getHostName). //todo is it OK for me to use issuer this way or should I use my own claim?
setExpiration(expiration).
signWith(SignatureAlgorithm.RS512, key).
compact()
- OAuth2BearerToken(jwtsString)
+ OAuth2BearerToken(jwtsString)
}
def extractAuthorizationHeader(request: HttpRequest):Try[HttpHeader] = Try {
case class NoAuthorizationHeaderException(request: HttpRequest) extends ShrineJwtException(s"No ${Authorization.name} header found in $request",missingCredentials)
//noinspection ComparingUnrelatedTypes
request.headers.find(_.name.equals(Authorization.name)).getOrElse{throw NoAuthorizationHeaderException(request)}
}
def extractJwtsStringAndCheckScheme(httpHeader: HttpHeader) = Try {
val splitHeaderValue: Array[String] = httpHeader.value.trim.split(" ")
if (splitHeaderValue.length != 2) {
case class WrongNumberOfSegmentsException(httpHeader: HttpHeader) extends ShrineJwtException(s"Header had ${splitHeaderValue.length} space-delimited segments, not 2, in $httpHeader.",missingCredentials)
throw new WrongNumberOfSegmentsException(httpHeader)
}
else if(splitHeaderValue(0) != BearerAuthScheme) {
case class NotBearerAuthException(httpHeader: HttpHeader) extends ShrineJwtException(s"Expected $BearerAuthScheme, not ${splitHeaderValue(0)} in $httpHeader.",missingCredentials)
throw new NotBearerAuthException(httpHeader)
}
else splitHeaderValue(1)
}
def extractJwtsClaims(jwtsString:String): Try[Jws[Claims]] = Try {
Jwts.parser().setSigningKeyResolver(new SigningKeyResolverBridge()).parseClaimsJws(jwtsString)
}
def extractAndCheckCert(jwtsClaims:Jws[Claims]): Try[X509Certificate] = Try {
val cert = KeySource.certForString(jwtsClaims.getHeader.getKeyId)
val issuingSite = jwtsClaims.getBody.getIssuer
case class CertIssuerNotInCollectionException(issuingSite:String,issuer: Principal, aliases: Iterable[String]) extends ShrineJwtException(s"Could not find a certificate with issuer DN $issuer. Known cert aliases are ${aliases.mkString(",")}")
//todo is this the right way to find a cert in the certCollection?
certCollection match {
case DownStreamCertCollection(_, caEntry, _)
if caEntry.signed(cert) => cert
case HubCertCollection(ca, _, _) if ca.signed(cert) => cert
case PeerCertCollection(_, entries, _) if entries.exists(_.signed(cert))
=> cert
case px: PeerCertCollection => px.allEntries.find(_.signed(cert)).map(_.cert).getOrElse(
throw CertIssuerNotInCollectionException(issuingSite, cert.getIssuerDN, px.allEntries.flatMap(_.aliases))
)
case dc: DownStreamCertCollection => throw CertIssuerNotInCollectionException(issuingSite, cert.getIssuerDN, dc.caEntry.aliases)
case hc: HubCertCollection => throw CertIssuerNotInCollectionException(issuingSite,cert.getIssuerDN, hc.caEntry.aliases)
}
// debug(s"certCollection.caCerts.contains(${cert.getSubjectX500Principal}) is ${caEntry.cert.getSubjectX500Principal == cert.getSubjectX500Principal}")
// certCollection.caCerts.get(cert.getSubjectX500Principal).fold{
// //if not in the keystore, check that the issuer is available
// val issuer: Principal = cert.getIssuerX500Principal
// case class CertIssuerNotInCollectionException(issuingSite:String,issuer: Principal) extends ShrineJwtException(s"Could not find a CA certificate with issuer DN $issuer. Known CA cert aliases are ${certCollection.caCertAliases.mkString(",")}")
// val signingCert = certCollection.caCerts.getOrElse(issuer,{throw CertIssuerNotInCollectionException(issuingSite,issuer)})
//
// //verify that the cert was signed using the signingCert
// //todo this can throw CertificateException, NoSuchAlgorithmException, InvalidKeyException, NoSuchProviderException, SignatureException
// cert.verify(signingCert.getPublicKey)
// //todo has cert expired?
// info(s"${cert.getSubjectX500Principal} verified using $issuer from the KeyStore")
// cert
// }{ principal => //if the cert is in the certCollection then all is well
// info(s"$principal is in the KeyStore")
// cert
// }
}
def failIfNull[E](e:E,t:Throwable):Try[E] = Try {
if(e == null) throw t
else e
}
case class MissingRequiredJwtsClaim(field:String,principal: Principal) extends ShrineJwtException(s"$field is null from ${principal.getName}")
val BearerAuthScheme = "Bearer"
val challengeHeader: `WWW-Authenticate` = `WWW-Authenticate`(HttpChallenge(BearerAuthScheme, "dashboard-to-dashboard"))
val missingCredentials: Authentication[User] = Left(AuthenticationFailedRejection(CredentialsMissing, List(challengeHeader)))
val rejectedCredentials: Authentication[User] = Left(AuthenticationFailedRejection(CredentialsRejected, List(challengeHeader)))
}
class KeySource {}
object KeySource extends Loggable {
def keyForString(string: String): Key = {
val certificate =certForString(string)
//todo validate cert with something like obtainAndValidateSigningCert
//check date on cert vs time. throws CertificateExpiredException or CertificateNotYetValidException for problems
//todo skip this until you rebuild the certs used for testing certificate.checkValidity(now)
certificate.getPublicKey
}
def certForString(string: String): X509Certificate = {
val certBytes = TextCodec.BASE64URL.decode(string)
val inputStream = new ByteArrayInputStream(certBytes)
val certificate = try {
CertificateFactory.getInstance("X.509").generateCertificate(inputStream).asInstanceOf[X509Certificate]
}
finally {
inputStream.close()
}
certificate
}
}
abstract class ShrineJwtException(message:String,
val rejection:Authentication[User] = ShrineJwtAuthenticator.rejectedCredentials,
cause:Throwable = null) extends RuntimeException(message,cause)
\ No newline at end of file
diff --git a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
index 718ff87ca..718a36536 100644
--- a/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
+++ b/apps/meta-app/src/main/scala/net/shrine/metadata/QepService.scala
@@ -1,389 +1,387 @@
package net.shrine.metadata
import java.util.UUID
import akka.actor.ActorSystem
import net.shrine.audit.{NetworkQueryId, QueryName, Time}
import net.shrine.authorization.steward.UserName
import net.shrine.i2b2.protocol.pm.User
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
import net.shrine.protocol.ResultOutputType
import net.shrine.qep.querydb.{FullQueryResult, QepQuery, QepQueryBreakdownResultsRow, QepQueryDb, QepQueryDbChangeNotifier, QepQueryFlag}
import net.shrine.source.ConfigSource
import net.shrine.config.ConfigExtensions
import rapture.json._
import rapture.json.formatters.humanReadable
import rapture.json.jsonBackends.jawn._
import spray.http.{StatusCode, StatusCodes}
import spray.routing._
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Promise
+import scala.concurrent.{blocking, Promise}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try
import scala.util.control.NonFatal
/**
* An API to support the web client's work with queries.
*
* The current API supplies information about previous running queries. Eventually this will support accessing
* information about queries running now and the ability to submit queries.
*/
//todo move this to the qep/service module, or somewhere in the qep subproject
trait QepService extends HttpService with Loggable {
def system: ActorSystem
val qepQueryDbChangeNotifier = QepQueryDbChangeNotifier(system)
val qepReceiver = QepReceiver //start the QepReceiver by bringing it into context
val qepInfo =
"""
|The SHRINE query entry point service.
|
|This API gives a researcher access to queries, and (eventually) the ability to run queries.
|
""".stripMargin
def qepRoute(user: User): Route = pathPrefix("qep") {
get {
detach(){
queryResult(user) ~ queryResultsTable(user)
}
} ~
pathEndOrSingleSlash{complete(qepInfo)} ~
respondWithStatus(StatusCodes.NotFound){complete(qepInfo)}
}
/*
Races to complete are OK in spray. They're already happening, in fact.
When a request comes in
if the request can be fulfilled immediately then do that
if not
create a promise to fulfil to trigger the complete
create a promise to bump that first one on timeout
schedule a runnable to bump the timeout promise
create a promise to bump that first one if the conditions are right
create a promise to bump the conditional one and stuff it in a concurrent map for other parts of the system to find
onSuccess remove the conditional promise and cancel the scheduled timeout.
*/
def queryResult(user:User):Route = path("queryResult" / LongNumber) { queryId: NetworkQueryId =>
//take optional parameters for version and an awaitTime, but insist on both
//If the timeout parameter isn't supplied then the deadline is now so it will reply immediately
parameters('afterVersion.as[Long] ? 0L, 'timeoutSeconds.as[Long] ? 0L) { (afterVersion: Long, timeoutSeconds: Long) =>
//check that the timeout is less than the spray "give up" timeout
val sprayRequestTimeout = ConfigSource.config.get("spray.can.server.request-timeout",Duration(_)).toSeconds
val maximumTimeout = sprayRequestTimeout - 1
if (maximumTimeout <= timeoutSeconds) warn(s"""spray.can.server.request-timeout $sprayRequestTimeout is too short
|relative to timeoutSeconds $timeoutSeconds . The server may produce a timeout-related error. Using
|$maximumTimeout instead of $timeoutSeconds to try to prevent that.""".stripMargin)
val timeout = Seq(maximumTimeout,timeoutSeconds).min
//times for local races.
val requestStartTime = System.currentTimeMillis()
val deadline = requestStartTime + (timeout * 1000)
detach(){
val troubleOrResultsRow = selectResultsRow(queryId, user)
if (shouldRespondNow(deadline, afterVersion, troubleOrResultsRow)) {
//bypass all the concurrent/interrupt business. Just reply.
completeWithQueryResult(queryId,troubleOrResultsRow)
}
else {
debug(s"Creating promises to respond about $queryId with a version later than $afterVersion by $deadline ")
// the Promise used to respond
val okToRespond = Promise[Either[(StatusCode,String),ResultsRow]]()
//Schedule the timeout
val okToRespondTimeout = Promise[Unit]()
okToRespondTimeout.future.transform({unit =>
okToRespond.tryComplete(Try(selectResultsRow(queryId, user)))
},{x:Throwable => x match {case NonFatal(t) => ExceptionWhilePreparingTimeoutResponse(queryId,t)}
x
})
val timeLeft = (deadline - System.currentTimeMillis()) milliseconds
case class TriggerRunnable(networkQueryId: NetworkQueryId,promise: Promise[Unit]) extends Runnable {
val unit:Unit = ()
override def run(): Unit = promise.trySuccess(unit)
}
val timeoutCanceller = system.scheduler.scheduleOnce(timeLeft,TriggerRunnable(queryId,okToRespondTimeout))
//Set up for an interrupt from new data
val okToRespondIfNewData = Promise[Unit]()
okToRespondIfNewData.future.transform({unit =>
val latestResultsRow = selectResultsRow(queryId, user)
if(shouldRespondNow(deadline,afterVersion,latestResultsRow)) {
okToRespond.tryComplete(Try(selectResultsRow(queryId, user)))
}
},{x:Throwable => x match {case NonFatal(t) => ExceptionWhilePreparingTriggeredResponse(queryId,t)}
x
})
val requestId = UUID.randomUUID()
//put id -> okToRespondIfNewData in a map so that outside processes can trigger it
qepQueryDbChangeNotifier.putLongPollRequest(requestId,queryId,okToRespondIfNewData)
onSuccess(okToRespond.future){ latestResultsRow:Either[(StatusCode,String),ResultsRow] =>
//clean up concurrent bits before responding
qepQueryDbChangeNotifier.removeLongPollRequest(requestId)
timeoutCanceller.cancel()
completeWithQueryResult(queryId,latestResultsRow)
}
}
}
}
}
/**
* @param deadline time when a response must go
* @param afterVersion last timestamp the requester knows about
* @param resultsRow either the result row or something is not right
* @return true to respond now, false to dither
*/
def shouldRespondNow(deadline: Long,
afterVersion: Long,
resultsRow:Either[(StatusCode,String),ResultsRow]
):Boolean = {
val currentTime = System.currentTimeMillis()
if (currentTime >= deadline) true
else resultsRow.fold(
{_._1 != StatusCodes.NotFound},
{_.dataVersion > afterVersion}
)
}
def completeWithQueryResult(networkQueryId: NetworkQueryId,troubleOrResultsRow:Either[(StatusCode,String),ResultsRow]): Route = {
debug(s"Responding to a request for $networkQueryId with $troubleOrResultsRow")
troubleOrResultsRow.fold({ trouble =>
//something is wrong. Respond now.
respondWithStatus(trouble._1) {
complete(trouble._2)
}
}, { queryAndResults =>
//everything is fine. Respond now.
val json: Json = Json(queryAndResults)
val formattedJson: String = Json.format(json)(humanReadable())
complete(formattedJson)
})
}
def selectResultsRow(queryId:NetworkQueryId,user:User):Either[(StatusCode,String),ResultsRow] = {
//query once and determine if the latest change > afterVersion
-
- val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId)
- queryOption.map{query: QepQuery =>
- if (user.sameUserAs(query.userName, query.userDomain)) {
- val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_))
- val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_))
- val queryCell = QueryCell(query, flag)
- val queryAndResults = ResultsRow(queryCell, mostRecentQueryResults)
-
- Right(queryAndResults)
- }
- else Left((StatusCodes.Forbidden,s"Query $queryId belongs to a different user"))
- }.getOrElse(Left[(StatusCode,String),ResultsRow]((StatusCodes.NotFound,s"No query with id $queryId found")))
+ val queryOption: Option[QepQuery] = QepQueryDb.db.selectQueryById(queryId)
+ queryOption.map { query: QepQuery =>
+ if (user.sameUserAs(query.userName, query.userDomain)) {
+ val mostRecentQueryResults: Seq[Result] = QepQueryDb.db.selectMostRecentFullQueryResultsFor(queryId).map(Result(_))
+ val flag = QepQueryDb.db.selectMostRecentQepQueryFlagFor(queryId).map(QueryFlag(_))
+ val queryCell = QueryCell(query, flag)
+ val queryAndResults = ResultsRow(queryCell, mostRecentQueryResults)
+
+ Right(queryAndResults)
+ }
+ else Left((StatusCodes.Forbidden, s"Query $queryId belongs to a different user"))
+ }.getOrElse(Left[(StatusCode, String), ResultsRow]((StatusCodes.NotFound, s"No query with id $queryId found")))
}
def queryResultsTable(user: User): Route = path("queryResultsTable") {
- matchQueryParameters(Some(user.username)){ queryParameters:QueryParameters =>
-
+ matchQueryParameters(Some(user.username)) { queryParameters: QueryParameters =>
val queryRowCount: Int = QepQueryDb.db.countPreviousQueriesByUserAndDomain(
userName = user.username,
domain = user.domain
)
val queries: Seq[QepQuery] = QepQueryDb.db.selectPreviousQueriesByUserAndDomain(
userName = user.username,
domain = user.domain,
skip = queryParameters.skipOption,
limit = queryParameters.limitOption
)
//todo revisit json structure to remove things the front-end doesn't use
val adapters: Seq[String] = QepQueryDb.db.selectDistinctAdaptersWithResults
val flags: Map[NetworkQueryId, QueryFlag] = QepQueryDb.db.selectMostRecentQepQueryFlagsFor(queries.map(q => q.networkId).to[Set])
.map(q => q._1 -> QueryFlag(q._2))
val queryResults: Seq[ResultsRow] = queries.map(q => ResultsRow(
- query = QueryCell(q,flags.get(q.networkId)),
+ query = QueryCell(q, flags.get(q.networkId)),
results = QepQueryDb.db.selectMostRecentFullQueryResultsFor(q.networkId).map(Result(_))))
- val table: ResultsTable = ResultsTable(queryRowCount,queryParameters.skipOption.getOrElse(0),adapters,queryResults)
+ val table: ResultsTable = ResultsTable(queryRowCount, queryParameters.skipOption.getOrElse(0), adapters, queryResults)
val jsonTable: Json = Json(table)
val formattedTable: String = Json.format(jsonTable)(humanReadable())
complete(formattedTable)
}
}
def matchQueryParameters(userName: Option[UserName])(parameterRoute: QueryParameters => Route): Route = {
parameters('skip.as[Int].?, 'limit.as[Int].?) { (skipOption, limitOption) =>
val qp = QueryParameters(
userName,
skipOption,
limitOption
)
parameterRoute(qp)
}
}
}
//todo maybe move to QepQueryDb class
case class QueryParameters(
researcherIdOption:Option[UserName] = None,
skipOption:Option[Int] = None,
limitOption:Option[Int] = None
)
case class ResultsTable(
rowCount:Int,
rowOffset:Int,
adapters:Seq[String], //todo type for adapter name
queryResults:Seq[ResultsRow]
)
case class ResultsRow(
query:QueryCell,
results: Seq[Result],
isComplete: Boolean, //a member variable to appear in json
dataVersion:Long //a time stamp in 1.23, a counting integer in a future release
)
object ResultsRow {
def apply(
query: QueryCell,
results: Seq[Result]
): ResultsRow = {
val isComplete = if (results.isEmpty) false
else results.forall(_.isComplete)
val dataVersion = (Seq(query.changeDate) ++ results.map(_.changeDate)).max //the latest change date
ResultsRow(query, results, isComplete, dataVersion)
}
}
case class QueryCell(
networkId:String, //easier to support in json, lessens the impact of using a GUID iff we can get there
queryName: QueryName,
dateCreated: Time,
queryXml: String,
changeDate: Time,
flag:Option[QueryFlag]
)
object QueryCell {
def apply(qepQuery: QepQuery,flag: Option[QueryFlag]): QueryCell = QueryCell(
networkId = qepQuery.networkId.toString,
queryName = qepQuery.queryName,
dateCreated = qepQuery.dateCreated,
queryXml = qepQuery.queryXml,
changeDate = qepQuery.changeDate,
flag
)
}
case class QueryFlag(
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QueryFlag{
def apply(qepQueryFlag: QepQueryFlag): QueryFlag = QueryFlag(qepQueryFlag.flagged, qepQueryFlag.flagMessage, qepQueryFlag.changeDate)
}
case class Result (
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
count:Long,
status:String, //todo QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long,
breakdowns: Seq[BreakdownResultsForType],
problemDigest:Option[ProblemDigestForJson]
) {
def isComplete = true //todo until I get to SHRINE-2148
}
object Result {
def apply(fullQueryResult: FullQueryResult): Result = new Result(
resultId = fullQueryResult.resultId,
networkQueryId = fullQueryResult.networkQueryId,
instanceId = fullQueryResult.instanceId,
adapterNode = fullQueryResult.adapterNode,
resultType = fullQueryResult.resultType,
count = fullQueryResult.count,
status = fullQueryResult.status.toString,
statusMessage = fullQueryResult.statusMessage,
changeDate = fullQueryResult.changeDate,
breakdowns = fullQueryResult.breakdownTypeToResults.map(tToR => BreakdownResultsForType(fullQueryResult.adapterNode,tToR._1,tToR._2)).to[Seq],
problemDigest = fullQueryResult.problemDigest.map(ProblemDigestForJson(_))
)
}
//todo replace when you figure out how to json-ize xml in rapture
case class ProblemDigestForJson(codec: String,
stampText: String,
summary: String,
description: String,
detailsString: String,
epoch: Long)
object ProblemDigestForJson {
def apply(problemDigest: ProblemDigest): ProblemDigestForJson = ProblemDigestForJson(
problemDigest.codec,
problemDigest.stampText,
problemDigest.summary,
problemDigest.description,
problemDigest.detailsXml.text,
problemDigest.epoch)
}
case class BreakdownResultsForType(resultType:ResultOutputType,results:Seq[BreakdownResult])
object BreakdownResultsForType {
def apply(adapterName: String, breakdownType: ResultOutputType, breakdowns: Seq[QepQueryBreakdownResultsRow]): BreakdownResultsForType = {
val breakdownResults = breakdowns.filter(_.adapterNode == adapterName).map(row => BreakdownResult(row.dataKey,row.value,row.changeDate))
BreakdownResultsForType(breakdownType,breakdownResults)
}
}
case class BreakdownResult(dataKey:String,value:Long,changeDate:Long)
case class ExceptionWhilePreparingTriggeredResponse(networkQueryId: NetworkQueryId,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override def throwable = Some(x)
override def summary: String = "Unable to prepare a triggered response due to an exception."
override def description: String = s"Unable to prepare a promised response for query $networkQueryId due to a ${x.getClass.getSimpleName}"
}
case class ExceptionWhilePreparingTimeoutResponse(networkQueryId: NetworkQueryId,x:Throwable) extends AbstractProblem(ProblemSources.Qep) {
override def throwable = Some(x)
override def summary: String = "Unable to prepare a triggered response due to an exception."
override def description: String = s"Unable to prepare a promised response for $networkQueryId due to a ${x.getClass.getSimpleName}"
}
\ No newline at end of file
diff --git a/apps/steward-app/src/main/scala/net/shrine/steward/StewardService.scala b/apps/steward-app/src/main/scala/net/shrine/steward/StewardService.scala
index 2707b6142..554df06ef 100644
--- a/apps/steward-app/src/main/scala/net/shrine/steward/StewardService.scala
+++ b/apps/steward-app/src/main/scala/net/shrine/steward/StewardService.scala
@@ -1,375 +1,388 @@
package net.shrine.steward
import akka.actor.Actor
import akka.event.Logging
import net.shrine.authentication.UserAuthenticator
import net.shrine.authorization.steward._
import net.shrine.i2b2.protocol.pm.User
import net.shrine.serialization.NodeSeqSerializer
import net.shrine.source.ConfigSource
import net.shrine.steward.db._
import net.shrine.steward.pmauth.Authorizer
import org.json4s.native.Serialization
import shapeless.HNil
import spray.http.{HttpRequest, HttpResponse, StatusCodes}
import spray.httpx.Json4sSupport
import spray.routing.directives.LogEntry
import spray.routing._
import org.json4s.{DefaultFormats, DefaultJsonFormats, Formats, Serialization}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
// we don't implement our route structure directly in the service actor because
// we want to be able to test it independently, without having to spin up an actor
class StewardServiceActor extends Actor with StewardService {
// the HttpService trait defines only one abstract member, which
// connects the services environment to the enclosing actor or test
def actorRefFactory = context
// this actor only runs our route, but you could add
// other things here, like request stream processing
// or timeout handling
def receive = runRoute(route)
}
// this trait defines our service behavior independently from the service actor
trait StewardService extends HttpService with Json4sSupport {
implicit def json4sFormats: Formats = DefaultFormats + new NodeSeqSerializer
val userAuthenticator = UserAuthenticator(ConfigSource.config)
//don't need to do anything special for unauthorized users, but they do need access to a static form.
lazy val route:Route = gruntWatchCorsSupport{
requestLogRoute ~ fullLogRoute
}
lazy val requestLogRoute = logRequestResponse(logEntryForRequest _) {
redirectToIndex ~ staticResources ~ makeTrouble ~ about
}
lazy val fullLogRoute = logRequestResponse(logEntryForRequestResponse _) {
routeForQepUser ~ authenticatedInBrowser
}
// logs just the request method, uri and response at info level
//logging is controlled by Akka's config, slf4j, and log4j config
def logEntryForRequestResponse(req: HttpRequest): Any => Option[LogEntry] = {
case res: HttpResponse => {
Some(LogEntry(s"\n Request: $req \n Response: $res", Logging.InfoLevel))
}
case _ => None // other kind of responses
}
// logs just the request method, uri and response status at info level
def logEntryForRequest(req: HttpRequest): Any => Option[LogEntry] = {
case res: HttpResponse => {
Some(LogEntry(s"\n Request: $req \n Response status: ${res.status}", Logging.InfoLevel))
}
case _ => None // other kind of responses
}
//pathPrefixTest shields the QEP code from the redirect.
def authenticatedInBrowser: Route = pathPrefixTest("user"|"steward"|"researcher") {
reportIfFailedToAuthenticate {
- authenticate(userAuthenticator.basicUserAuthenticator) { user =>
-
- StewardDatabase.db.upsertUser(user)
-
+ authenticate(userAuthenticator.basicUserAuthenticator) { user =>
+ StewardDatabase.db.upsertUser(user)
pathPrefix("user") {userRoute(user)} ~
pathPrefix("steward") {stewardRoute(user)} ~
pathPrefix("researcher") {researcherRoute(user)}
}
}
}
val reportIfFailedToAuthenticate = routeRouteResponse {
case Rejected(List(AuthenticationFailedRejection(_,_))) =>
complete("AuthenticationFailed")
}
def makeTrouble = pathPrefix("makeTrouble") {
complete(throw new IllegalStateException("fake trouble"))
}
lazy val redirectToIndex = pathEnd {
redirect("steward/client/index.html", StatusCodes.PermanentRedirect) //todo pick up "steward" programatically
} ~
( path("index.html") | pathSingleSlash) {
redirect("client/index.html", StatusCodes.PermanentRedirect)
}
lazy val staticResources = pathPrefix("client") {
pathEnd {
redirect("client/index.html", StatusCodes.PermanentRedirect)
} ~
pathSingleSlash {
redirect("index.html", StatusCodes.PermanentRedirect)
} ~ {
getFromResourceDirectory("client")
}
}
lazy val about = pathPrefix("about") {
path("createTopicsMode") {
get {
complete(CreateTopicsMode.createTopicsInState.name)
}
}
}
def userRoute(user:User):Route = get {
pathPrefix("whoami") {
complete(OutboundUser.createFromUser(user))
}
}
def routeForQepUser:Route = pathPrefix("qep") {
- authenticate(userAuthenticator.basicUserAuthenticator) { user =>
+ authenticate(userAuthenticator.basicUserAuthenticator) { user =>
- StewardDatabase.db.upsertUser(user)
+ StewardDatabase.db.upsertUser(user)
- authorize(Authorizer.authorizeQep(user)) {
- pathPrefix("requestQueryAccess") ( requestQueryAccess ) ~
- pathPrefix("approvedTopics") ( getApprovedTopicsForUser )
+ authorize(Authorizer.authorizeQep(user)) {
+ pathPrefix("requestQueryAccess")(requestQueryAccess) ~
+ pathPrefix("approvedTopics")(getApprovedTopicsForUser)
}
}
}
def requestQueryAccess:Route = post {
requestQueryAccessWithTopic ~ requestQueryAccessWithoutTopic
}
def requestQueryAccessWithTopic:Route = path("user" /Segment/ "topic" / IntNumber) { (userId,topicId) =>
- entity(as[InboundShrineQuery]) { shrineQuery:InboundShrineQuery =>
- //todo really pull the user out of the shrine query and check vs the PM. If they aren't there, reject them for this new reason
- val result: (TopicState, Option[TopicIdAndName]) = StewardDatabase.db.logAndCheckQuery(userId,Some(topicId),shrineQuery)
+ entity(as[InboundShrineQuery]) { shrineQuery: InboundShrineQuery =>
+ //todo really pull the user out of the shrine query and check vs the PM. If they aren't there, reject them for this new reason
+ val result: (TopicState, Option[TopicIdAndName]) = StewardDatabase.db.logAndCheckQuery(userId, Some(topicId), shrineQuery)
- respondWithStatus(result._1.statusCode) {
- if(result._1.statusCode == StatusCodes.OK) complete (result._2.getOrElse(""))
- else complete(result._1.message)
+ respondWithStatus(result._1.statusCode) {
+ if (result._1.statusCode == StatusCodes.OK) complete(result._2.getOrElse(""))
+ else complete(result._1.message)
}
}
}
def requestQueryAccessWithoutTopic:Route = path("user" /Segment) { userId =>
- entity(as[InboundShrineQuery]) { shrineQuery:InboundShrineQuery =>
- //todo really pull the user out of the shrine query and check vs the PM. If they aren't there, reject them for this new reason
- val result = StewardDatabase.db.logAndCheckQuery(userId,None,shrineQuery)
- respondWithStatus(result._1.statusCode) {
- if(result._1.statusCode == StatusCodes.OK) complete (result._2)
- else complete(result._1.message)
+ entity(as[InboundShrineQuery]) { shrineQuery: InboundShrineQuery =>
+ //todo really pull the user out of the shrine query and check vs the PM. If they aren't there, reject them for this new reason
+ val result = StewardDatabase.db.logAndCheckQuery(userId, None, shrineQuery)
+ respondWithStatus(result._1.statusCode) {
+ if (result._1.statusCode == StatusCodes.OK) complete(result._2)
+ else complete(result._1.message)
}
}
}
lazy val getApprovedTopicsForUser:Route = get {
//todo change to "researcher"
path("user" /Segment) { userId =>
- //todo really pull the user out of the shrine query and check vs the PM. If they aren't there, reject them for this new reason
- val queryParameters = QueryParameters(researcherIdOption = Some(userId),stateOption = Some(TopicState.approved))
- val researchersTopics = StewardDatabase.db.selectTopicsForResearcher(queryParameters)
+ //todo really pull the user out of the shrine query and check vs the PM. If they aren't there, reject them for this new reason
+ val queryParameters = QueryParameters(researcherIdOption = Some(userId), stateOption = Some(TopicState.approved))
+ val researchersTopics = StewardDatabase.db.selectTopicsForResearcher(queryParameters)
- complete(researchersTopics)
+ complete(researchersTopics)
}
}
def researcherRoute(user:User):Route = authorize(Authorizer.authorizeResearcher(user)) {
pathPrefix("topics") { getUserTopics(user.username) } ~
pathPrefix("queryHistory") { getUserQueryHistory(Some(user.username)) } ~
pathPrefix("requestTopicAccess") { requestTopicAccess(user) } ~
pathPrefix("editTopicRequest") { editTopicRequest(user) }
}
def getUserTopics(userId:UserName):Route = get {
//lookup topics for this user in the db
- matchQueryParameters(Some(userId)){queryParameters:QueryParameters =>
- val researchersTopics = StewardDatabase.db.selectTopicsForResearcher(queryParameters)
- complete(researchersTopics)
- }
+ matchQueryParameters(Some(userId)) { queryParameters: QueryParameters =>
+ detach() {
+ val researchersTopics = StewardDatabase.db.selectTopicsForResearcher(queryParameters)
+ complete(researchersTopics)
+ }
+ }
}
def matchQueryParameters(userName: Option[UserName])(parameterRoute:QueryParameters => Route): Route = {
parameters('state.?,'skip.as[Int].?,'limit.as[Int].?,'sortBy.as[String].?,'sortDirection.as[String].?,'minDate.as[Date].?,'maxDate.as[Date].?) { (stateStringOption,skipOption,limitOption,sortByOption,sortOption,minDate,maxDate) =>
val stateTry = TopicState.stateForStringOption(stateStringOption)
stateTry match {
case Success(stateOption) =>
val qp = QueryParameters(userName,
stateOption,
skipOption,
limitOption,
sortByOption,
SortOrder.sortOrderForStringOption(sortOption),
minDate,
maxDate
)
parameterRoute(qp)
case Failure(ex) => badStateRoute(stateStringOption)
}
}
}
def badStateRoute(stateStringOption:Option[String]):Route = {
respondWithStatus(StatusCodes.UnprocessableEntity) {
complete(s"Topic state ${stateStringOption.getOrElse(s"$stateStringOption (stateStringOption should never be None at this point)")} unknown. Please specify one of ${TopicState.namesToStates.keySet}")
}
}
def getUserQueryHistory(userIdOption:Option[UserName]):Route = get {
parameter('asJson.as[Boolean].?) { asJson =>
path("topic" / IntNumber) { topicId: TopicId =>
getQueryHistoryForUserByTopic(userIdOption, Some(topicId), asJson)
} ~
getQueryHistoryForUserByTopic(userIdOption, None, asJson)
}
}
def getQueryHistoryForUserByTopic(userIdOption: Option[UserName],
topicIdOption: Option[TopicId],
asJson: Option[Boolean]) =
get {
- matchQueryParameters(userIdOption) { queryParameters: QueryParameters =>
- val queryHistory = StewardDatabase.db.selectQueryHistory(queryParameters, topicIdOption)
-
- if (asJson.getOrElse(false))
- complete(queryHistory.convertToJson)
- else
- complete(queryHistory)
+ detach() {
+ matchQueryParameters(userIdOption) { queryParameters: QueryParameters =>
+ val queryHistory = StewardDatabase.db.selectQueryHistory(queryParameters, topicIdOption)
+
+ if (asJson.getOrElse(false))
+ complete(queryHistory.convertToJson)
+ else
+ complete(queryHistory)
+ }
}
}
def requestTopicAccess(user:User):Route = post {
entity(as[InboundTopicRequest]) { topicRequest: InboundTopicRequest =>
//todo notify the data stewards
- StewardDatabase.db.createRequestForTopicAccess(user,topicRequest)
-
- complete(StatusCodes.Accepted)
+ detach() {
+ StewardDatabase.db.createRequestForTopicAccess(user, topicRequest)
+ complete(StatusCodes.Accepted)
+ }
}
}
def editTopicRequest(user:User):Route = post {
path(IntNumber) { topicId =>
entity(as[InboundTopicRequest]) { topicRequest: InboundTopicRequest =>
//todo notify the data stewards
- val updatedTopicTry:Try[OutboundTopic] = StewardDatabase.db.updateRequestForTopicAccess(user, topicId, topicRequest)
-
- updatedTopicTry match {
- case Success(updatedTopic) =>
- respondWithStatus(StatusCodes.Accepted) {
- complete(updatedTopic)
+ detach() {
+ val updatedTopicTry: Try[OutboundTopic] = StewardDatabase.db.updateRequestForTopicAccess(user, topicId, topicRequest)
+
+ updatedTopicTry match {
+ case Success(updatedTopic) =>
+ respondWithStatus(StatusCodes.Accepted) {
+ complete(updatedTopic)
+ }
+
+ case Failure(x) => x match {
+ case x: TopicDoesNotExist => respondWithStatus(StatusCodes.NotFound) {
+ complete(x.getMessage)
+ }
+ case x: ApprovedTopicCanNotBeChanged => respondWithStatus(StatusCodes.Forbidden) {
+ complete(x.getMessage)
+ }
+ case x: DetectedAttemptByWrongUserToChangeTopic => respondWithStatus(StatusCodes.Forbidden) {
+ complete(x.getMessage)
+ }
+ case _ => throw x
}
-
- case Failure(x) => x match {
- case x:TopicDoesNotExist => respondWithStatus(StatusCodes.NotFound) {
- complete(x.getMessage)
- }
- case x:ApprovedTopicCanNotBeChanged => respondWithStatus(StatusCodes.Forbidden) {
- complete(x.getMessage)
- }
- case x:DetectedAttemptByWrongUserToChangeTopic => respondWithStatus(StatusCodes.Forbidden) {
- complete(x.getMessage)
- }
- case _ => throw x
}
}
}
}
}
def stewardRoute(user:User):Route = authorize(Authorizer.authorizeSteward(user)) {
pathPrefix("queryHistory" / "user") {getUserQueryHistory } ~
pathPrefix("queryHistory") {getQueryHistory} ~
pathPrefix("topics" / "user")(getUserTopicsForSteward) ~
path("topics"){getTopicsForSteward} ~
pathPrefix("approveTopic")(approveTopicForUser(user)) ~
pathPrefix("rejectTopic")(rejectTopicForUser(user)) ~
pathPrefix("statistics"){getStatistics}
}
lazy val getUserQueryHistory:Route = pathPrefix(Segment) { userId =>
getUserQueryHistory(Some(userId))
}
lazy val getQueryHistory:Route = getUserQueryHistory(None)
lazy val getTopicsForSteward:Route = getTopicsForSteward(None)
lazy val getUserTopicsForSteward:Route = path(Segment) { userId =>
getTopicsForSteward(Some(userId))
}
def getTopicsForSteward(userIdOption:Option[UserName]):Route = get {
//lookup topics for this user in the db
matchQueryParameters(userIdOption) { queryParameters: QueryParameters =>
- val stewardsTopics:StewardsTopics = StewardDatabase.db.selectTopicsForSteward(queryParameters)
+ detach() {
+ val stewardsTopics: StewardsTopics = StewardDatabase.db.selectTopicsForSteward(queryParameters)
- complete(stewardsTopics)
+ complete(stewardsTopics)
+ }
}
}
def approveTopicForUser(user:User):Route = changeStateForTopic(TopicState.approved,user)
def rejectTopicForUser(user:User):Route = changeStateForTopic(TopicState.rejected,user)
def changeStateForTopic(state:TopicState,user:User):Route = post {
path("topic" / IntNumber) { topicId =>
- StewardDatabase.db.changeTopicState(topicId, state, user.username).fold(respondWithStatus(StatusCodes.UnprocessableEntity){
- complete(s"No topic found for $topicId")
- })(topic => complete(StatusCodes.OK))
+ detach() {
+ StewardDatabase.db.changeTopicState(topicId, state, user.username).fold(respondWithStatus(StatusCodes.UnprocessableEntity) {
+ complete(s"No topic found for $topicId")
+ })(topic => complete(StatusCodes.OK))
+ }
}
}
def getStatistics:Route = pathPrefix("queriesPerUser"){getQueriesPerUser} ~
pathPrefix("topicsPerState"){getTopicsPerState}
def getQueriesPerUser:Route = get{
matchQueryParameters(None) { queryParameters: QueryParameters =>
- val result = StewardDatabase.db.selectShrineQueryCountsPerUser(queryParameters)
+ detach() {
+ val result = StewardDatabase.db.selectShrineQueryCountsPerUser(queryParameters)
- complete(result)
+ complete(result)
+ }
}
}
def getTopicsPerState:Route = get{
matchQueryParameters(None) { queryParameters: QueryParameters =>
- val result = StewardDatabase.db.selectTopicCountsPerState(queryParameters)
- complete(result)
+ detach() {
+ val result = StewardDatabase.db.selectTopicCountsPerState(queryParameters)
+ complete(result)
+ }
}
}
}
//adapted from https://gist.github.com/joseraya/176821d856b43b1cfe19
object gruntWatchCorsSupport extends Directive0 with RouteConcatenation {
import spray.http.HttpHeaders.{`Access-Control-Allow-Methods`, `Access-Control-Max-Age`, `Access-Control-Allow-Headers`,`Access-Control-Allow-Origin`}
import spray.routing.directives.RespondWithDirectives.respondWithHeaders
import spray.routing.directives.MethodDirectives.options
import spray.routing.directives.RouteDirectives.complete
import spray.http.HttpMethods.{OPTIONS,GET,POST}
import spray.http.AllOrigins
private val allowOriginHeader = `Access-Control-Allow-Origin`(AllOrigins)
private val optionsCorsHeaders = List(
`Access-Control-Allow-Headers`("Origin, X-Requested-With, Content-Type, Accept, Accept-Encoding, Accept-Language, Host, Referer, User-Agent, Authorization"),
`Access-Control-Max-Age`(1728000)) //20 days
val gruntWatch:Boolean = ConfigSource.config.getBoolean("shrine.steward.gruntWatch")
override def happly(f: (HNil) => Route): Route = {
if(gruntWatch) {
options {
respondWithHeaders(`Access-Control-Allow-Methods`(OPTIONS, GET, POST) :: allowOriginHeader :: optionsCorsHeaders){
complete(StatusCodes.OK)
}
} ~ f(HNil)
}
else f(HNil)
}
}
diff --git a/apps/steward-app/src/main/scala/net/shrine/steward/db/StewardDatabase.scala b/apps/steward-app/src/main/scala/net/shrine/steward/db/StewardDatabase.scala
index 90ad3ec10..68631f4c4 100644
--- a/apps/steward-app/src/main/scala/net/shrine/steward/db/StewardDatabase.scala
+++ b/apps/steward-app/src/main/scala/net/shrine/steward/db/StewardDatabase.scala
@@ -1,767 +1,764 @@
package net.shrine.steward.db
import java.sql.SQLException
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.authorization.steward.{Date, ExternalQueryId, InboundShrineQuery, InboundTopicRequest, OutboundShrineQuery, OutboundTopic, OutboundUser, QueriesPerUser, QueryContents, QueryHistory, ResearcherToAudit, ResearchersTopics, StewardQueryId, StewardsTopics, TopicId, TopicIdAndName, TopicState, TopicStateName, TopicsPerState, UserName, researcherRole, stewardRole}
import net.shrine.i2b2.protocol.pm.User
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemSources}
import net.shrine.slick.{CouldNotRunDbIoActionException, DbIoActionException, NeedsWarmUp, TestableDataSourceCreator, TimeoutInDbIoActionException}
import net.shrine.source.ConfigSource
import net.shrine.steward.CreateTopicsMode
import slick.dbio.Effect.Read
import slick.driver.JdbcProfile
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.util.Try
import scala.util.control.NonFatal
/**
* Database access code for the data steward service.
*
* I'm not letting Slick handle foreign key resolution for now. I want to keep that logic separate to handle dirty data with some grace.
*
* @author dwalend
* @since 1.19
*/
case class StewardDatabase(schemaDef:StewardSchema,dataSource: DataSource) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
//todo share code from DashboardProblemDatabase.scala . It's a lot richer. See SHRINE-1835
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val timeout = 10 seconds ;
try {
val future: Future[R] = database.run(action)
blocking {
Await.result(future, timeout)
}
} catch {
case tax:TopicAcessException => throw tax
case tx:TimeoutException =>
val x = TimeoutInDbIoActionException(dataSource, timeout, tx)
StewardDatabaseProblem(x)
throw x
case NonFatal(nfx) =>
val x = CouldNotRunDbIoActionException(dataSource, nfx)
StewardDatabaseProblem(x)
throw x
}
}
def warmUp = {
dbRun(allUserQuery.size.result)
}
def selectUsers:Seq[UserRecord] = {
dbRun(allUserQuery.result)
}
// todo use whenever a shrine query is logged
def upsertUser(user:User):Unit = {
val userRecord = UserRecord(user)
dbRun(allUserQuery.insertOrUpdate(userRecord))
}
def createRequestForTopicAccess(user:User,topicRequest:InboundTopicRequest):TopicRecord = {
val createInState = CreateTopicsMode.createTopicsInState
val now = System.currentTimeMillis()
val topicRecord = TopicRecord(Some(nextTopicId.getAndIncrement),topicRequest.name,topicRequest.description,user.username,now,createInState.topicState)
val userTopicRecord = UserTopicRecord(user.username,topicRecord.id.get,TopicState.approved,user.username,now)
dbRun(for{
_ <- allTopicQuery += topicRecord
_ <- allUserTopicQuery += userTopicRecord
} yield topicRecord)
}
def updateRequestForTopicAccess(user:User,topicId:TopicId,topicRequest:InboundTopicRequest):Try[OutboundTopic] = Try {
-
- dbRun(mostRecentTopicQuery.filter(_.id === topicId).result.headOption.flatMap{ option =>
+ dbRun(mostRecentTopicQuery.filter(_.id === topicId).result.headOption.flatMap { option =>
val oldTopicRecord = option.getOrElse(throw TopicDoesNotExist(topicId = topicId))
- if(user.username != oldTopicRecord.createdBy) throw DetectedAttemptByWrongUserToChangeTopic(topicId,user.username,oldTopicRecord.createdBy)
- if(oldTopicRecord.state == TopicState.approved) throw ApprovedTopicCanNotBeChanged(topicId)
+ if (user.username != oldTopicRecord.createdBy) throw DetectedAttemptByWrongUserToChangeTopic(topicId, user.username, oldTopicRecord.createdBy)
+ if (oldTopicRecord.state == TopicState.approved) throw ApprovedTopicCanNotBeChanged(topicId)
val updatedTopic = oldTopicRecord.copy(name = topicRequest.name,
- description = topicRequest.description,
- changedBy = user.username,
- changeDate = System.currentTimeMillis())
- (allTopicQuery += updatedTopic).flatMap{_ =>
- outboundUsersForNamesAction(Set(updatedTopic.createdBy,updatedTopic.changedBy)).map(updatedTopic.toOutboundTopic)
+ description = topicRequest.description,
+ changedBy = user.username,
+ changeDate = System.currentTimeMillis())
+ (allTopicQuery += updatedTopic).flatMap { _ =>
+ outboundUsersForNamesAction(Set(updatedTopic.createdBy, updatedTopic.changedBy)).map(updatedTopic.toOutboundTopic)
}
- }
- )
+ })
}
def selectTopicsForResearcher(parameters:QueryParameters):ResearchersTopics = {
require(parameters.researcherIdOption.isDefined,"A researcher's parameters must supply a user id")
val (count,topics,userNamesToOutboundUsers) = dbRun(
for{
count <- topicCountQuery(parameters).length.result
topics <- topicSelectQuery(parameters).result
userNamesToOutboundUsers <- outboundUsersForNamesAction((topics.map(_.createdBy) ++ topics.map(_.changedBy)).to[Set])
} yield (count, topics,userNamesToOutboundUsers))
ResearchersTopics(parameters.researcherIdOption.get,
count,
parameters.skipOption.getOrElse(0),
topics.map(_.toOutboundTopic(userNamesToOutboundUsers)))
}
//treat as private (currently used in test)
def selectTopics(queryParameters: QueryParameters):Seq[TopicRecord] = {
dbRun(topicSelectQuery(queryParameters).result)
}
def selectTopicsForSteward(queryParameters: QueryParameters):StewardsTopics = {
- val (count,topics,userNamesToOutboundUsers) = dbRun{
- for{
- count <- topicCountQuery(queryParameters).length.result
- topics <- topicSelectQuery(queryParameters).result
- userNamesToOutboundUsers <- outboundUsersForNamesAction((topics.map(_.createdBy) ++ topics.map(_.changedBy)).to[Set])
- } yield (count,topics,userNamesToOutboundUsers)
- }
+ val (count, topics, userNamesToOutboundUsers) = dbRun {
+ for {
+ count <- topicCountQuery(queryParameters).length.result
+ topics <- topicSelectQuery(queryParameters).result
+ userNamesToOutboundUsers <- outboundUsersForNamesAction((topics.map(_.createdBy) ++ topics.map(_.changedBy)).to[Set])
+ } yield (count, topics, userNamesToOutboundUsers)
+ }
StewardsTopics(count,
queryParameters.skipOption.getOrElse(0),
topics.map(_.toOutboundTopic(userNamesToOutboundUsers)))
}
private def topicSelectQuery(queryParameters: QueryParameters):Query[TopicTable, TopicTable#TableElementType, Seq] = {
val countFilter = topicCountQuery(queryParameters)
//todo is there some way to do something with a map from column names to columns that I don't have to update? I couldn't find one.
// val orderByQuery = queryParameters.sortByOption.fold(countFilter)(
// columnName => limitFilter.sortBy(x => queryParameters.sortOrder.orderForColumn(countFilter.columnForName(columnName))))
val orderByQuery = queryParameters.sortByOption.fold(countFilter)(
columnName => countFilter.sortBy(x => queryParameters.sortOrder.orderForColumn(columnName match {
case "id" => x.id
case "name" => x.name
case "description" => x.description
case "createdBy" => x.createdBy
case "createDate" => x.createDate
case "state" => x.state
case "changedBy" => x.changedBy
case "changeDate" => x.changeDate
})))
val skipFilter = queryParameters.skipOption.fold(orderByQuery)(skip => orderByQuery.drop(skip))
val limitFilter = queryParameters.limitOption.fold(skipFilter)(limit => skipFilter.take(limit))
limitFilter
}
private def topicCountQuery(queryParameters: QueryParameters):Query[TopicTable, TopicTable#TableElementType, Seq] = {
val allTopics:Query[TopicTable, TopicTable#TableElementType, Seq] = mostRecentTopicQuery
val researcherFilter = queryParameters.researcherIdOption.fold(allTopics)(userId => allTopics.filter(_.createdBy === userId))
val stateFilter = queryParameters.stateOption.fold(researcherFilter)(state => researcherFilter.filter(_.state === state.name))
val minDateFilter = queryParameters.minDate.fold(stateFilter)(minDate => stateFilter.filter(_.changeDate >= minDate))
val maxDateFilter = queryParameters.maxDate.fold(minDateFilter)(maxDate => minDateFilter.filter(_.changeDate <= maxDate))
maxDateFilter
}
def changeTopicState(topicId:TopicId,state:TopicState,userId:UserName):Option[TopicRecord] = {
val noTopicRecord:Option[TopicRecord] = None
val noOpDBIO:DBIOAction[Option[TopicRecord], NoStream, Effect.Write] = DBIO.successful(noTopicRecord)
-
- dbRun(mostRecentTopicQuery.filter(_.id === topicId).result.headOption.flatMap(
- _.fold(noOpDBIO){ originalTopic =>
- val updatedTopic = originalTopic.copy(state = state, changedBy = userId, changeDate = System.currentTimeMillis())
- (allTopicQuery += updatedTopic).map(_ => Option(updatedTopic))
- }
- ))
+ dbRun(mostRecentTopicQuery.filter(_.id === topicId).result.headOption.flatMap(
+ _.fold(noOpDBIO) { originalTopic =>
+ val updatedTopic = originalTopic.copy(state = state, changedBy = userId, changeDate = System.currentTimeMillis())
+ (allTopicQuery += updatedTopic).map(_ => Option(updatedTopic))
+ }
+ ))
}
def selectTopicCountsPerState(queryParameters: QueryParameters):TopicsPerState = {
dbRun(for{
totalTopics <- topicCountQuery(queryParameters).length.result
topicsPerStateName <- topicCountsPerState(queryParameters).result
} yield TopicsPerState(totalTopics,topicsPerStateName))
}
private def topicCountsPerState(queryParameters: QueryParameters): Query[(Rep[TopicStateName], Rep[Int]), (TopicStateName, Int), Seq] = {
val groupedByState = topicCountQuery(queryParameters).groupBy(topicRecord => topicRecord.state)
groupedByState.map{case (state,result) => (state,result.length)}
}
def logAndCheckQuery(userId:UserName,topicId:Option[TopicId],shrineQuery:InboundShrineQuery):(TopicState,Option[TopicIdAndName]) = {
//todo upsertUser(user) when the info is available from the PM
val noOpDBIOForState: DBIOAction[TopicState, NoStream, Effect.Read] = DBIO.successful {
if (CreateTopicsMode.createTopicsInState == CreateTopicsMode.TopicsIgnoredJustLog) TopicState.approved
else TopicState.createTopicsModeRequiresTopic
}
val noOpDBIOForTopicName: DBIOAction[Option[String], NoStream, Read] = DBIO.successful{None}
val (state,topicName) = dbRun(for{
state <- topicId.fold(noOpDBIOForState)( someTopicId =>
mostRecentTopicQuery.filter(_.id === someTopicId).filter(_.createdBy === userId).map(_.state).result.headOption.map(
_.fold(TopicState.unknownForUser)(state => TopicState.namesToStates(state)))
)
topicName <- topicId.fold(noOpDBIOForTopicName)( someTopicId =>
mostRecentTopicQuery.filter(_.id === someTopicId).filter(_.createdBy === userId).map(_.name).result.headOption
)
_ <- allQueryTable += ShrineQueryRecord(userId,topicId,shrineQuery,state)
} yield (state,topicName))
val topicIdAndName:Option[TopicIdAndName] = (topicId,topicName) match {
case (Some(id),Some(name)) => Option(TopicIdAndName(id.toString,name))
case (None,None) => None
case (Some(id),None) =>
if(state == TopicState.unknownForUser) None
else throw new IllegalStateException(s"How did you get here for $userId with $id and $state for $shrineQuery")
case (None,Some(name)) =>
if(state == TopicState.unknownForUser) None
else throw new IllegalStateException(s"How did you get here for $userId with no topic id but a topic name of $name and $state for $shrineQuery")
}
(state,topicIdAndName)
}
def selectQueryHistory(queryParameters: QueryParameters, topicParameter:Option[TopicId]):
QueryHistory = {
val topicQuery = for {
count <- shrineQueryCountQuery(queryParameters, topicParameter).length.result
shrineQueries <- shrineQuerySelectQuery(queryParameters, topicParameter).result
topics <- mostRecentTopicQuery.filter(_.id.inSet(shrineQueries.map(_.topicId).to[Set].flatten)).result
userNamesToOutboundUsers <- outboundUsersForNamesAction(shrineQueries.map(_.userId).to[Set] ++ (topics.map(_.createdBy) ++ topics.map(_.changedBy)).to[Set])
} yield (count, shrineQueries, topics, userNamesToOutboundUsers)
val (count, shrineQueries, topics, userNamesToOutboundUsers) = dbRun(topicQuery)
val topicIdsToTopics: Map[Option[TopicId], TopicRecord] = topics.map(x => (x.id, x)).toMap
def toOutboundShrineQuery(queryRecord: ShrineQueryRecord): OutboundShrineQuery = {
val topic = topicIdsToTopics.get(queryRecord.topicId)
val outboundTopic: Option[OutboundTopic] = topic.map(_.toOutboundTopic(userNamesToOutboundUsers))
val outboundUserOption = userNamesToOutboundUsers.get(queryRecord.userId)
//todo if a user is unknown and the system is in a mode that requires everyone to log into the data steward notify the data steward
val outboundUser: OutboundUser = outboundUserOption.getOrElse(OutboundUser.createUnknownUser(queryRecord.userId))
queryRecord.createOutboundShrineQuery(outboundTopic, outboundUser)
}
QueryHistory(count, queryParameters.skipOption.getOrElse(0), shrineQueries.map(toOutboundShrineQuery))
}
private def outboundUsersForNamesAction(userNames:Set[UserName]):DBIOAction[Map[UserName, OutboundUser], NoStream, Read] = {
allUserQuery.filter(_.userName.inSet(userNames)).result.map(_.map(x => (x.userName,x.asOutboundUser)).toMap)
}
private def shrineQuerySelectQuery(queryParameters: QueryParameters,topicParameter:Option[TopicId]):Query[QueryTable, QueryTable#TableElementType, Seq] = {
val countQuery = shrineQueryCountQuery(queryParameters,topicParameter)
//todo is there some way to do something with a map from column names to columns that I don't have to update? I couldn't find one.
// val orderByQuery = queryParameters.sortByOption.fold(limitFilter)(
// columnName => limitFilter.sortBy(x => queryParameters.sortOrder.orderForColumn(allQueryTable.columnForName(columnName))))
val orderByQuery = queryParameters.sortByOption.fold(countQuery) {
case "topicName" =>
val joined = countQuery.join(mostRecentTopicQuery).on(_.topicId === _.id)
joined.sortBy(x => queryParameters.sortOrder.orderForColumn(x._2.name)).map(x => x._1)
case columnName => countQuery.sortBy(x => queryParameters.sortOrder.orderForColumn(columnName match {
case "stewardId" => x.stewardId
case "externalId" => x.externalId
case "researcherId" => x.researcherId
case "name" => x.name
case "topic" => x.topicId
case "queryContents" => x.queryContents
case "stewardResponse" => x.stewardResponse
case "date" => x.date
}))
}
val skipFilter = queryParameters.skipOption.fold(orderByQuery)(skip => orderByQuery.drop(skip))
val limitFilter = queryParameters.limitOption.fold(skipFilter)(limit => skipFilter.take(limit))
limitFilter
}
private def shrineQueryCountQuery(queryParameters: QueryParameters,topicParameter:Option[TopicId]):Query[QueryTable, QueryTable#TableElementType, Seq] = {
- val allShrineQueries:Query[QueryTable, QueryTable#TableElementType, Seq] = allQueryTable
+ val allShrineQueries: Query[QueryTable, QueryTable#TableElementType, Seq] = allQueryTable
- val topicFilter:Query[QueryTable, QueryTable#TableElementType, Seq] = topicParameter.fold(allShrineQueries)(topicId => allShrineQueries.filter(_.topicId === topicId))
+ val topicFilter: Query[QueryTable, QueryTable#TableElementType, Seq] = topicParameter.fold(allShrineQueries)(topicId => allShrineQueries.filter(_.topicId === topicId))
- val researcherFilter:Query[QueryTable, QueryTable#TableElementType, Seq] = queryParameters.researcherIdOption.fold(topicFilter)(researcherId => topicFilter.filter(_.researcherId === researcherId))
- //todo this is probably a binary Approved/Not approved
- val stateFilter:Query[QueryTable, QueryTable#TableElementType, Seq] = queryParameters.stateOption.fold(researcherFilter)(stewardResponse => researcherFilter.filter(_.stewardResponse === stewardResponse.name))
+ val researcherFilter: Query[QueryTable, QueryTable#TableElementType, Seq] = queryParameters.researcherIdOption.fold(topicFilter)(researcherId => topicFilter.filter(_.researcherId === researcherId))
+ //todo this is probably a binary Approved/Not approved
+ val stateFilter: Query[QueryTable, QueryTable#TableElementType, Seq] = queryParameters.stateOption.fold(researcherFilter)(stewardResponse => researcherFilter.filter(_.stewardResponse === stewardResponse.name))
- val minDateFilter = queryParameters.minDate.fold(stateFilter)(minDate => stateFilter.filter(_.date >= minDate))
- val maxDateFilter = queryParameters.maxDate.fold(minDateFilter)(maxDate => minDateFilter.filter(_.date <= maxDate))
+ val minDateFilter = queryParameters.minDate.fold(stateFilter)(minDate => stateFilter.filter(_.date >= minDate))
+ val maxDateFilter = queryParameters.maxDate.fold(minDateFilter)(maxDate => minDateFilter.filter(_.date <= maxDate))
- maxDateFilter
+ maxDateFilter
}
def selectShrineQueryCountsPerUser(queryParameters: QueryParameters):QueriesPerUser = {
val (totalQueries,queriesPerUser,userNamesToOutboundUsers) = dbRun(for {
totalQueries <- shrineQueryCountQuery(queryParameters,None).length.result
queriesPerUser <- shrineQueryCountsPerResearcher(queryParameters).result
userNamesToOutboundUsers <- outboundUsersForNamesAction(queriesPerUser.map(x => x._1).to[Set])
} yield (totalQueries,queriesPerUser,userNamesToOutboundUsers))
val queriesPerOutboundUser:Seq[(OutboundUser,Int)] = queriesPerUser.map(x => (userNamesToOutboundUsers(x._1),x._2))
QueriesPerUser(totalQueries,queriesPerOutboundUser)
}
private def shrineQueryCountsPerResearcher(queryParameters: QueryParameters): Query[(Rep[UserName],Rep[Int]),(UserName,Int),Seq] = {
val filteredShrineQueries:Query[QueryTable, QueryTable#TableElementType, Seq] = shrineQueryCountQuery(queryParameters,None)
val groupedByResearcher = filteredShrineQueries.groupBy(shrineQuery => shrineQuery.researcherId)
groupedByResearcher.map{case (researcher,result) => (researcher,result.length)}
}
lazy val nextTopicId:AtomicInteger = new AtomicInteger({
dbRun(allTopicQuery.map(_.id).max.result).getOrElse(0) + 1
})
def selectAllAuditRequests: Seq[UserAuditRecord] = {
dbRun(allUserAudits.result)
}
def selectMostRecentAuditRequests: Seq[UserAuditRecord] = {
dbRun(mostRecentUserAudits.result)
}
def selectResearchersToAudit(maxQueryCountBetweenAudits:Int,minTimeBetweenAudits:Duration,now:Date):Seq[ResearcherToAudit] = {
//todo one round with the db instead of O(researchers)
//for each researcher
//horizon = if the researcher has had an audit
// date of last audit
// else if no audit yet
// date of first query
val researchersToHorizons: Map[UserName, Date] = dbRun(for{
dateOfFirstQuery: Seq[(UserName, Date)] <- leastRecentUserQuery.map(record => record.researcherId -> record.date).result
mostRecentAudit: Seq[(UserName, Date)] <- mostRecentUserAudits.map(record => record.researcher -> record.changeDate).result
} yield {
dateOfFirstQuery.toMap ++ mostRecentAudit.toMap
})
val researchersToHorizonsAndCounts = researchersToHorizons.map{ researcherDate =>
val queryParameters = QueryParameters(researcherIdOption = Some(researcherDate._1),
minDate = Some(researcherDate._2))
val count:Int = dbRun(shrineQueryCountQuery(queryParameters,None).length.result)
(researcherDate._1,(researcherDate._2,count))
}
//audit if oldest query within the horizon is >= minTimeBetweenAudits in the past and the researcher has run at least one query since.
val oldestAllowed = System.currentTimeMillis() - minTimeBetweenAudits.toMillis
val timeBasedAudit = researchersToHorizonsAndCounts.filter(x => x._2._2 > 0 && x._2._1 <= oldestAllowed)
//audit if the researcher has run >= maxQueryCountBetweenAudits queries since horizon?
val queryBasedAudit = researchersToHorizonsAndCounts.filter(x => x._2._2 >= maxQueryCountBetweenAudits)
val toAudit = timeBasedAudit ++ queryBasedAudit
val namesToOutboundUsers: Map[UserName, OutboundUser] = dbRun(outboundUsersForNamesAction(toAudit.keySet))
toAudit.map(x => ResearcherToAudit(namesToOutboundUsers(x._1),x._2._2,x._2._1,now)).to[Seq]
}
def logAuditRequests(auditRequests:Seq[ResearcherToAudit],now:Date) {
dbRun{
allUserAudits ++= auditRequests.map(x => UserAuditRecord(researcher = x.researcher.userName,
queryCount = x.count,
changeDate = now
))
}
}
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class StewardSchema(jdbcProfile: JdbcProfile) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables = {
allUserQuery.schema ++ allTopicQuery.schema ++ allQueryTable.schema ++ allUserTopicQuery.schema ++ allUserAudits.schema
}
//to get the schema, use the REPL
//println(StewardSchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class UserTable(tag:Tag) extends Table[UserRecord](tag,"users") {
def userName = column[UserName]("userName",O.PrimaryKey)
def fullName = column[String]("fullName")
def isSteward = column[Boolean]("isSteward")
def * = (userName,fullName,isSteward) <> (UserRecord.tupled,UserRecord.unapply)
}
class TopicTable(tag:Tag) extends Table[TopicRecord](tag,"topics") {
def id = column[TopicId]("id")
def name = column[String]("name")
def description = column[String]("description")
def createdBy = column[UserName]("createdBy")
def createDate = column[Date]("createDate")
def state = column[TopicStateName]("state")
def changedBy = column[UserName]("changedBy")
def changeDate = column[Date]("changeDate")
def idIndex = index("idIndex",id,unique = false)
def topicNameIndex = index("topicNameIndex",name,unique = false)
def createdByIndex = index("createdByIndex",createdBy,unique = false)
def createDateIndex = index("createDateIndex",createDate,unique = false)
def stateIndex = index("stateIndex",state,unique = false)
def changedByIndex = index("changedByIndex",changedBy,unique = false)
def changeDateIndex = index("changeDateIndex",changeDate,unique = false)
def * = (id.?, name, description, createdBy, createDate, state, changedBy, changeDate) <> (fromRow, toRow) //(TopicRecord.tupled,TopicRecord.unapply)
def fromRow = (fromParams _).tupled
def fromParams(id:Option[TopicId] = None,
name:String,
description:String,
createdBy:UserName,
createDate:Date,
stateName:String,
changedBy:UserName,
changeDate:Date): TopicRecord = {
TopicRecord(id, name, description, createdBy, createDate, TopicState.namesToStates(stateName), changedBy, changeDate)
}
def toRow(topicRecord: TopicRecord) =
Some((topicRecord.id,
topicRecord.name,
topicRecord.description,
topicRecord.createdBy,
topicRecord.createDate,
topicRecord.state.name,
topicRecord.changedBy,
topicRecord.changeDate
))
}
class UserTopicTable(tag:Tag) extends Table[UserTopicRecord](tag,"userTopic") {
def researcher = column[UserName]("researcher")
def topicId = column[TopicId]("topicId")
def state = column[TopicStateName]("state")
def changedBy = column[UserName]("changedBy")
def changeDate = column[Date]("changeDate")
def researcherTopicIdIndex = index("researcherTopicIdIndex",(researcher,topicId),unique = true)
def * = (researcher, topicId, state, changedBy, changeDate) <> (fromRow, toRow)
def fromRow = (fromParams _).tupled
def fromParams(researcher:UserName,
topicId:TopicId,
stateName:String,
changedBy:UserName,
changeDate:Date): UserTopicRecord = {
UserTopicRecord(researcher,topicId,TopicState.namesToStates(stateName), changedBy, changeDate)
}
def toRow(userTopicRecord: UserTopicRecord):Option[(UserName,TopicId,String,UserName,Date)] =
Some((userTopicRecord.researcher,
userTopicRecord.topicId,
userTopicRecord.state.name,
userTopicRecord.changedBy,
userTopicRecord.changeDate
))
}
class UserAuditTable(tag:Tag) extends Table[UserAuditRecord](tag,"userAudit") {
def researcher = column[UserName]("researcher")
def queryCount = column[Int]("queryCount")
def changeDate = column[Date]("changeDate")
def * = (researcher, queryCount, changeDate) <> (fromRow, toRow)
def fromRow = (fromParams _).tupled
def fromParams(researcher:UserName,
queryCount:Int,
changeDate:Date): UserAuditRecord = {
UserAuditRecord(researcher,queryCount, changeDate)
}
def toRow(record: UserAuditRecord):Option[(UserName,Int,Date)] =
Some((record.researcher,
record.queryCount,
record.changeDate
))
}
class QueryTable(tag:Tag) extends Table[ShrineQueryRecord](tag,"queries") {
def stewardId = column[StewardQueryId]("stewardId",O.PrimaryKey,O.AutoInc)
def externalId = column[ExternalQueryId]("id")
def name = column[String]("name")
def researcherId = column[UserName]("researcher")
def topicId = column[Option[TopicId]]("topic")
def queryContents = column[QueryContents]("queryContents")
def stewardResponse = column[String]("stewardResponse")
def date = column[Date]("date")
def externalIdIndex = index("externalIdIndex",externalId,unique = false)
def queryNameIndex = index("queryNameIndex",name,unique = false)
def researcherIdIndex = index("researcherIdIndex",stewardId,unique = false)
def topicIdIndex = index("topicIdIndex",topicId,unique = false)
def stewardResponseIndex = index("stewardResponseIndex",stewardResponse,unique = false)
def dateIndex = index("dateIndex",date,unique = false)
def * = (stewardId.?,externalId,name,researcherId,topicId,queryContents,stewardResponse,date) <> (fromRow,toRow)
def fromRow = (fromParams _).tupled
def fromParams(stewardId:Option[StewardQueryId],
externalId:ExternalQueryId,
name:String,
userId:UserName,
topicId:Option[TopicId],
queryContents: QueryContents,
stewardResponse:String,
date:Date): ShrineQueryRecord = {
ShrineQueryRecord(stewardId,externalId, name, userId, topicId, queryContents,TopicState.namesToStates(stewardResponse),date)
}
def toRow(queryRecord: ShrineQueryRecord):Option[(
Option[StewardQueryId],
ExternalQueryId,
String,
UserName,
Option[TopicId],
QueryContents,
String,
Date
)] =
Some((queryRecord.stewardId,
queryRecord.externalId,
queryRecord.name,
queryRecord.userId,
queryRecord.topicId,
queryRecord.queryContents,
queryRecord.stewardResponse.name,
queryRecord.date)
)
}
val allUserQuery = TableQuery[UserTable]
val allTopicQuery = TableQuery[TopicTable]
val allQueryTable = TableQuery[QueryTable]
val allUserTopicQuery = TableQuery[UserTopicTable]
val allUserAudits = TableQuery[UserAuditTable]
val mostRecentTopicQuery: Query[TopicTable, TopicRecord, Seq] = for(
topic <- allTopicQuery if !allTopicQuery.filter(_.id === topic.id).filter(_.changeDate > topic.changeDate).exists
) yield topic
val mostRecentUserAudits: Query[UserAuditTable, UserAuditRecord, Seq] = for(
record <- allUserAudits if !allUserAudits.filter(_.researcher === record.researcher).filter(_.changeDate > record.changeDate).exists
) yield record
val leastRecentUserQuery: Query[QueryTable, ShrineQueryRecord, Seq] = for(
record <- allQueryTable if !allQueryTable.filter(_.researcherId === record.researcherId).filter(_.date < record.date).exists
) yield record
}
object StewardSchema {
val allConfig:Config = ConfigSource.config
val config:Config = allConfig.getConfig("shrine.steward.database")
val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config)
val schema = StewardSchema(slickProfile)
}
object StewardDatabase extends NeedsWarmUp {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(StewardSchema.config)
val db = StewardDatabase(StewardSchema.schema,dataSource)
val createTablesOnStart = StewardSchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) StewardDatabase.db.createTables()
override def warmUp() = StewardDatabase.db.warmUp
}
//API help
sealed case class SortOrder(name:String){
import slick.lifted.ColumnOrdered
def orderForColumn[T](column:ColumnOrdered[T]):ColumnOrdered[T] = {
if(this == SortOrder.ascending) column.asc
else column.desc
}
}
object SortOrder {
val ascending = SortOrder("ascending")
val descending = SortOrder("descending")
val sortOrders = Seq(ascending,descending)
val namesToSortOrders = sortOrders.map(x => (x.name,x)).toMap
def sortOrderForStringOption(option:Option[String]) = option.fold(ascending)(namesToSortOrders(_))
}
case class QueryParameters(researcherIdOption:Option[UserName] = None,
stateOption:Option[TopicState] = None,
skipOption:Option[Int] = None,
limitOption:Option[Int] = None,
sortByOption:Option[String] = None,
sortOrder:SortOrder = SortOrder.ascending,
minDate:Option[Date] = None,
maxDate:Option[Date] = None
)
//DAO case classes, exposed for testing only
case class ShrineQueryRecord(stewardId: Option[StewardQueryId],
externalId:ExternalQueryId,
name:String,
userId:UserName,
topicId:Option[TopicId],
queryContents: QueryContents,
stewardResponse:TopicState,
date:Date) {
def createOutboundShrineQuery(outboundTopic:Option[OutboundTopic],outboundUser:OutboundUser): OutboundShrineQuery = {
OutboundShrineQuery(stewardId.get,externalId,name,outboundUser,outboundTopic,queryContents,stewardResponse.name,date)
}
}
object ShrineQueryRecord extends ((Option[StewardQueryId],ExternalQueryId,String,UserName,Option[TopicId],QueryContents,TopicState,Date) => ShrineQueryRecord) {
def apply(userId:UserName,topicId:Option[TopicId],shrineQuery: InboundShrineQuery,stewardResponse:TopicState): ShrineQueryRecord = {
ShrineQueryRecord(
None,
shrineQuery.externalId,
shrineQuery.name,
userId,
topicId,
shrineQuery.queryContents,
stewardResponse,
System.currentTimeMillis())
}
}
case class UserRecord(userName:UserName,fullName:String,isSteward:Boolean) {
lazy val asOutboundUser:OutboundUser = OutboundUser(userName,fullName,if(isSteward) Set(stewardRole,researcherRole)
else Set(researcherRole))
}
object UserRecord extends ((UserName,String,Boolean) => UserRecord) {
def apply(user:User):UserRecord = UserRecord(user.username,user.fullName,user.params.toList.contains((stewardRole,"true")))
}
case class TopicRecord(id:Option[TopicId] = None,
name:String,
description:String,
createdBy:UserName,
createDate:Date,
state:TopicState,
changedBy:UserName,
changeDate:Date) {
def toOutboundTopic(userNamesToOutboundUsers: Map[UserName, OutboundUser]): OutboundTopic = {
OutboundTopic(id.get,
name,
description,
userNamesToOutboundUsers(createdBy),
createDate,
state.name,
userNamesToOutboundUsers(changedBy),
changeDate)
}
}
object TopicRecord {
def apply(id:Option[TopicId],
name:String,
description:String,
createdBy:UserName,
createDate:Date,
state:TopicState
):TopicRecord = TopicRecord(id,
name,
description,
createdBy,
createDate,
state,
createdBy,
createDate)
}
case class UserTopicRecord(researcher:UserName,
topicId:TopicId,
state:TopicState,
changedBy:UserName,
changeDate:Date)
case class UserAuditRecord(researcher:UserName,
queryCount:Int,
changeDate:Date) {
def sameExceptForTimes(userAuditRecord: UserAuditRecord):Boolean = {
(researcher == userAuditRecord.researcher) &&
(queryCount == userAuditRecord.queryCount)
}
}
abstract class TopicAcessException(topicId: TopicId,message:String) extends IllegalArgumentException(message)
case class TopicDoesNotExist(topicId:TopicId) extends TopicAcessException(topicId,s"No topic for id $topicId")
case class ApprovedTopicCanNotBeChanged(topicId:TopicId) extends TopicAcessException(topicId,s"Topic $topicId has been ${TopicState.approved}")
case class DetectedAttemptByWrongUserToChangeTopic(topicId:TopicId,userId:UserName,ownerId:UserName) extends TopicAcessException(topicId,s"$userId does not own $topicId; $ownerId owns it.")
case class StewardDatabaseProblem(dbioax:DbIoActionException) extends AbstractProblem(ProblemSources.Dsa) {
override def summary: String = "The DSA's database failed due to an exception."
override def description: String = s"TThe DSAs database failed due to $dbioax"
override def throwable = Some(dbioax)
}
\ No newline at end of file
diff --git a/messagequeue/hornetqmom/src/main/resources/reference.conf b/messagequeue/hornetqmom/src/main/resources/reference.conf
index 540242046..f6c2ac407 100644
--- a/messagequeue/hornetqmom/src/main/resources/reference.conf
+++ b/messagequeue/hornetqmom/src/main/resources/reference.conf
@@ -1,6 +1,9 @@
shrine {
messagequeue {
hornetq {
}
+ hornetQWebApi {
+ enabled = false
+ }
}
}
\ No newline at end of file
diff --git a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala
index 8884288b4..af57973a4 100644
--- a/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala
+++ b/messagequeue/hornetqmom/src/main/scala/net/shrine/hornetqmom/HornetQMomWebApi.scala
@@ -1,171 +1,191 @@
package net.shrine.hornetqmom
-import akka.event.Logging
import net.shrine.log.Loggable
import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer}
import net.shrine.problem.{AbstractProblem, ProblemSources}
+import net.shrine.source.ConfigSource
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write}
import org.json4s.{Formats, NoTypeHints}
import spray.http.StatusCodes
-import spray.routing.directives.LogEntry
import spray.routing.{HttpService, Route}
import scala.collection.immutable.Seq
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}
/**
* A web API that provides access to the internal HornetQMom library.
* Allows client to createQueue, deleteQueue, sendMessage, receiveMessage, getQueues, and sendReceipt
*
* Created by yifan on 7/24/17.
*/
trait HornetQMomWebApi extends HttpService
with Loggable {
+ val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApi.enabled").toBoolean
+ val warningMessage: String = "If you intend for this node to serve as this SHRINE network's messaging hub " +
+ "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." +
+ " You do not want to do this unless you are the hub admin!"
+
def momRoute: Route = pathPrefix("mom") {
- put {
- createQueue ~
- sendMessage ~
- acknowledge
- } ~ receiveMessage ~ getQueues ~ deleteQueue
+
+ if (!enabled) {
+ val configProblem: CannotUseHornetQMomWebApiProblem = CannotUseHornetQMomWebApiProblem(new UnsupportedOperationException)
+ warn(s"HornetQMomWebApi is not available to use due to configProblem ${configProblem.description}!")
+ respondWithStatus(StatusCodes.NotFound) {
+ complete(warningMessage)
+ }
+ } else {
+ put {
+ createQueue ~
+ sendMessage ~
+ acknowledge
+ } ~ receiveMessage ~ getQueues ~ deleteQueue
+ }
}
// SQS returns CreateQueueResult, which contains queueUrl: String
def createQueue: Route =
path("createQueue" / Segment) { queueName =>
detach() {
val createdQueueTry: Try[Queue] = LocalHornetQMom.createQueueIfAbsent(queueName)
createdQueueTry match {
case Success(queue) => {
implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
val response: String = write[Queue](queue)(formats)
respondWithStatus(StatusCodes.Created) {
complete(response)
}
}
case Failure(x) => {
internalServerErrorOccured(x, "createQueue")
}
}
}
}
// SQS takes in DeleteMessageRequest, which contains a queueUrl: String and a ReceiptHandle: String
// returns a DeleteMessageResult, toString for debugging
def deleteQueue: Route = path("deleteQueue" / Segment) { queueName =>
put {
detach() {
val deleteQueueTry: Try[Unit] = LocalHornetQMom.deleteQueue(queueName)
deleteQueueTry match {
case Success(v) => {
complete(StatusCodes.OK)
}
case Failure(x) => {
internalServerErrorOccured(x, "deleteQueue")
}
}
}
}
}
// SQS sendMessage(String queueUrl, String messageBody) => SendMessageResult
def sendMessage: Route = path("sendMessage" / Segment) { toQueue =>
requestInstance { request =>
val messageContent = request.entity.asString
detach() {
val sendTry: Try[Unit] = LocalHornetQMom.send(messageContent, Queue(toQueue))
sendTry match {
case Success(v) => {
complete(StatusCodes.Accepted)
}
case Failure(x) => {
internalServerErrorOccured(x, "sendMessage")
}
}
}
}
}
// SQS ReceiveMessageResult receiveMessage(String queueUrl)
def receiveMessage: Route =
get {
path("receiveMessage" / Segment) { fromQueue =>
parameter('timeOutSeconds ? 20) { timeOutSeconds =>
val timeout: Duration = Duration.create(timeOutSeconds, "seconds")
detach() {
val receiveTry: Try[Option[Message]] = LocalHornetQMom.receive(Queue(fromQueue), timeout)
receiveTry match {
case Success(optMessage) => {
implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
optMessage.fold(complete(StatusCodes.NotFound))(msg => complete(write(optMessage)(formats)))
}
case Failure(x) => {
internalServerErrorOccured(x, "receiveMessage")
}
}
}
}
}
}
// SQS has DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle)
def acknowledge: Route = path("acknowledge") {
entity(as[String]) { messageJSON =>
implicit val formats: Formats = Serialization.formats(NoTypeHints) + new MessageSerializer
detach() {
val msg: Message = read[Message](messageJSON)(formats, manifest[Message])
val acknowledgeTry: Try[Unit] = LocalHornetQMom.completeMessage(msg)
acknowledgeTry match {
case Success(v) => {
complete(StatusCodes.ResetContent)
}
case Failure(x) => {
internalServerErrorOccured(x, "acknowledge")
}
}
}
}
}
// Returns the names of the queues created on this server. Seq[Any]
def getQueues: Route = path("getQueues") {
get {
detach() {
implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
respondWithStatus(StatusCodes.OK) {
val getQueuesTry: Try[Seq[Queue]] = LocalHornetQMom.queues
getQueuesTry match {
case Success(seqQueue) => {
complete(write[Seq[Queue]](LocalHornetQMom.queues.get)(formats))
}
case Failure(x) => {
internalServerErrorOccured(x, "getQueues")
}
}
}
}
}
}
-
def internalServerErrorOccured(x: Throwable, function: String): Route = {
respondWithStatus(StatusCodes.InternalServerError) {
val serverErrorProblem: HornetQMomServerErrorProblem = HornetQMomServerErrorProblem(x, function)
debug(s"HornetQ encountered a Problem during $function, Problem Details: $serverErrorProblem")
- complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage} from ${x.getClass}")
+ complete(s"HornetQ throws an exception while trying to $function. HornetQ Server response: ${x.getMessage}" +
+ s"Exception is from ${x.getClass}")
}
}
}
-
-
-case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Adapter) {
+case class HornetQMomServerErrorProblem(x:Throwable, function:String) extends AbstractProblem(ProblemSources.Hub) {
override val throwable = Some(x)
override val summary: String = "SHRINE cannot use HornetQMomWebApi due to a server error occurred in hornetQ."
override val description: String = s"HornetQ throws an exception while trying to $function," +
s" the server's response is: ${x.getMessage} from ${x.getClass}."
}
+
+case class CannotUseHornetQMomWebApiProblem(x:Throwable) extends AbstractProblem(ProblemSources.Hub) {
+
+ override val throwable = Some(x)
+ override val summary: String = "SHRINE cannot use HornetQMomWebApi due to configuration in shrine.conf."
+ override val description: String = "If you intend for this node to serve as this SHRINE network's messaging hub " +
+ "set shrine.messagequeue.hornetQWebApi.enabled to true in your shrine.conf." +
+ " You do not want to do this unless you are the hub admin!"
+}
diff --git a/messagequeue/hornetqmom/src/test/resources/shrine.conf b/messagequeue/hornetqmom/src/test/resources/shrine.conf
index e69de29bb..3c174f1c5 100644
--- a/messagequeue/hornetqmom/src/test/resources/shrine.conf
+++ b/messagequeue/hornetqmom/src/test/resources/shrine.conf
@@ -0,0 +1,10 @@
+shrine {
+ messagequeue {
+ hornetQWebApiTest {
+ enabled = false
+ }
+ }
+ problem {
+ problemHandler = "net.shrine.problem.NoOpProblemHandler$"
+ }
+}
\ No newline at end of file
diff --git a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala
index e9ebd6eef..01bfe8485 100644
--- a/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala
+++ b/messagequeue/hornetqmom/src/test/scala/net/shrine/hornetqmom/HornetQMomWebApiTest.scala
@@ -1,105 +1,181 @@
package net.shrine.hornetqmom
import akka.actor.ActorRefFactory
import net.shrine.messagequeueservice.{Message, MessageSerializer, Queue, QueueSerializer}
+import net.shrine.source.ConfigSource
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
import spray.http.HttpEntity
import spray.http.StatusCodes._
import spray.testkit.ScalatestRouteTest
import scala.collection.immutable.Seq
/**
* Created by yifan on 7/27/17.
*/
@RunWith(classOf[JUnitRunner])
class HornetQMomWebApiTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi {
override def actorRefFactory: ActorRefFactory = system
private val proposedQueueName = "test Queue"
private val queue: Queue = Queue(proposedQueueName)
private val queueName: String = queue.name // "testQueue"
private val messageContent = "test Content"
private var receivedMessage: String = ""
"HornetQMomWebApi" should "create/delete the given queue, send/receive message, get queues" in {
Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check {
val response = new String(body.data.toByteArray)
- implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
- val jsonToQueue = read[Queue](response)(formats, manifest[Queue])
- val responseQueueName = jsonToQueue.name
- assertResult(Created)(status)
- assertResult(queueName)(responseQueueName)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
+ println(response)
+ val jsonToQueue = read[Queue](response)(formats, manifest[Queue])
+ val responseQueueName = jsonToQueue.name
+ assertResult(Created)(status)
+ assertResult(queueName)(responseQueueName)
+ }
}
// should be OK to create a queue twice
Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check {
val response = new String(body.data.toByteArray)
- implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
- val jsonToQueue = read[Queue](response)(formats, manifest[Queue])
- val responseQueueName = jsonToQueue.name
- assertResult(Created)(status)
- assertResult(queueName)(responseQueueName)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
+ val jsonToQueue = read[Queue](response)(formats, manifest[Queue])
+ val responseQueueName = jsonToQueue.name
+ assertResult(Created)(status)
+ assertResult(queueName)(responseQueueName)
+ }
}
Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check {
- assertResult(Accepted)(status)
+ val response = new String(body.data.toByteArray)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ assertResult(Accepted)(status)
+ }
}
Get(s"/mom/getQueues") ~> momRoute ~> check {
-
- implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
val response: String = new String(body.data.toByteArray)
- val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]])
-
- assertResult(OK)(status)
- assertResult(queueName)(jsonToSeq.head.name)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ implicit val formats = Serialization.formats(NoTypeHints) + new QueueSerializer
+ val jsonToSeq: Seq[Queue] = read[Seq[Queue]](response, false)(formats, manifest[Seq[Queue]])
+
+ assertResult(OK)(status)
+ assertResult(queueName)(jsonToSeq.head.name)
+ }
}
// given timeout is 2 seconds
Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=2") ~> momRoute ~> check {
val response = new String(body.data.toByteArray)
receivedMessage = response
-
- implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
- val responseToMessage: Message = read[Message](response)(formats, manifest[Message])
-
- assertResult(OK)(status)
- assert(responseToMessage.isInstanceOf[Message])
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
+ val responseToMessage: Message = read[Message](response)(formats, manifest[Message])
+
+ assertResult(OK)(status)
+ assert(responseToMessage.isInstanceOf[Message])
+ }
}
- Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~>
- momRoute ~> check {
- implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
- assertResult(ResetContent)(status)
+ Put("/mom/acknowledge", HttpEntity(s"$receivedMessage")) ~> momRoute ~> check {
+ val response = new String(body.data.toByteArray)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ implicit val formats = Serialization.formats(NoTypeHints) + new MessageSerializer
+ assertResult(ResetContent)(status)
+ }
}
Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check {
- assertResult(OK)(status)
+ val response = new String(body.data.toByteArray)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ assertResult(OK)(status)
+ }
}
}
- "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " +
- "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in {
-
- Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check {
- assertResult(InternalServerError)(status)
+ "HornetQMomWebApi" should "respond Internal server error with the corresponding error message when " +
+ "failures occur while creating/deleting the given queue, sending/receiving message, getting queues" in {
+
+ Put(s"/mom/deleteQueue/$queueName") ~> momRoute ~> check {
+ val response = new String(body.data.toByteArray)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ assertResult(InternalServerError)(status)
+ }
+ }
+
+ Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check {
+ val response = new String(body.data.toByteArray)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ assertResult(InternalServerError)(status)
+ }
+ }
+
+ // given timeout is 1 seconds
+ Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check {
+ val response = new String(body.data.toByteArray)
+ if (!enabled) {
+ assertResult(NotFound)(status)
+ assertResult(warningMessage)(response)
+ } else {
+ assertResult(InternalServerError)(status)
+ }
+ }
}
- Put(s"/mom/sendMessage/$queueName", HttpEntity(s"$messageContent")) ~> momRoute ~> check {
- assertResult(InternalServerError)(status)
- }
+}
+
+@RunWith(classOf[JUnitRunner])
+class HornetQMomWebApiConfigTest extends FlatSpec with ScalatestRouteTest with HornetQMomWebApi {
+ override def actorRefFactory: ActorRefFactory = system
+
+ private val queueName = "testQueue"
+
+
+ override val enabled: Boolean = ConfigSource.config.getString("shrine.messagequeue.hornetQWebApiTest.enabled").toBoolean
+
+ "HornetQMomWebApi" should "block user from using the API and return a 404 response" in {
+
+ Put(s"/mom/createQueue/$queueName") ~> momRoute ~> check {
+ val response = new String(body.data.toByteArray)
- // given timeout is 1 seconds
- Get(s"/mom/receiveMessage/$queueName?timeOutSeconds=1") ~> momRoute ~> check {
- assertResult(InternalServerError)(status)
+ assertResult(warningMessage)(response)
+ assertResult(NotFound)(status)
}
}
}
\ No newline at end of file
diff --git a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala
index f707d3fbe..ccd1c46d1 100644
--- a/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala
+++ b/messagequeue/messagequeueservice/src/main/scala/net/shrine/messagequeueservice/MessageQueueService.scala
@@ -1,103 +1,102 @@
package net.shrine.messagequeueservice
import net.shrine.source.ConfigSource
import net.shrine.spray.DefaultJsonSupport
import org.hornetq.api.core.client.ClientMessage
import org.hornetq.core.client.impl.ClientMessageImpl
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.{CustomSerializer, DefaultFormats, Formats, _}
import scala.collection.immutable.Seq
import scala.concurrent.duration.Duration
import scala.util.Try
/**
* This object mostly imitates AWS SQS' API via an embedded HornetQ. See http://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/examples-sqs.html
*
* @author david
* @since 7/18/17
*/
//todo in 1.23 all but the server side will use the client RemoteHornetQ implementation (which will call to the server at the hub)
//todo in 1.24, create an AwsSqs implementation of the trait
trait MessageQueueService {
def createQueueIfAbsent(queueName:String): Try[Queue]
def deleteQueue(queueName:String): Try[Unit]
def queues: Try[Seq[Queue]]
def send(contents:String,to:Queue): Try[Unit]
def receive(from:Queue,timeout:Duration): Try[Option[Message]]
def completeMessage(message:Message): Try[Unit]
}
object MessageQueueService {
lazy val service:MessageQueueService = {
import scala.reflect.runtime.universe.runtimeMirror
val momClassName = ConfigSource.config.getString("shrine.messagequeue.implementation")
val classLoaderMirror = runtimeMirror(getClass.getClassLoader)
val module = classLoaderMirror.staticModule(momClassName)
classLoaderMirror.reflectModule(module).instance.asInstanceOf[MessageQueueService]
}
}
case class Message(hornetQMessage:ClientMessage) extends DefaultJsonSupport {
override implicit def json4sFormats: Formats = DefaultFormats
val propName = "contents"
def getClientMessage = hornetQMessage
def contents = hornetQMessage.getStringProperty(propName)
def getMessageID = hornetQMessage.getMessageID
def complete() = hornetQMessage.acknowledge()
}
case class Queue(var name:String) extends DefaultJsonSupport {
// filter all (Unicode) characters that are not letters
// filter neither letters nor (decimal) digits, replaceAll("[^\\p{L}]+", "")
name = name.filterNot(c => c.isWhitespace).replaceAll("[^\\p{L}\\p{Nd}]+", "")
if (name.length == 0) {
throw new IllegalArgumentException("ERROR: A valid Queue name must contain at least one letter!")
}
}
class QueueSerializer extends CustomSerializer[Queue](format => (
{
case JObject(JField("name", JString(s)) :: Nil) => Queue(s)
},
{
case queue: Queue =>
JObject(JField("name", JString(queue.name)) :: Nil)
}
))
class MessageSerializer extends CustomSerializer[Message](format => (
{
//JObject(List((hornetQMessage,JObject(List((type,JInt(0)), (durable,JBool(false)), (expiration,JInt(0)), (timestamp,JInt(1502218873012)), (priority,JInt(4)))))))
// type, durable, expiration, timestamp, priority, initialMessageBufferSize
case JObject(JField("hornetQMessage", JObject(JField("type", JInt(s)) :: JField("durable", JBool(d)) :: JField("expiration", JInt(e))
:: JField("timestamp", JInt(t)) :: JField("priority", JInt(p)) :: Nil)) :: Nil) =>
new Message(new ClientMessageImpl(s.toByte, d, e.toLong, t.toLong, p.toByte, 0))
},
{
case msg: Message =>
JObject(JField("hornetQMessage",
JObject(JField("type", JLong(msg.getClientMessage.getType)) ::
JField("durable", JBool(msg.getClientMessage.isDurable)) ::
JField("expiration", JLong(msg.getClientMessage.getExpiration)) ::
JField("timestamp", JLong(msg.getClientMessage.getTimestamp)) ::
JField("priority", JLong(msg.getClientMessage.getPriority)) :: Nil)) :: Nil)
}
))
-// todo test MessageSerializer
case class NoSuchQueueExistsInHornetQ(proposedQueue: Queue) extends Exception {
override def getMessage: String = {
s"Given Queue ${proposedQueue.name} does not exist in HornetQ server! Please create the queue first!"
}
}
diff --git a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
index 6a22c3f65..f944efc62 100644
--- a/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
+++ b/qep/service/src/main/scala/net/shrine/qep/querydb/QepQueryDb.scala
@@ -1,612 +1,610 @@
package net.shrine.qep.querydb
import java.sql.SQLException
import java.util.concurrent.TimeoutException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.audit.{NetworkQueryId, QueryName, Time, UserName}
import net.shrine.log.Loggable
import net.shrine.problem.{AbstractProblem, ProblemDigest, ProblemSources}
import net.shrine.protocol.{DefaultBreakdownResultOutputTypes, DeleteQueryRequest, FlagQueryRequest, I2b2ResultEnvelope, QueryMaster, QueryResult, ReadPreviousQueriesRequest, ReadPreviousQueriesResponse, RenameQueryRequest, ResultOutputType, ResultOutputTypes, RunQueryRequest, UnFlagQueryRequest}
import net.shrine.slick.{CouldNotRunDbIoActionException, TestableDataSourceCreator, TimeoutInDbIoActionException}
import net.shrine.source.ConfigSource
import net.shrine.util.XmlDateHelper
import slick.driver.JdbcProfile
import scala.collection.immutable.Iterable
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future, blocking}
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.control.NonFatal
import scala.xml.XML
/**
* DB code for the QEP's query instances and query results.
*
* @author david
* @since 1/19/16
*/
case class QepQueryDb(schemaDef:QepQuerySchema,dataSource: DataSource,timeout:Duration) extends Loggable {
import schemaDef._
import jdbcProfile.api._
val database = Database.forDataSource(dataSource)
def createTables() = schemaDef.createTables(database)
def dropTables() = schemaDef.dropTables(database)
def dbRun[R](action: DBIOAction[R, NoStream, Nothing]):R = {
val future: Future[R] = database.run(action)
try {
- blocking {
Await.result(future, timeout)
- }
}
catch {
case tx:TimeoutException => throw TimeoutInDbIoActionException(dataSource, timeout, tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(dataSource,x)
}
}
def insertQepQuery(runQueryRequest: RunQueryRequest):Unit = {
debug(s"insertQepQuery $runQueryRequest")
insertQepQuery(QepQuery(runQueryRequest))
}
def insertQepQuery(qepQuery: QepQuery):Unit = {
dbRun(allQepQueryQuery += qepQuery)
}
def selectAllQepQueries:Seq[QepQuery] = {
dbRun(mostRecentVisibleQepQueries.result)
}
def selectPreviousQueries(request: ReadPreviousQueriesRequest):ReadPreviousQueriesResponse = {
val previousQueries: Seq[QepQuery] = selectPreviousQueriesByUserAndDomain(
request.authn.username,
request.authn.domain,
None,
Some(request.fetchSize))
val flags:Map[NetworkQueryId,QepQueryFlag] = selectMostRecentQepQueryFlagsFor(previousQueries.map(_.networkId).to[Set])
val queriesAndFlags = previousQueries.map(x => (x,flags.get(x.networkId)))
ReadPreviousQueriesResponse(queriesAndFlags.map(x => x._1.toQueryMaster(x._2)))
}
def countPreviousQueriesByUserAndDomain(userName: UserName, domain: String):Int = {
val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain)
dbRun(q.size.result)
}
def selectQueryById(networkQueryId: NetworkQueryId): Option[QepQuery] =
dbRun(mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result).lastOption
def selectPreviousQueriesByUserAndDomain(userName: UserName, domain: String, skip:Option[Int] = None, limit:Option[Int] = None):Seq[QepQuery] = {
debug(s"start selectPreviousQueriesByUserAndDomain $userName $domain")
val q = mostRecentVisibleQepQueries.filter(r => r.userName === userName && r.userDomain === domain).sortBy(x => x.changeDate.desc)
val qWithSkip = skip.fold(q)(q.drop)
val qWithLimit = limit.fold(qWithSkip)(qWithSkip.take)
val result = dbRun(qWithLimit.result)
debug(s"finished selectPreviousQueriesByUserAndDomain with $result")
result
}
def renamePreviousQuery(request:RenameQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(queryName = request.queryName,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def markDeleted(request:DeleteQueryRequest):Unit = {
val networkQueryId = request.networkQueryId
dbRun(
for {
queryResults <- mostRecentVisibleQepQueries.filter(_.networkId === networkQueryId).result
_ <- allQepQueryQuery ++= queryResults.map(_.copy(deleted = true,changeDate = System.currentTimeMillis()))
} yield queryResults
)
}
def insertQepQueryFlag(flagQueryRequest: FlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(flagQueryRequest))
}
def insertQepQueryFlag(unflagQueryRequest: UnFlagQueryRequest):Unit = {
insertQepQueryFlag(QepQueryFlag(unflagQueryRequest))
}
def insertQepQueryFlag(qepQueryFlag: QepQueryFlag):Unit = {
dbRun(allQepQueryFlags += qepQueryFlag)
}
def selectMostRecentQepQueryFlagsFor(networkIds:Set[NetworkQueryId]):Map[NetworkQueryId,QepQueryFlag] = {
val flags:Seq[QepQueryFlag] = dbRun(mostRecentQueryFlags.filter(_.networkId inSet networkIds).result)
flags.map(x => x.networkQueryId -> x).toMap
}
def selectMostRecentQepQueryFlagFor(networkQueryId: NetworkQueryId): Option[QepQueryFlag] =
dbRun(mostRecentQueryFlags.filter(_.networkId === networkQueryId).result).lastOption
def insertQepResultRow(qepQueryRow:QueryResultRow) = {
dbRun(allQueryResultRows += qepQueryRow)
}
def insertQueryResult(networkQueryId:NetworkQueryId,result:QueryResult) = {
val adapterNode = result.description.getOrElse(throw new IllegalStateException("description is empty, does not have an adapter node"))
val queryResultRow = QueryResultRow(networkQueryId,result)
val breakdowns: Iterable[QepQueryBreakdownResultsRow] = result.breakdowns.flatMap(QepQueryBreakdownResultsRow.breakdownRowsFor(networkQueryId,adapterNode,result.resultId,_))
val problem: Seq[QepProblemDigestRow] = result.problemDigest.map(p => QepProblemDigestRow(networkQueryId,adapterNode,p.codec,p.stampText,p.summary,p.description,p.detailsXml.toString,System.currentTimeMillis())).to[Seq]
dbRun(
for {
_ <- allQueryResultRows += queryResultRow
_ <- allBreakdownResultsRows ++= breakdowns
_ <- allProblemDigestRows ++= problem
} yield ()
)
}
//todo only used in tests. Is that OK?
def selectMostRecentQepResultRowsFor(networkId:NetworkQueryId): Seq[QueryResultRow] = {
dbRun(mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result)
}
def selectMostRecentFullQueryResultsFor(networkId:NetworkQueryId): Seq[FullQueryResult] = {
val (queryResults, breakdowns,problems) = dbRun(
for {
queryResults <- mostRecentQueryResultRows.filter(_.networkQueryId === networkId).result
breakdowns: Seq[QepQueryBreakdownResultsRow] <- mostRecentBreakdownResultsRows.filter(_.networkQueryId === networkId).result
problems <- mostRecentProblemDigestRows.filter(_.networkQueryId === networkId).result
} yield (queryResults, breakdowns, problems)
)
val breakdownTypeToResults: Map[ResultOutputType, Seq[QepQueryBreakdownResultsRow]] = breakdowns.groupBy(_.resultType)
def seqOfOneProblemRowToProblemDigest(problemSeq:Seq[QepProblemDigestRow]):ProblemDigest = {
if(problemSeq.size == 1) problemSeq.head.toProblemDigest
else throw new IllegalStateException(s"problemSeq size was not 1. $problemSeq")
}
val adapterNodesToProblemDigests: Map[String, ProblemDigest] = problems.groupBy(_.adapterNode).map(nodeToProblem => nodeToProblem._1 -> seqOfOneProblemRowToProblemDigest(nodeToProblem._2) )
queryResults.map(r => FullQueryResult(
r,
breakdownTypeToResults,
adapterNodesToProblemDigests.get(r.adapterNode)
))
}
def selectMostRecentQepResultsFor(networkId:NetworkQueryId): Seq[QueryResult] = {
val fullQueryResults = selectMostRecentFullQueryResultsFor(networkId)
fullQueryResults.map(_.toQueryResult)
}
def insertQueryBreakdown(breakdownResultsRow:QepQueryBreakdownResultsRow) = {
dbRun(allBreakdownResultsRows += breakdownResultsRow)
}
def selectAllBreakdownResultsRows: Seq[QepQueryBreakdownResultsRow] = {
dbRun(allBreakdownResultsRows.result)
}
def selectDistinctAdaptersWithResults:Seq[String] = {
dbRun(allQueryResultRows.map(_.adapterNode).distinct.result).sorted
}
}
object QepQueryDb extends Loggable {
val dataSource:DataSource = TestableDataSourceCreator.dataSource(QepQuerySchema.config)
val timeout = QepQuerySchema.config.getInt("timeout") seconds
val db = QepQueryDb(QepQuerySchema.schema,dataSource,timeout)
val createTablesOnStart = QepQuerySchema.config.getBoolean("createTablesOnStart")
if(createTablesOnStart) QepQueryDb.db.createTables()
}
/**
* Separate class to support schema generation without actually connecting to the database.
*
* @param jdbcProfile Database profile to use for the schema
*/
case class QepQuerySchema(jdbcProfile: JdbcProfile,moreBreakdowns: Set[ResultOutputType]) extends Loggable {
import jdbcProfile.api._
def ddlForAllTables: jdbcProfile.DDL = {
allQepQueryQuery.schema ++ allQepQueryFlags.schema ++ allQueryResultRows.schema ++ allBreakdownResultsRows.schema ++ allProblemDigestRows.schema
}
//to get the schema, use the REPL
//println(QepQuerySchema.schema.ddlForAllTables.createStatements.mkString(";\n"))
def createTables(database:Database) = {
try {
val future = database.run(ddlForAllTables.create)
Await.result(future,10 seconds)
} catch {
//I'd prefer to check and create schema only if absent. No way to do that with Oracle.
case x:SQLException => info("Caught exception while creating tables. Recover by assuming the tables already exist.",x)
}
}
def dropTables(database:Database) = {
val future = database.run(ddlForAllTables.drop)
//Really wait forever for the cleanup
Await.result(future,Duration.Inf)
}
class QepQueries(tag:Tag) extends Table[QepQuery](tag,"previousQueries") {
def networkId = column[NetworkQueryId]("networkId")
def userName = column[UserName]("userName")
def userDomain = column[String]("domain")
def queryName = column[QueryName]("queryName")
def expression = column[Option[String]]("expression")
def dateCreated = column[Time]("dateCreated")
def deleted = column[Boolean]("deleted")
def queryXml = column[String]("queryXml")
def changeDate = column[Long]("changeDate")
def * = (networkId,userName,userDomain,queryName,expression,dateCreated,deleted,queryXml,changeDate) <> (QepQuery.tupled,QepQuery.unapply)
}
val allQepQueryQuery = TableQuery[QepQueries]
val mostRecentQepQueryQuery: Query[QepQueries, QepQuery, Seq] = for(
queries <- allQepQueryQuery if !allQepQueryQuery.filter(_.networkId === queries.networkId).filter(_.changeDate > queries.changeDate).exists
) yield queries
val mostRecentVisibleQepQueries = mostRecentQepQueryQuery.filter(_.deleted === false)
class QepQueryFlags(tag:Tag) extends Table[QepQueryFlag](tag,"queryFlags") {
def networkId = column[NetworkQueryId]("networkId")
def flagged = column[Boolean]("flagged")
def flagMessage = column[String]("flagMessage")
def changeDate = column[Long]("changeDate")
def * = (networkId,flagged,flagMessage,changeDate) <> (QepQueryFlag.tupled,QepQueryFlag.unapply)
}
val allQepQueryFlags = TableQuery[QepQueryFlags]
val mostRecentQueryFlags: Query[QepQueryFlags, QepQueryFlag, Seq] = for(
queryFlags <- allQepQueryFlags if !allQepQueryFlags.filter(_.networkId === queryFlags.networkId).filter(_.changeDate > queryFlags.changeDate).exists
) yield queryFlags
val qepQueryResultTypes = DefaultBreakdownResultOutputTypes.toSet ++ ResultOutputType.values ++ moreBreakdowns
val stringsToQueryResultTypes: Map[String, ResultOutputType] = qepQueryResultTypes.map(x => (x.name,x)).toMap
val queryResultTypesToString: Map[ResultOutputType, String] = stringsToQueryResultTypes.map(_.swap)
implicit val qepQueryResultTypesColumnType = MappedColumnType.base[ResultOutputType,String] ({
(resultType: ResultOutputType) => queryResultTypesToString(resultType)
},{
(string: String) => stringsToQueryResultTypes(string)
})
implicit val queryStatusColumnType = MappedColumnType.base[QueryResult.StatusType,String] ({
statusType => statusType.name
},{
name => QueryResult.StatusType.valueOf(name).getOrElse(throw new IllegalStateException(s"$name is not one of ${QueryResult.StatusType.values.map(_.name).mkString(", ")}"))
})
class QepQueryResults(tag:Tag) extends Table[QueryResultRow](tag,"queryResults") {
def resultId = column[Long]("resultId")
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def instanceId = column[Long]("instanceId")
def adapterNode = column[String]("adapterNode")
def resultType = column[Option[ResultOutputType]]("resultType")
def size = column[Long]("size")
def startDate = column[Option[Long]]("startDate")
def endDate = column[Option[Long]]("endDate")
def status = column[QueryResult.StatusType]("status")
def statusMessage = column[Option[String]]("statusMessage")
def changeDate = column[Long]("changeDate")
def * = (resultId,networkQueryId,instanceId,adapterNode,resultType,size,startDate,endDate,status,statusMessage,changeDate) <> (QueryResultRow.tupled,QueryResultRow.unapply)
}
val allQueryResultRows = TableQuery[QepQueryResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentQueryResultRows: Query[QepQueryResults, QueryResultRow, Seq] = for(
queryResultRows <- allQueryResultRows if !allQueryResultRows.filter(_.networkQueryId === queryResultRows.networkQueryId).filter(_.adapterNode === queryResultRows.adapterNode).filter(_.changeDate > queryResultRows.changeDate).exists
) yield queryResultRows
class QepQueryBreakdownResults(tag:Tag) extends Table[QepQueryBreakdownResultsRow](tag,"queryBreakdownResults") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def resultId = column[Long]("resultId")
def resultType = column[ResultOutputType]("resultType")
def dataKey = column[String]("dataKey")
def value = column[Long]("value")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,resultId,resultType,dataKey,value,changeDate) <> (QepQueryBreakdownResultsRow.tupled,QepQueryBreakdownResultsRow.unapply)
}
val allBreakdownResultsRows = TableQuery[QepQueryBreakdownResults]
//Most recent query result rows for each queryId from each adapter
val mostRecentBreakdownResultsRows: Query[QepQueryBreakdownResults, QepQueryBreakdownResultsRow, Seq] = for(
breakdownResultsRows <- allBreakdownResultsRows if !allBreakdownResultsRows.filter(_.networkQueryId === breakdownResultsRows.networkQueryId).filter(_.adapterNode === breakdownResultsRows.adapterNode).filter(_.resultId === breakdownResultsRows.resultId).filter(_.changeDate > breakdownResultsRows.changeDate).exists
) yield breakdownResultsRows
/*
case class ProblemDigest(codec: String, stampText: String, summary: String, description: String, detailsXml: NodeSeq) extends XmlMarshaller {
*/
class QepResultProblemDigests(tag:Tag) extends Table [QepProblemDigestRow](tag,"queryResultProblemDigests") {
def networkQueryId = column[NetworkQueryId]("networkQueryId")
def adapterNode = column[String]("adapterNode")
def codec = column[String]("codec")
def stamp = column[String]("stamp")
def summary = column[String]("summary")
def description = column[String]("description")
def details = column[String]("details")
def changeDate = column[Long]("changeDate")
def * = (networkQueryId,adapterNode,codec,stamp,summary,description,details,changeDate) <> (QepProblemDigestRow.tupled,QepProblemDigestRow.unapply)
}
val allProblemDigestRows = TableQuery[QepResultProblemDigests]
val mostRecentProblemDigestRows: Query[QepResultProblemDigests, QepProblemDigestRow, Seq] = for(
problemDigests <- allProblemDigestRows if !allProblemDigestRows.filter(_.networkQueryId === problemDigests.networkQueryId).filter(_.adapterNode === problemDigests.adapterNode).filter(_.changeDate > problemDigests.changeDate).exists
) yield problemDigests
}
object QepQuerySchema {
val allConfig:Config = ConfigSource.config
val config:Config = allConfig.getConfig("shrine.queryEntryPoint.audit.database")
val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config)
import net.shrine.config.ConfigExtensions
val moreBreakdowns: Set[ResultOutputType] = config.getOptionConfigured("breakdownResultOutputTypes",ResultOutputTypes.fromConfig).getOrElse(Set.empty)
val schema = QepQuerySchema(slickProfile,moreBreakdowns)
}
case class QepQuery(
networkId:NetworkQueryId,
userName: UserName,
userDomain: String,
queryName: QueryName,
expression: Option[String],
dateCreated: Time,
deleted: Boolean,
queryXml: String,
changeDate: Time
){
def toQueryMaster(qepQueryFlag:Option[QepQueryFlag]):QueryMaster = {
QueryMaster(
queryMasterId = networkId.toString,
networkQueryId = networkId,
name = queryName,
userId = userName,
groupId = userDomain,
createDate = XmlDateHelper.toXmlGregorianCalendar(dateCreated),
flagged = qepQueryFlag.map(_.flagged),
flagMessage = qepQueryFlag.map(_.flagMessage)
)
}
}
object QepQuery extends ((NetworkQueryId,UserName,String,QueryName,Option[String],Time,Boolean,String,Time) => QepQuery) {
def apply(runQueryRequest: RunQueryRequest):QepQuery = {
new QepQuery(
networkId = runQueryRequest.networkQueryId,
userName = runQueryRequest.authn.username,
userDomain = runQueryRequest.authn.domain,
queryName = runQueryRequest.queryDefinition.name,
expression = runQueryRequest.queryDefinition.expr.map(_.toString),
dateCreated = System.currentTimeMillis(),
deleted = false,
queryXml = runQueryRequest.toXmlString,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryFlag(
networkQueryId: NetworkQueryId,
flagged:Boolean,
flagMessage:String,
changeDate:Long
)
object QepQueryFlag extends ((NetworkQueryId,Boolean,String,Long) => QepQueryFlag) {
def apply(flagQueryRequest: FlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = flagQueryRequest.networkQueryId,
flagged = true,
flagMessage = flagQueryRequest.message.getOrElse(""),
changeDate = System.currentTimeMillis()
)
}
def apply(unflagQueryRequest: UnFlagQueryRequest):QepQueryFlag = {
QepQueryFlag(
networkQueryId = unflagQueryRequest.networkQueryId,
flagged = false,
flagMessage = "",
changeDate = System.currentTimeMillis()
)
}
}
//todo replace with a class per state
case class FullQueryResult(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
count:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long,
breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]],
problemDigest:Option[ProblemDigest]
) {
def toQueryResult = {
def resultEnvelopesFrom(breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]]): Map[ResultOutputType, I2b2ResultEnvelope] = {
def resultEnvelopeFrom(resultType:ResultOutputType,breakdowns:Seq[QepQueryBreakdownResultsRow]):I2b2ResultEnvelope = {
val data = breakdowns.map(b => b.dataKey -> b.value).toMap
I2b2ResultEnvelope(resultType,data)
}
breakdownTypeToResults.map(r => r._1 -> resultEnvelopeFrom(r._1,r._2))
}
QueryResult(
resultId = resultId,
instanceId = instanceId,
resultType = resultType,
setSize = count,
startDate = startDate.map(XmlDateHelper.toXmlGregorianCalendar),
endDate = endDate.map(XmlDateHelper.toXmlGregorianCalendar),
description = Some(adapterNode),
statusType = status,
statusMessage = statusMessage,
breakdowns = resultEnvelopesFrom(breakdownTypeToResults),
problemDigest = problemDigest
)
}
}
object FullQueryResult {
def apply(row:QueryResultRow,
breakdownTypeToResults:Map[ResultOutputType,Seq[QepQueryBreakdownResultsRow]],
problemDigest:Option[ProblemDigest]):FullQueryResult = {
FullQueryResult(resultId = row.resultId,
networkQueryId = row.networkQueryId,
instanceId = row.instanceId,
adapterNode = row.adapterNode,
resultType = row.resultType,
count = row.size,
startDate = row.startDate,
endDate = row.endDate,
status = row.status,
statusMessage = row.statusMessage,
changeDate = row.changeDate,
breakdownTypeToResults = breakdownTypeToResults,
problemDigest = problemDigest
)
}
}
case class QueryResultRow(
resultId:Long,
networkQueryId:NetworkQueryId,
instanceId:Long,
adapterNode:String,
resultType:Option[ResultOutputType],
size:Long,
startDate:Option[Long],
endDate:Option[Long],
status:QueryResult.StatusType,
statusMessage:Option[String],
changeDate:Long
) {
}
object QueryResultRow extends ((Long,NetworkQueryId,Long,String,Option[ResultOutputType],Long,Option[Long],Option[Long],QueryResult.StatusType,Option[String],Long) => QueryResultRow)
{
def apply(networkQueryId:NetworkQueryId,result:QueryResult):QueryResultRow = {
new QueryResultRow(
resultId = result.resultId,
networkQueryId = networkQueryId,
instanceId = result.instanceId,
adapterNode = result.description.getOrElse(s"$result has None in its description field, not a name of an adapter node."),
resultType = result.resultType,
size = result.setSize,
startDate = result.startDate.map(_.toGregorianCalendar.getTimeInMillis),
endDate = result.endDate.map(_.toGregorianCalendar.getTimeInMillis),
status = result.statusType,
statusMessage = result.statusMessage,
changeDate = System.currentTimeMillis()
)
}
}
case class QepQueryBreakdownResultsRow(
networkQueryId: NetworkQueryId,
adapterNode:String,
resultId:Long,
resultType: ResultOutputType,
dataKey:String,
value:Long,
changeDate:Long
)
object QepQueryBreakdownResultsRow extends ((NetworkQueryId,String,Long,ResultOutputType,String,Long,Long) => QepQueryBreakdownResultsRow){
def breakdownRowsFor(networkQueryId:NetworkQueryId,
adapterNode:String,
resultId:Long,
breakdown:(ResultOutputType,I2b2ResultEnvelope)): Iterable[QepQueryBreakdownResultsRow] = {
breakdown._2.data.map(b => QepQueryBreakdownResultsRow(networkQueryId,adapterNode,resultId,breakdown._1,b._1,b._2,System.currentTimeMillis()))
}
}
case class QepProblemDigestRow(
networkQueryId: NetworkQueryId,
adapterNode: String,
codec: String,
stampText: String,
summary: String,
description: String,
details: String,
changeDate:Long
){
def toProblemDigest = {
ProblemDigest(
codec,
stampText,
summary,
description,
if(!details.isEmpty) XML.loadString(details)
else
If you have questions about your query results or this SHRINE network, contact the Data Steward at your site.
If you have questions about your query results or this SHRINE network, contact the Data Steward at your site.
If you have questions about your query results or this SHRINE network, contact the Data Steward at your site.
If you have questions about your query results or this SHRINE network, contact the Data Steward at your site.