add H2 version of new durable state tests #280

Open
wants to merge 12 commits into base: main
6 changes: 3 additions & 3 deletions core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
@@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
);


CREATE TABLE IF NOT EXISTS "durable_state" (
CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" (
"global_offset" BIGINT NOT NULL AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
@@ -30,5 +30,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" (
PRIMARY KEY("persistence_id")
);

CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
CREATE INDEX "state_tag_idx" on PUBLIC."durable_state" ("tag");
CREATE INDEX "state_global_offset_idx" on PUBLIC."durable_state" ("global_offset");
18 changes: 9 additions & 9 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
@@ -1,4 +1,4 @@
CREATE TABLE IF NOT EXISTS "event_journal" (
CREATE TABLE IF NOT EXISTS PUBLIC."event_journal" (
"ordering" BIGINT UNIQUE NOT NULL AUTO_INCREMENT,
"deleted" BOOLEAN DEFAULT false NOT NULL,
"persistence_id" VARCHAR(255) NOT NULL,
@@ -15,9 +15,9 @@ CREATE TABLE IF NOT EXISTS "event_journal" (
PRIMARY KEY("persistence_id","sequence_number")
);

CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering");
CREATE UNIQUE INDEX "event_journal_ordering_idx" on PUBLIC."event_journal" ("ordering");

CREATE TABLE IF NOT EXISTS "event_tag" (
CREATE TABLE IF NOT EXISTS PUBLIC."event_tag" (
"event_id" BIGINT NOT NULL,
"tag" VARCHAR NOT NULL,
PRIMARY KEY("event_id", "tag"),
@@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS "event_tag" (
ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS "snapshot" (
CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL,
@@ -39,10 +39,10 @@ CREATE TABLE IF NOT EXISTS "snapshot" (
PRIMARY KEY("persistence_id","sequence_number")
);

CREATE SEQUENCE IF NOT EXISTS "global_offset_seq";
CREATE SEQUENCE IF NOT EXISTS PUBLIC."global_offset_seq";

CREATE TABLE IF NOT EXISTS "durable_state" (
"global_offset" BIGINT DEFAULT NEXT VALUE FOR "global_offset_seq",
CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" (
"global_offset" BIGINT DEFAULT NEXT VALUE FOR PUBLIC."global_offset_seq",
"persistence_id" VARCHAR(255) NOT NULL,
"revision" BIGINT NOT NULL,
"state_payload" BLOB NOT NULL,
@@ -52,5 +52,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" (
"state_timestamp" BIGINT NOT NULL,
PRIMARY KEY("persistence_id")
);
CREATE INDEX IF NOT EXISTS "state_tag_idx" on "durable_state" ("tag");
CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on "durable_state" ("global_offset");
CREATE INDEX IF NOT EXISTS "state_tag_idx" on PUBLIC."durable_state" ("tag");
CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on PUBLIC."durable_state" ("global_offset");
@@ -96,6 +96,18 @@ private[jdbc] object SchemaUtilsImpl {
SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad), separator, logger, db)
}

/**
* INTERNAL API
*/
@InternalApi
private[jdbc] def dropWithSlickButChangeSchema(schemaType: SchemaType, logger: Logger, db: Database,
oldSchemaName: String, newSchemaName: String): Done = {
val (fileToLoad, separator) = dropScriptFor(schemaType, false)
val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad)
.replaceAll(s"$oldSchemaName.", s"$newSchemaName.")
SchemaUtilsImpl.applyScriptWithSlick(script, separator, logger, db)
}

/**
* INTERNAL API
*/
@@ -105,6 +117,19 @@ private[jdbc] object SchemaUtilsImpl {
SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad), separator, logger, db)
}

/**
* INTERNAL API
*/
@InternalApi
private[jdbc] def createWithSlickButChangeSchema(schemaType: SchemaType, logger: Logger, db: Database,
oldSchemaName: String, newSchemaName: String): Done = {
val (fileToLoad, separator) = createScriptFor(schemaType, false)
val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad)
.replaceAll(s"$oldSchemaName.", s"$newSchemaName.")
val scriptWithSchemaCreate = s"CREATE SCHEMA IF NOT EXISTS $newSchemaName$separator$script"
SchemaUtilsImpl.applyScriptWithSlick(scriptWithSchemaCreate, separator, logger, db)
}
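
Both helpers read the regular create/drop script from the classpath and rewrite the schema prefix with a plain replaceAll; the create variant additionally prepends a CREATE SCHEMA IF NOT EXISTS statement. A rough sketch of that effect on a single statement (the literals are examples, not the full script; note that replaceAll treats its first argument as a regex, so the trailing dot in the pattern is a wildcard, which is harmless for prefixes like PUBLIC.):

object SchemaRewriteSketch extends App {
  val separator = ";"
  val script    = """CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" ("persistence_id" VARCHAR(255) NOT NULL);"""

  // Same substitution as createWithSlickButChangeSchema, reduced to one statement.
  val rewritten        = script.replaceAll("PUBLIC.", "pekko.")
  val withSchemaCreate = s"CREATE SCHEMA IF NOT EXISTS pekko$separator$rewritten"

  println(withSchemaCreate)
  // prints: CREATE SCHEMA IF NOT EXISTS pekko;CREATE TABLE IF NOT EXISTS pekko."durable_state" (...)
}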

private def applyScriptWithSlick(script: String, separator: String, logger: Logger, database: Database): Done = {

def withStatement(f: Statement => Unit): Done = {
@@ -152,7 +177,11 @@ private[jdbc] object SchemaUtilsImpl {
}
}

private def slickProfileToSchemaType(profile: JdbcProfile): SchemaType =
/**
* INTERNAL API
*/
@InternalApi
private[jdbc] def slickProfileToSchemaType(profile: JdbcProfile): SchemaType =
profile match {
case PostgresProfile => Postgres
case MySQLProfile => MySQL
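
With slickProfileToSchemaType now visible to tests, the two new helpers can be driven directly from a Slick profile. A hedged, standalone sketch of that wiring, mirroring the beforeAll/afterAll of the spec below; it assumes the elided match arms map H2Profile to the H2 schema type (the new H2 spec relies on that), and it has to live under the org.apache.pekko.persistence.jdbc package because these helpers are INTERNAL API:

package org.apache.pekko.persistence.jdbc.example // hypothetical location, inside the private[jdbc] scope

import org.slf4j.LoggerFactory
import slick.jdbc.H2Profile
import slick.jdbc.H2Profile.api._
import org.apache.pekko.persistence.jdbc.testkit.internal.SchemaUtilsImpl

object SchemaSwapSketch extends App {
  val logger = LoggerFactory.getLogger(getClass)
  val db     = Database.forURL("jdbc:h2:mem:schema-swap;DB_CLOSE_DELAY=-1")

  val schemaType = SchemaUtilsImpl.slickProfileToSchemaType(H2Profile) // assumed H2 mapping
  SchemaUtilsImpl.createWithSlickButChangeSchema(schemaType, logger, db, "PUBLIC", "pekko")
  // ... exercise the tables relocated into the "pekko" schema here ...
  SchemaUtilsImpl.dropWithSlickButChangeSchema(schemaType, logger, db, "PUBLIC", "pekko")
  db.close()
}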
@@ -12,13 +12,22 @@ package org.apache.pekko.persistence.jdbc.state.scaladsl
import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko
import pekko.actor._
import pekko.persistence.jdbc.config.SlickConfiguration
import pekko.persistence.jdbc.db.SlickDatabase
import pekko.persistence.jdbc.testkit.internal.SchemaUtilsImpl
import pekko.persistence.state.DurableStateStoreRegistry
import pekko.util.Timeout
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.slf4j.LoggerFactory
import slick.jdbc.{ H2Profile, JdbcProfile }

import scala.concurrent.duration.DurationInt

abstract class DurableStateStorePluginSpec(config: Config, profile: JdbcProfile)
extends AnyWordSpecLike
with BeforeAndAfterAll
@@ -45,5 +54,87 @@ abstract class DurableStateStorePluginSpec(config: Config, profile: JdbcProfile)
}
}

abstract class DurableStateStoreSchemaPluginSpec(val config: Config, profile: JdbcProfile)
extends AnyWordSpecLike
with BeforeAndAfterAll
with Matchers
with ScalaFutures
with DataGenerationHelper {

private val logger = LoggerFactory.getLogger(this.getClass)
protected def defaultSchemaName: String = "public"
val schemaName: String = "pekko"
implicit val timeout: Timeout = Timeout(1.minute)
implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis))

val customConfig: Config = ConfigFactory.parseString("""
jdbc-durable-state-store {
tables {
durable_state {
schemaName = "pekko"
}
}
}
""")

implicit lazy val system: ExtendedActorSystem =
ActorSystem(
"test",
customConfig.withFallback(config)
).asInstanceOf[ExtendedActorSystem]

lazy val db = SlickDatabase.database(
config,
new SlickConfiguration(config.getConfig("slick")), "slick.db"
)

override def beforeAll(): Unit =
SchemaUtilsImpl.createWithSlickButChangeSchema(
SchemaUtilsImpl.slickProfileToSchemaType(profile),
logger, db, defaultSchemaName, schemaName)

override def afterAll(): Unit = {
SchemaUtilsImpl.dropWithSlickButChangeSchema(
SchemaUtilsImpl.slickProfileToSchemaType(profile),
logger, db, defaultSchemaName, schemaName)
db.close()
system.terminate().futureValue
}

"A durable state store plugin" must {
"instantiate a JdbcDurableDataStore successfully" in {

val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

store shouldBe a[JdbcDurableStateStore[_]]
store.system.settings.config shouldBe system.settings.config
store.profile shouldBe profile
}

"persist states successfully" in {

val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1, 400).size shouldBe 400

eventually {
store.maxStateStoreOffset().futureValue shouldBe 400
}
}
}

}

class H2DurableStateStorePluginSpec
extends DurableStateStorePluginSpec(ConfigFactory.load("h2-application.conf"), H2Profile)

class H2DurableStateStorePluginSchemaSpec
extends DurableStateStoreSchemaPluginSpec(ConfigFactory.load("h2-application.conf"),
H2Profile) {
override protected def defaultSchemaName: String = "PUBLIC"
}
@@ -9,114 +9,16 @@

package org.apache.pekko.persistence.jdbc.integration

import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.pekko
import pekko.util.Timeout
import pekko.actor.{ ActorSystem, ExtendedActorSystem }
import pekko.persistence.jdbc.config.SlickConfiguration
import pekko.persistence.jdbc.db.SlickDatabase
import pekko.persistence.jdbc.state.scaladsl.{
DataGenerationHelper,
import com.typesafe.config.ConfigFactory
import slick.jdbc.PostgresProfile
import org.apache.pekko.persistence.jdbc.state.scaladsl.{
DurableStateStorePluginSpec,
JdbcDurableStateStore
DurableStateStoreSchemaPluginSpec
}
import pekko.persistence.jdbc.testkit.internal.Postgres
import pekko.persistence.jdbc.util.DropCreate
import pekko.persistence.state.DurableStateStoreRegistry
import slick.jdbc.PostgresProfile
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Seconds, Span }
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import scala.concurrent.duration.DurationInt

class PostgresDurableStateStorePluginSpec
extends DurableStateStorePluginSpec(ConfigFactory.load("postgres-shared-db-application.conf"), PostgresProfile) {}

class PostgresDurableStateStorePluginSchemaSpec
extends AnyWordSpecLike
with BeforeAndAfterAll
with BeforeAndAfterEach
with Matchers
with ScalaFutures
with DropCreate
with DataGenerationHelper {

val profile = PostgresProfile
val config = ConfigFactory.load("postgres-application.conf")
val schemaName: String = "pekko"
implicit val timeout: Timeout = Timeout(1.minute)
implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis))

val customConfig: Config = ConfigFactory.parseString(s"""
jdbc-durable-state-store {
tables {
durable_state {
schemaName = "pekko"
}
}
}
""")

implicit lazy val system: ExtendedActorSystem =
ActorSystem(
"test",
customConfig.withFallback(config)
).asInstanceOf[ExtendedActorSystem]

lazy val db = SlickDatabase.database(
config,
new SlickConfiguration(config.getConfig("slick")), "slick.db"
)

private val createSchema = s"CREATE SCHEMA IF NOT EXISTS $schemaName;"
private val moveDurableStateTableToSchema = s"alter table public.durable_state set schema $schemaName;"
private val moveDurableStateTableToPublic = s"alter table $schemaName.durable_state set schema public;"
private val createSchemaAndMoveTable = s"${createSchema}${moveDurableStateTableToSchema}"

override def beforeAll(): Unit = {
dropAndCreate(Postgres)
}

override def beforeEach(): Unit = {
withStatement(_.execute(createSchemaAndMoveTable))
}

"A durable state store plugin" must {
"instantiate a JdbcDurableDataStore successfully" in {

val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

store shouldBe a[JdbcDurableStateStore[_]]
store.system.settings.config shouldBe system.settings.config
store.profile shouldBe profile
}

"persist states successfully" in {

val store = DurableStateStoreRegistry
.get(system)
.durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier)

upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1, 400).size shouldBe 400

eventually {
store.maxStateStoreOffset().futureValue shouldBe 400
}
}
}

override def afterEach(): Unit = {
withStatement(_.execute(moveDurableStateTableToPublic))
}

override def afterAll(): Unit = {
system.terminate().futureValue

}
}
extends DurableStateStoreSchemaPluginSpec(ConfigFactory.load("postgres-application.conf"),
PostgresProfile) {}