diff --git a/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala b/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala
index 9c03f08d9..effeb3e56 100644
--- a/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala
+++ b/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala
@@ -1,183 +1,184 @@
package net.shrine.json.store.db
import java.util.UUID
import java.util.concurrent.TimeoutException
import javax.sql.DataSource
import com.typesafe.config.Config
import net.shrine.slick.{CouldNotRunDbIoActionException, NeedsWarmUp, TestableDataSourceCreator}
import net.shrine.source.ConfigSource
import slick.dbio.SuccessAction
import slick.driver.JdbcProfile
import slick.jdbc.meta.MTable
import scala.concurrent.duration.{Duration, _}
import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Database access to a json store via slick
*
* @author david
* @since 5/16/17
*/
object JsonStoreDatabase extends NeedsWarmUp {
val config:Config = ConfigSource.config.getConfig("shrine.jsonStore.database")
val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config)
val timeout: Duration = ConfigSource.config.getInt("shrine.problem.timeout").seconds
import slickProfile.api._
val dataSource: DataSource = TestableDataSourceCreator.dataSource(config)
lazy val db = {
val db = Database.forDataSource(dataSource)
val createTables: String = "createTablesOnStart"
if (config.hasPath(createTables) && config.getBoolean(createTables)) {
Await.ready(db.run(IOActions.createIfNotExists), timeout)
}
db
}
def warmUp() = DatabaseConnector.runBlocking(ShrineResultsQ.selectAll.take(10).result)
case class ShrineResultDbEnvelope(
id:UUID,
version:Int,
tableChangeCount:Int,
+ //todo get shrine version here from the system as a default value
queryId:UUID,
json:String
)
/**
* The Results Table.
*/
class ShrineResultsT(tag: Tag) extends Table[ShrineResultDbEnvelope](tag, ShrineResultsQ.tableName) {
def id = column[UUID]("id", O.PrimaryKey)
- //def shrineVersion = column[String]("shrineVersion") //todo get shrine version here from the system as a default value
+ //def shrineVersion = column[String]("shrineVersion")
def version = column[Int]("version") //for optimistic locking
def tableVersion = column[Int]("tableVersion") //for change detection on a table
def queryId = column[UUID]("queryId") //for the first pass we're asking strictly for query ids
-
def json = column[String]("json")
-
def * = (id, version, tableVersion, queryId, json) <> (ShrineResultDbEnvelope.tupled, ShrineResultDbEnvelope.unapply)
+
+ //todo indexes
}
/**
* Queries related to the shrineResults table.
*/
object ShrineResultsQ extends TableQuery(new ShrineResultsT(_)) {
/**
* The table name
*/
- val tableName = "results"
+ val tableName = "shrineResults"
/**
* Equivalent to Select * from shrineResults;
*/
- val selectAll: ShrineResultsQ.type = this
+ val selectAll = this
def selectLastTableChange = Query(this.map(_.tableVersion).max)
//useful queries like "All the changes to results for a subset of queries since table version ..."
def withParameters(parameters:ShrineResultQueryParameters) = {
val everything: Query[ShrineResultsT, ShrineResultDbEnvelope, Seq] = selectAll
val afterTableChange = parameters.afterTableChange.fold(everything){change => everything.filter(_.tableVersion > change)}
val justTheseQueries = parameters.forQueryIds.fold(afterTableChange){queryIds => afterTableChange.filter(_.queryId.inSet(queryIds))}
justTheseQueries
}
}
case class ShrineResultQueryParameters(
afterTableChange:Option[Int] = None,
forQueryIds:Option[Set[UUID]] = None, //None interpreted as "all" . todo check set size < 1000 for Oracle safety if it matters.
skipOption:Option[Int] = None,
limitOption:Option[Int] = None
)
/**
* DBIO Actions. These are pre-defined IO actions that may be useful.
* Using it to centralize the location of DBIOs.
*/
object IOActions {
// For SuccessAction, just a no_op.
case object NoOperation
- val shrineResults: ShrineResultsQ.type = ShrineResultsQ
+ val shrineResults = ShrineResultsQ
val tableExists = MTable.getTables(ShrineResultsQ.tableName).map(_.nonEmpty)
val createIfNotExists = tableExists.flatMap(
if (_) SuccessAction(NoOperation) else ShrineResultsQ.schema.create)
val dropIfExists = tableExists.flatMap(
if (_) ShrineResultsQ.schema.drop else SuccessAction(NoOperation))
val clearTable = createIfNotExists andThen ShrineResultsQ.selectAll.delete
val selectAll = ShrineResultsQ.result
def countWithParameters(parameters: ShrineResultQueryParameters) = ShrineResultsQ.withParameters(parameters).size.result
def selectResultsWithParameters(parameters: ShrineResultQueryParameters) = {
val select = ShrineResultsQ.withParameters(parameters).sortBy(_.tableVersion.desc) //newest changes first
val skipSelect = parameters.skipOption.fold(select){ skip =>
select.drop(skip)
}
val limitSelect = parameters.limitOption.fold(skipSelect){ limit => skipSelect.take(limit)}
limitSelect
}
def selectLastTableChange = ShrineResultsQ.selectLastTableChange.result
def upsertShrineResult(shrineResult:ShrineResultDbEnvelope) = ShrineResultsQ.insertOrUpdate(shrineResult)
}
/**
* Entry point for interacting with the database. Runs IO actions.
*/
object DatabaseConnector {
val IO = IOActions
/**
* Executes a series of IO actions as a single transactions
*/
def executeTransaction(actions: DBIOAction[_, NoStream, _]*): Future[Unit] = {
db.run(DBIO.seq(actions:_*).transactionally)
}
/**
* Executes a series of IO actions as a single transaction, synchronous
*/
def executeTransactionBlocking(actions: DBIOAction[_, NoStream, _]*)(implicit timeout: Duration): Unit = {
try {
Await.ready(this.executeTransaction(actions: _*), timeout)
} catch {
case tx:TimeoutException => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, x)
}
}
/**
* Straight copy of db.run
*/
def run[R](dbio: DBIOAction[R, NoStream, _]): Future[R] = {
db.run(dbio)
}
/**
* Synchronized copy of db.run
*/
def runBlocking[R](dbio: DBIOAction[R, NoStream, _], timeout: Duration = timeout): R = {
try {
Await.result(this.run(dbio), timeout)
} catch {
case tx:TimeoutException => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, tx)
case NonFatal(x) => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, x)
}
}
}
}
\ No newline at end of file
diff --git a/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala b/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala
index b7d11310d..a03b20e8a 100644
--- a/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala
+++ b/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala
@@ -1,96 +1,69 @@
package net.shrine.json.store.db
import java.util.UUID
import net.shrine.json.store.db.JsonStoreDatabase.IOActions.NoOperation
import net.shrine.json.store.db.JsonStoreDatabase.ShrineResultDbEnvelope
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
-//import slick.driver.H2Driver.api._
+import slick.driver.H2Driver.api._ //todo do without this import
import scala.concurrent.duration._
/**
* Test creation, insertion, querying, and deletion of ShrineResultDbEnvelope values in an
* in-memory H2 database. Demonstrates proof of concept mapping of the ShrineResultDbEnvelope
* case class into a database.
*/
@RunWith(classOf[JUnitRunner])
class JsonStoreDatabaseTest extends FlatSpec with BeforeAndAfter with ScalaFutures with Matchers {
implicit val timeout = 10.seconds
val connector = JsonStoreDatabase.DatabaseConnector
val IO = connector.IO
val testShrineResults = Seq(
ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"),
ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"),
ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"),
ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"),
ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo")
)
before {
connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false
connector.runBlocking(IO.createIfNotExists >> IO.tableExists) shouldBe true
connector.runBlocking(IO.createIfNotExists) shouldBe NoOperation
connector.runBlocking(IO.selectAll) shouldBe empty
}
after {
connector.runBlocking(IO.tableExists) shouldBe true
connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false
connector.runBlocking(IO.dropIfExists) shouldBe NoOperation
}
"The Database" should "Connect without any problems" in {
- // Insert the suppliers and ProblemDigests
-// connector.executeTransactionBlocking(connector.IO.shrineResults ++= testShrineResults)
-/*
- // Test that they are all in the table
- var * = connector.runBlocking(IO.selectAll)
- * should contain theSameElementsAs problemDigests
- * should have length problemDigests.length
-
- // Reset the table
- connector.runBlocking(IO.resetTable >> IO.selectAll) shouldBe empty
+ // Insert the test records
+ connector.executeTransactionBlocking(IO.shrineResults ++= testShrineResults)
- // Run the test again
- connector.executeTransactionBlocking(IO.problems += problemDigests.head,
- IO.problems += problemDigests(1),
- IO.problems += problemDigests(2),
- IO.problems += problemDigests(3))
// Test that they are all in the table
- * = connector.runBlocking(IO.selectAll)
- * should contain theSameElementsAs problemDigests
- * should have length problemDigests.length
-
+ var shrineResultContents = connector.runBlocking(IO.selectAll)
+ shrineResultContents should contain theSameElementsAs testShrineResults
+ shrineResultContents should have length testShrineResults.length
- // Test that the simple select and filter queries work
- val filtered = connector.runBlocking(IO.problems.filter(_.codec === "code").map(_.description).result)
- filtered should have length 1
- filtered.head shouldBe problemDigests(3).description
-
- // This also tests that our conversion from xml to strings works
- val xml = connector.runBlocking(IO.problems.map(_.xml).result)
- xml should have length problemDigests.length
- xml should contain theSameElementsAs problemDigests.map(_.detailsXml.toString())
-
- val result = connector.runBlocking(IO.sizeAndProblemDigest(2))
- result._1 should have length 2
- result._2 shouldBe problemDigests.length
- result._1.head shouldBe problemDigests(3)
- result._1(1) shouldBe problemDigests.head
+ // Reset the table
+ connector.runBlocking(IO.clearTable >> IO.selectAll) shouldBe empty
- val resultOverLength = connector.runBlocking(IO.sizeAndProblemDigest(10))
- resultOverLength._1 should have length 4
- resultOverLength._1 should contain theSameElementsAs problemDigests
+ //Insert one at a time in a transaction
+ val actions = testShrineResults.map(IO.shrineResults += _)
- connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size
+ connector.executeTransactionBlocking(actions:_*)
+ // Test that they are all in the table
+ shrineResultContents = connector.runBlocking(IO.selectAll)
+ shrineResultContents should contain theSameElementsAs testShrineResults
+ shrineResultContents should have length testShrineResults.length
- val testProblem = ProblemDatabaseTestProblem(ProblemSources.Unknown)
- Thread.sleep(200)
- connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + 1
-*/
+ //todo test more low-level queries
}
}
diff --git a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala
index ce6a902a4..fb8426f68 100644
--- a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala
+++ b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala
@@ -1,94 +1,94 @@
package net.shrine.problem
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
-import slick.driver.H2Driver.api._
+import slick.driver.H2Driver.api._ //todo do without this import
import scala.concurrent.duration._
/**
* Test creation, insertion, querying, and deletion of ProblemDigest values into an
* in-memory H2 database. Demonstrates proof of concept mapping of ProblemDigest
* case class into a database.
*/
@RunWith(classOf[JUnitRunner])
class DashboardProblemDatabaseTest extends FlatSpec with BeforeAndAfter with ScalaFutures with Matchers {
implicit val timeout = 10.seconds
val connector = Problems.DatabaseConnector
val IO = connector.IO
val problemDigests = Seq(
ProblemDigest("MJPG", "01:01:01", "summary here", "description here" , <details>uh not sure</details>, 2),
ProblemDigest("wewu", "01:02:01", "coffee spill", "coffee everywhere" , <details>He chose decaf</details>, 1),
ProblemDigest("wuwu", "02:01:01", "squirrel" , "chewed all the cables", <details>Like ALL of them</details>, 0),
ProblemDigest("code", "10:01:02", "such summary", "such description" , <details>Wow</details>, 3))
before {
connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false
connector.runBlocking(IO.createIfNotExists >> IO.tableExists) shouldBe true
connector.runBlocking(IO.createIfNotExists) shouldBe NoOperation
connector.runBlocking(IO.selectAll) shouldBe empty
}
after {
connector.runBlocking(IO.tableExists) shouldBe true
connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false
connector.runBlocking(IO.dropIfExists) shouldBe NoOperation
}
"The Database" should "Connect without any problems" in {
// Insert the suppliers and ProblemDigests
connector.executeTransactionBlocking(IO.problems ++= problemDigests)
// Test that they are all in the table
var * = connector.runBlocking(IO.selectAll)
* should contain theSameElementsAs problemDigests
* should have length problemDigests.length
// Reset the table
connector.runBlocking(IO.resetTable >> IO.selectAll) shouldBe empty
// Run the test again
connector.executeTransactionBlocking(IO.problems += problemDigests.head,
IO.problems += problemDigests(1),
IO.problems += problemDigests(2),
IO.problems += problemDigests(3))
// Test that they are all in the table
* = connector.runBlocking(IO.selectAll)
* should contain theSameElementsAs problemDigests
* should have length problemDigests.length
// Test that the simple select and filter queries work
val filtered = connector.runBlocking(IO.problems.filter(_.codec === "code").map(_.description).result)
filtered should have length 1
filtered.head shouldBe problemDigests(3).description
// This also tests that our conversion from xml to strings works
val xml = connector.runBlocking(IO.problems.map(_.xml).result)
xml should have length problemDigests.length
xml should contain theSameElementsAs problemDigests.map(_.detailsXml.toString())
val result = connector.runBlocking(IO.sizeAndProblemDigest(2))
result._1 should have length 2
result._2 shouldBe problemDigests.length
result._1.head shouldBe problemDigests(3)
result._1(1) shouldBe problemDigests.head
val resultOverLength = connector.runBlocking(IO.sizeAndProblemDigest(10))
resultOverLength._1 should have length 4
resultOverLength._1 should contain theSameElementsAs problemDigests
connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size
val testProblem = ProblemDatabaseTestProblem(ProblemSources.Unknown)
Thread.sleep(200)
connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + 1
}
}
case class ProblemDatabaseTestProblem(source: ProblemSources.ProblemSource) extends AbstractProblem(source: ProblemSources.ProblemSource) {
override def summary: String = "This is a test problem! No user should ever see this."
override def description: String = "Wow, this is a nice looking problem. I mean really, just look at it."
}
\ No newline at end of file