diff --git a/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql b/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql index 53b0f369..8922d691 100644 --- a/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql +++ b/core/src/main/resources/schema/h2/h2-create-schema-legacy.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" ( ); -CREATE TABLE IF NOT EXISTS "durable_state" ( +CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" ( "global_offset" BIGINT NOT NULL AUTO_INCREMENT, "persistence_id" VARCHAR(255) NOT NULL, "revision" BIGINT NOT NULL, @@ -30,5 +30,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" ( PRIMARY KEY("persistence_id") ); -CREATE INDEX "state_tag_idx" on "durable_state" ("tag"); -CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset"); +CREATE INDEX "state_tag_idx" on PUBLIC."durable_state" ("tag"); +CREATE INDEX "state_global_offset_idx" on PUBLIC."durable_state" ("global_offset"); diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql b/core/src/main/resources/schema/h2/h2-create-schema.sql index ca44e876..ee6d0c96 100644 --- a/core/src/main/resources/schema/h2/h2-create-schema.sql +++ b/core/src/main/resources/schema/h2/h2-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS "event_journal" ( +CREATE TABLE IF NOT EXISTS PUBLIC."event_journal" ( "ordering" BIGINT UNIQUE NOT NULL AUTO_INCREMENT, "deleted" BOOLEAN DEFAULT false NOT NULL, "persistence_id" VARCHAR(255) NOT NULL, @@ -15,9 +15,9 @@ CREATE TABLE IF NOT EXISTS "event_journal" ( PRIMARY KEY("persistence_id","sequence_number") ); -CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering"); +CREATE UNIQUE INDEX "event_journal_ordering_idx" on PUBLIC."event_journal" ("ordering"); -CREATE TABLE IF NOT EXISTS "event_tag" ( +CREATE TABLE IF NOT EXISTS PUBLIC."event_tag" ( "event_id" BIGINT NOT NULL, "tag" VARCHAR NOT NULL, PRIMARY KEY("event_id", "tag"), @@ -27,7 +27,7 @@ CREATE TABLE IF NOT EXISTS "event_tag" ( ON DELETE CASCADE ); -CREATE TABLE IF NOT EXISTS "snapshot" ( +CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" ( "persistence_id" VARCHAR(255) NOT NULL, "sequence_number" BIGINT NOT NULL, "created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL, @@ -39,10 +39,10 @@ CREATE TABLE IF NOT EXISTS "snapshot" ( PRIMARY KEY("persistence_id","sequence_number") ); -CREATE SEQUENCE IF NOT EXISTS "global_offset_seq"; +CREATE SEQUENCE IF NOT EXISTS PUBLIC."global_offset_seq"; -CREATE TABLE IF NOT EXISTS "durable_state" ( - "global_offset" BIGINT DEFAULT NEXT VALUE FOR "global_offset_seq", +CREATE TABLE IF NOT EXISTS PUBLIC."durable_state" ( + "global_offset" BIGINT DEFAULT NEXT VALUE FOR PUBLIC."global_offset_seq", "persistence_id" VARCHAR(255) NOT NULL, "revision" BIGINT NOT NULL, "state_payload" BLOB NOT NULL, @@ -52,5 +52,5 @@ CREATE TABLE IF NOT EXISTS "durable_state" ( "state_timestamp" BIGINT NOT NULL, PRIMARY KEY("persistence_id") ); -CREATE INDEX IF NOT EXISTS "state_tag_idx" on "durable_state" ("tag"); -CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on "durable_state" ("global_offset"); +CREATE INDEX IF NOT EXISTS "state_tag_idx" on PUBLIC."durable_state" ("tag"); +CREATE INDEX IF NOT EXISTS "state_global_offset_idx" on PUBLIC."durable_state" ("global_offset"); diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala index ed0f1d07..409aa227 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/testkit/internal/SchemaUtilsImpl.scala @@ -96,6 +96,18 @@ private[jdbc] object SchemaUtilsImpl { SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad), separator, logger, db) } + /** + * INTERNAL API + */ + @InternalApi + private[jdbc] def dropWithSlickButChangeSchema(schemaType: SchemaType, logger: Logger, db: Database, + oldSchemaName: String, newSchemaName: String): Done = { + val (fileToLoad, separator) = dropScriptFor(schemaType, false) + val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad) + .replaceAll(s"$oldSchemaName.", s"$newSchemaName.") + SchemaUtilsImpl.applyScriptWithSlick(script, separator, logger, db) + } + /** * INTERNAL API */ @@ -105,6 +117,19 @@ private[jdbc] object SchemaUtilsImpl { SchemaUtilsImpl.applyScriptWithSlick(SchemaUtilsImpl.fromClasspathAsString(fileToLoad), separator, logger, db) } + /** + * INTERNAL API + */ + @InternalApi + private[jdbc] def createWithSlickButChangeSchema(schemaType: SchemaType, logger: Logger, db: Database, + oldSchemaName: String, newSchemaName: String): Done = { + val (fileToLoad, separator) = createScriptFor(schemaType, false) + val script = SchemaUtilsImpl.fromClasspathAsString(fileToLoad) + .replaceAll(s"$oldSchemaName.", s"$newSchemaName.") + val scriptWithSchemaCreate = s"CREATE SCHEMA IF NOT EXISTS $newSchemaName$separator$script" + SchemaUtilsImpl.applyScriptWithSlick(scriptWithSchemaCreate, separator, logger, db) + } + private def applyScriptWithSlick(script: String, separator: String, logger: Logger, database: Database): Done = { def withStatement(f: Statement => Unit): Done = { @@ -152,7 +177,11 @@ private[jdbc] object SchemaUtilsImpl { } } - private def slickProfileToSchemaType(profile: JdbcProfile): SchemaType = + /** + * INTERNAL API + */ + @InternalApi + private[jdbc] def slickProfileToSchemaType(profile: JdbcProfile): SchemaType = profile match { case PostgresProfile => Postgres case MySQLProfile => MySQL diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala index 82af13d2..36c7d61d 100644 --- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala +++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala @@ -12,13 +12,22 @@ package org.apache.pekko.persistence.jdbc.state.scaladsl import com.typesafe.config.{ Config, ConfigFactory } import org.apache.pekko import pekko.actor._ +import pekko.persistence.jdbc.config.SlickConfiguration +import pekko.persistence.jdbc.db.SlickDatabase +import pekko.persistence.jdbc.testkit.internal.SchemaUtilsImpl import pekko.persistence.state.DurableStateStoreRegistry +import pekko.util.Timeout +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{ Millis, Seconds, Span } import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.BeforeAndAfterAll -import org.scalatest.concurrent.ScalaFutures +import org.slf4j.LoggerFactory import slick.jdbc.{ H2Profile, JdbcProfile } +import scala.concurrent.duration.DurationInt + abstract class DurableStateStorePluginSpec(config: Config, profile: JdbcProfile) extends AnyWordSpecLike with BeforeAndAfterAll @@ -45,5 +54,87 @@ abstract class DurableStateStorePluginSpec(config: Config, profile: JdbcProfile) } } +abstract class DurableStateStoreSchemaPluginSpec(val config: Config, profile: JdbcProfile) + extends AnyWordSpecLike + with BeforeAndAfterAll + with Matchers + with ScalaFutures + with DataGenerationHelper { + + private val logger = LoggerFactory.getLogger(this.getClass) + protected def defaultSchemaName: String = "public" + val schemaName: String = "pekko" + implicit val timeout: Timeout = Timeout(1.minute) + implicit val defaultPatience: PatienceConfig = + PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis)) + + val customConfig: Config = ConfigFactory.parseString(""" + jdbc-durable-state-store { + tables { + durable_state { + schemaName = "pekko" + } + } + } + """) + + implicit lazy val system: ExtendedActorSystem = + ActorSystem( + "test", + customConfig.withFallback(config) + ).asInstanceOf[ExtendedActorSystem] + + lazy val db = SlickDatabase.database( + config, + new SlickConfiguration(config.getConfig("slick")), "slick.db" + ) + + override def beforeAll(): Unit = + SchemaUtilsImpl.createWithSlickButChangeSchema( + SchemaUtilsImpl.slickProfileToSchemaType(profile), + logger, db, defaultSchemaName, schemaName) + + override def afterAll(): Unit = { + SchemaUtilsImpl.dropWithSlickButChangeSchema( + SchemaUtilsImpl.slickProfileToSchemaType(profile), + logger, db, defaultSchemaName, schemaName) + db.close() + system.terminate().futureValue + } + + "A durable state store plugin" must { + "instantiate a JdbcDurableDataStore successfully" in { + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) + + store shouldBe a[JdbcDurableStateStore[_]] + store.system.settings.config shouldBe system.settings.config + store.profile shouldBe profile + } + + "persist states successfully" in { + + val store = DurableStateStoreRegistry + .get(system) + .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) + + upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1, 400).size shouldBe 400 + + eventually { + store.maxStateStoreOffset().futureValue shouldBe 400 + } + } + } + +} + class H2DurableStateStorePluginSpec extends DurableStateStorePluginSpec(ConfigFactory.load("h2-application.conf"), H2Profile) + +class H2DurableStateStorePluginSchemaSpec + extends DurableStateStoreSchemaPluginSpec(ConfigFactory.load("h2-application.conf"), + H2Profile) { + override protected def defaultSchemaName: String = "PUBLIC" +} diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala index 1fc25d67..b0b1975b 100644 --- a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala +++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/PostgresDurableStateStorePluginSpec.scala @@ -9,114 +9,16 @@ package org.apache.pekko.persistence.jdbc.integration -import com.typesafe.config.{ Config, ConfigFactory } -import org.apache.pekko -import pekko.util.Timeout -import pekko.actor.{ ActorSystem, ExtendedActorSystem } -import pekko.persistence.jdbc.config.SlickConfiguration -import pekko.persistence.jdbc.db.SlickDatabase -import pekko.persistence.jdbc.state.scaladsl.{ - DataGenerationHelper, +import com.typesafe.config.ConfigFactory +import slick.jdbc.PostgresProfile +import org.apache.pekko.persistence.jdbc.state.scaladsl.{ DurableStateStorePluginSpec, - JdbcDurableStateStore + DurableStateStoreSchemaPluginSpec } -import pekko.persistence.jdbc.testkit.internal.Postgres -import pekko.persistence.jdbc.util.DropCreate -import pekko.persistence.state.DurableStateStoreRegistry -import slick.jdbc.PostgresProfile -import org.scalatest.concurrent.Eventually.eventually -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{ Millis, Seconds, Span } -import org.scalatest.wordspec.AnyWordSpecLike -import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import scala.concurrent.duration.DurationInt class PostgresDurableStateStorePluginSpec extends DurableStateStorePluginSpec(ConfigFactory.load("postgres-shared-db-application.conf"), PostgresProfile) {} class PostgresDurableStateStorePluginSchemaSpec - extends AnyWordSpecLike - with BeforeAndAfterAll - with BeforeAndAfterEach - with Matchers - with ScalaFutures - with DropCreate - with DataGenerationHelper { - - val profile = PostgresProfile - val config = ConfigFactory.load("postgres-application.conf") - val schemaName: String = "pekko" - implicit val timeout: Timeout = Timeout(1.minute) - implicit val defaultPatience: PatienceConfig = - PatienceConfig(timeout = Span(60, Seconds), interval = Span(100, Millis)) - - val customConfig: Config = ConfigFactory.parseString(s""" - jdbc-durable-state-store { - tables { - durable_state { - schemaName = "pekko" - } - } - } - """) - - implicit lazy val system: ExtendedActorSystem = - ActorSystem( - "test", - customConfig.withFallback(config) - ).asInstanceOf[ExtendedActorSystem] - - lazy val db = SlickDatabase.database( - config, - new SlickConfiguration(config.getConfig("slick")), "slick.db" - ) - - private val createSchema = s"CREATE SCHEMA IF NOT EXISTS $schemaName;" - private val moveDurableStateTableToSchema = s"alter table public.durable_state set schema $schemaName;" - private val moveDurableStateTableToPublic = s"alter table $schemaName.durable_state set schema public;" - private val createSchemaAndMoveTable = s"${createSchema}${moveDurableStateTableToSchema}" - - override def beforeAll(): Unit = { - dropAndCreate(Postgres) - } - - override def beforeEach(): Unit = { - withStatement(_.execute(createSchemaAndMoveTable)) - } - - "A durable state store plugin" must { - "instantiate a JdbcDurableDataStore successfully" in { - - val store = DurableStateStoreRegistry - .get(system) - .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) - - store shouldBe a[JdbcDurableStateStore[_]] - store.system.settings.config shouldBe system.settings.config - store.profile shouldBe profile - } - - "persist states successfully" in { - - val store = DurableStateStoreRegistry - .get(system) - .durableStateStoreFor[JdbcDurableStateStore[String]](JdbcDurableStateStore.Identifier) - - upsertManyForOnePersistenceId(store, "durable_state", "durable-t1", 1, 400).size shouldBe 400 - - eventually { - store.maxStateStoreOffset().futureValue shouldBe 400 - } - } - } - - override def afterEach(): Unit = { - withStatement(_.execute(moveDurableStateTableToPublic)) - } - - override def afterAll(): Unit = { - system.terminate().futureValue - - } -} + extends DurableStateStoreSchemaPluginSpec(ConfigFactory.load("postgres-application.conf"), + PostgresProfile) {}