diff --git a/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala b/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala index 9c03f08d9..effeb3e56 100644 --- a/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala +++ b/commons/json-store/src/main/scala/net/shrine/json/store/db/JsonStoreDatabase.scala @@ -1,183 +1,184 @@ package net.shrine.json.store.db import java.util.UUID import java.util.concurrent.TimeoutException import javax.sql.DataSource import com.typesafe.config.Config import net.shrine.slick.{CouldNotRunDbIoActionException, NeedsWarmUp, TestableDataSourceCreator} import net.shrine.source.ConfigSource import slick.dbio.SuccessAction import slick.driver.JdbcProfile import slick.jdbc.meta.MTable import scala.concurrent.duration.{Duration, _} import scala.concurrent.{Await, Future} import scala.util.control.NonFatal import scala.concurrent.ExecutionContext.Implicits.global /** * Database access to a json store via slick * * @author david * @since 5/16/17 */ object JsonStoreDatabase extends NeedsWarmUp { val config:Config = ConfigSource.config.getConfig("shrine.jsonStore.database") val slickProfile:JdbcProfile = ConfigSource.getObject("slickProfileClassName", config) val timeout: Duration = ConfigSource.config.getInt("shrine.problem.timeout").seconds import slickProfile.api._ val dataSource: DataSource = TestableDataSourceCreator.dataSource(config) lazy val db = { val db = Database.forDataSource(dataSource) val createTables: String = "createTablesOnStart" if (config.hasPath(createTables) && config.getBoolean(createTables)) { Await.ready(db.run(IOActions.createIfNotExists), timeout) } db } def warmUp() = DatabaseConnector.runBlocking(ShrineResultsQ.selectAll.take(10).result) case class ShrineResultDbEnvelope( id:UUID, version:Int, tableChangeCount:Int, + //todo get shrine version here from the system as a default value queryId:UUID, json:String ) /** * The Results Table. */ class ShrineResultsT(tag: Tag) extends Table[ShrineResultDbEnvelope](tag, ShrineResultsQ.tableName) { def id = column[UUID]("id", O.PrimaryKey) - //def shrineVersion = column[String]("shrineVersion") //todo get shrine version here from the system as a default value + //def shrineVersion = column[String]("shrineVersion") def version = column[Int]("version") //for optimistic locking def tableVersion = column[Int]("tableVersion") //for change detection on a table def queryId = column[UUID]("queryId") //for the first pass we're asking strictly for query ids - def json = column[String]("json") - def * = (id, version, tableVersion, queryId, json) <> (ShrineResultDbEnvelope.tupled, ShrineResultDbEnvelope.unapply) + + //todo indexes } /** * Queries related to the Problems table. */ object ShrineResultsQ extends TableQuery(new ShrineResultsT(_)) { /** * The table name */ - val tableName = "results" + val tableName = "shrineResults" /** * Equivalent to Select * from Problems; */ - val selectAll: ShrineResultsQ.type = this + val selectAll = this def selectLastTableChange = Query(this.map(_.tableVersion).max) //useful queries like "All the changes to results for a subset of queries since table version ..." def withParameters(parameters:ShrineResultQueryParameters) = { val everything: Query[ShrineResultsT, ShrineResultDbEnvelope, Seq] = selectAll val afterTableChange = parameters.afterTableChange.fold(everything){change => everything.filter(_.tableVersion > change)} val justTheseQueries = parameters.forQueryIds.fold(afterTableChange){queryIds => afterTableChange.filter(_.queryId.inSet(queryIds))} justTheseQueries } } case class ShrineResultQueryParameters( afterTableChange:Option[Int] = None, forQueryIds:Option[Set[UUID]] = None, //None interpreted as "all" . todo check set size < 1000 for Oracle safty if it matters. skipOption:Option[Int] = None, limitOption:Option[Int] = None ) /** * DBIO Actions. These are pre-defined IO actions that may be useful. * Using it to centralize the location of DBIOs. */ object IOActions { // For SuccessAction, just a no_op. case object NoOperation - val shrineResults: ShrineResultsQ.type = ShrineResultsQ + val shrineResults = ShrineResultsQ val tableExists = MTable.getTables(ShrineResultsQ.tableName).map(_.nonEmpty) val createIfNotExists = tableExists.flatMap( if (_) SuccessAction(NoOperation) else ShrineResultsQ.schema.create) val dropIfExists = tableExists.flatMap( if (_) ShrineResultsQ.schema.drop else SuccessAction(NoOperation)) val clearTable = createIfNotExists andThen ShrineResultsQ.selectAll.delete val selectAll = ShrineResultsQ.result def countWithParameters(parameters: ShrineResultQueryParameters) = ShrineResultsQ.withParameters(parameters).size.result def selectResultsWithParameters(parameters: ShrineResultQueryParameters) = { val select = ShrineResultsQ.withParameters(parameters).sortBy(_.tableVersion.desc) //newest changes first val skipSelect = parameters.skipOption.fold(select){ skip => select.drop(skip) } val limitSelect = parameters.limitOption.fold(skipSelect){ limit => skipSelect.take(limit)} limitSelect } def selectLastTableChange = ShrineResultsQ.selectLastTableChange.result def upsertShrineResult(shrineResult:ShrineResultDbEnvelope) = ShrineResultsQ.insertOrUpdate(shrineResult) } /** * Entry point for interacting with the database. Runs IO actions. */ object DatabaseConnector { val IO = IOActions /** * Executes a series of IO actions as a single transactions */ def executeTransaction(actions: DBIOAction[_, NoStream, _]*): Future[Unit] = { db.run(DBIO.seq(actions:_*).transactionally) } /** * Executes a series of IO actions as a single transaction, synchronous */ def executeTransactionBlocking(actions: DBIOAction[_, NoStream, _]*)(implicit timeout: Duration): Unit = { try { Await.ready(this.executeTransaction(actions: _*), timeout) } catch { case tx:TimeoutException => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, tx) case NonFatal(x) => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, x) } } /** * Straight copy of db.run */ def run[R](dbio: DBIOAction[R, NoStream, _]): Future[R] = { db.run(dbio) } /** * Synchronized copy of db.run */ def runBlocking[R](dbio: DBIOAction[R, NoStream, _], timeout: Duration = timeout): R = { try { Await.result(this.run(dbio), timeout) } catch { case tx:TimeoutException => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, tx) case NonFatal(x) => throw CouldNotRunDbIoActionException(JsonStoreDatabase.dataSource, x) } } } } \ No newline at end of file diff --git a/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala b/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala index b7d11310d..a03b20e8a 100644 --- a/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala +++ b/commons/json-store/src/test/scala/net/shrine/json/store/db/JsonStoreDatabaseTest.scala @@ -1,96 +1,69 @@ package net.shrine.json.store.db import java.util.UUID import net.shrine.json.store.db.JsonStoreDatabase.IOActions.NoOperation import net.shrine.json.store.db.JsonStoreDatabase.ShrineResultDbEnvelope import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} -//import slick.driver.H2Driver.api._ +import slick.driver.H2Driver.api._ //todo do without this import import scala.concurrent.duration._ /** * Test creation, insertion, querying, and deletion of ProblemDigest values into an * in-memory H2 database. Demonstrates proof of concept mapping of ProblemDigest * case class into a database. */ @RunWith(classOf[JUnitRunner]) class JsonStoreDatabaseTest extends FlatSpec with BeforeAndAfter with ScalaFutures with Matchers { implicit val timeout = 10.seconds val connector = JsonStoreDatabase.DatabaseConnector val IO = connector.IO val testShrineResults = Seq( ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"), ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"), ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"), ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo"), ShrineResultDbEnvelope(id = UUID.randomUUID(),version = 0,tableChangeCount = 0,queryId = UUID.randomUUID(),json = "todo") ) before { connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false connector.runBlocking(IO.createIfNotExists >> IO.tableExists) shouldBe true connector.runBlocking(IO.createIfNotExists) shouldBe NoOperation connector.runBlocking(IO.selectAll) shouldBe empty } after { connector.runBlocking(IO.tableExists) shouldBe true connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false connector.runBlocking(IO.dropIfExists) shouldBe NoOperation } "The Database" should "Connect without any problems" in { - // Insert the suppliers and ProblemDigests -// connector.executeTransactionBlocking(connector.IO.shrineResults ++= testShrineResults) -/* - // Test that they are all in the table - var * = connector.runBlocking(IO.selectAll) - * should contain theSameElementsAs problemDigests - * should have length problemDigests.length - - // Reset the table - connector.runBlocking(IO.resetTable >> IO.selectAll) shouldBe empty + // Insert the test records + connector.executeTransactionBlocking(IO.shrineResults ++= testShrineResults) - // Run the test again - connector.executeTransactionBlocking(IO.problems += problemDigests.head, - IO.problems += problemDigests(1), - IO.problems += problemDigests(2), - IO.problems += problemDigests(3)) // Test that they are all in the table - * = connector.runBlocking(IO.selectAll) - * should contain theSameElementsAs problemDigests - * should have length problemDigests.length - + var shrineResultContents = connector.runBlocking(IO.selectAll) + shrineResultContents should contain theSameElementsAs testShrineResults + shrineResultContents should have length testShrineResults.length - // Test that the simple select and filter queries work - val filtered = connector.runBlocking(IO.problems.filter(_.codec === "code").map(_.description).result) - filtered should have length 1 - filtered.head shouldBe problemDigests(3).description - - // This also tests that our conversion from xml to strings works - val xml = connector.runBlocking(IO.problems.map(_.xml).result) - xml should have length problemDigests.length - xml should contain theSameElementsAs problemDigests.map(_.detailsXml.toString()) - - val result = connector.runBlocking(IO.sizeAndProblemDigest(2)) - result._1 should have length 2 - result._2 shouldBe problemDigests.length - result._1.head shouldBe problemDigests(3) - result._1(1) shouldBe problemDigests.head + // Reset the table + connector.runBlocking(IO.clearTable >> IO.selectAll) shouldBe empty - val resultOverLength = connector.runBlocking(IO.sizeAndProblemDigest(10)) - resultOverLength._1 should have length 4 - resultOverLength._1 should contain theSameElementsAs problemDigests + //Insert one at a time in a transaction + val actions = testShrineResults.map(IO.shrineResults += _) - connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + connector.executeTransactionBlocking(actions:_*) + // Test that they are all in the table + shrineResultContents = connector.runBlocking(IO.selectAll) + shrineResultContents should contain theSameElementsAs testShrineResults + shrineResultContents should have length testShrineResults.length - val testProblem = ProblemDatabaseTestProblem(ProblemSources.Unknown) - Thread.sleep(200) - connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + 1 -*/ + //todo test more low-level queries } } diff --git a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala index ce6a902a4..fb8426f68 100644 --- a/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala +++ b/commons/util/src/test/scala/net/shrine/problem/DashboardProblemDatabaseTest.scala @@ -1,94 +1,94 @@ package net.shrine.problem import org.junit.runner.RunWith import org.scalatest.concurrent.ScalaFutures import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} -import slick.driver.H2Driver.api._ +import slick.driver.H2Driver.api._ //todo do without this import import scala.concurrent.duration._ /** * Test creation, insertion, querying, and deletion of ProblemDigest values into an * in-memory H2 database. Demonstrates proof of concept mapping of ProblemDigest * case class into a database. */ @RunWith(classOf[JUnitRunner]) class DashboardProblemDatabaseTest extends FlatSpec with BeforeAndAfter with ScalaFutures with Matchers { implicit val timeout = 10.seconds val connector = Problems.DatabaseConnector val IO = connector.IO val problemDigests = Seq( ProblemDigest("MJPG", "01:01:01", "summary here", "description here" ,
uh not sure
, 2), ProblemDigest("wewu", "01:02:01", "coffee spill", "coffee everywhere" ,
He chose decaf
, 1), ProblemDigest("wuwu", "02:01:01", "squirrel" , "chewed all the cables",
Like ALL of them
, 0), ProblemDigest("code", "10:01:02", "such summary", "such description" ,
Wow
, 3)) before { connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false connector.runBlocking(IO.createIfNotExists >> IO.tableExists) shouldBe true connector.runBlocking(IO.createIfNotExists) shouldBe NoOperation connector.runBlocking(IO.selectAll) shouldBe empty } after { connector.runBlocking(IO.tableExists) shouldBe true connector.runBlocking(IO.dropIfExists >> IO.tableExists) shouldBe false connector.runBlocking(IO.dropIfExists) shouldBe NoOperation } "The Database" should "Connect without any problems" in { // Insert the suppliers and ProblemDigests connector.executeTransactionBlocking(IO.problems ++= problemDigests) // Test that they are all in the table var * = connector.runBlocking(IO.selectAll) * should contain theSameElementsAs problemDigests * should have length problemDigests.length // Reset the table connector.runBlocking(IO.resetTable >> IO.selectAll) shouldBe empty // Run the test again connector.executeTransactionBlocking(IO.problems += problemDigests.head, IO.problems += problemDigests(1), IO.problems += problemDigests(2), IO.problems += problemDigests(3)) // Test that they are all in the table * = connector.runBlocking(IO.selectAll) * should contain theSameElementsAs problemDigests * should have length problemDigests.length // Test that the simple select and filter queries work val filtered = connector.runBlocking(IO.problems.filter(_.codec === "code").map(_.description).result) filtered should have length 1 filtered.head shouldBe problemDigests(3).description // This also tests that our conversion from xml to strings works val xml = connector.runBlocking(IO.problems.map(_.xml).result) xml should have length problemDigests.length xml should contain theSameElementsAs problemDigests.map(_.detailsXml.toString()) val result = connector.runBlocking(IO.sizeAndProblemDigest(2)) result._1 should have length 2 result._2 shouldBe problemDigests.length result._1.head shouldBe problemDigests(3) result._1(1) shouldBe problemDigests.head val resultOverLength = connector.runBlocking(IO.sizeAndProblemDigest(10)) resultOverLength._1 should have length 4 resultOverLength._1 should contain theSameElementsAs problemDigests connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size val testProblem = ProblemDatabaseTestProblem(ProblemSources.Unknown) Thread.sleep(200) connector.runBlocking(IO.problems.size.result) shouldBe problemDigests.size + 1 } } case class ProblemDatabaseTestProblem(source: ProblemSources.ProblemSource) extends AbstractProblem(source: ProblemSources.ProblemSource) { override def summary: String = "This is a test problem! No user should ever see this." override def description: String = "Wow, this is a nice looking problem. I mean really, just look at it." } \ No newline at end of file