From 6b6a1047a6bd3f16d91d112301841f3adc784d0c Mon Sep 17 00:00:00 2001 From: shipulya Date: Thu, 30 Mar 2023 14:13:24 +0500 Subject: [PATCH 1/2] add trnsaction lock support --- database/pgx/pgx.go | 69 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index 60d3c791a..da626756d 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -16,13 +16,14 @@ import ( "go.uber.org/atomic" - "github.com/golang-migrate/migrate/v4" - "github.com/golang-migrate/migrate/v4/database" - "github.com/golang-migrate/migrate/v4/database/multistmt" "github.com/hashicorp/go-multierror" "github.com/jackc/pgconn" "github.com/jackc/pgerrcode" _ "github.com/jackc/pgx/v4/stdlib" + + "github.com/golang-migrate/migrate/v4" + "github.com/golang-migrate/migrate/v4/database" + "github.com/golang-migrate/migrate/v4/database/multistmt" ) func init() { @@ -54,12 +55,20 @@ type Config struct { MigrationsTableQuoted bool MultiStatementEnabled bool MultiStatementMaxSize int + TransactionLock bool +} + +type Execer interface { + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row } type Postgres struct { // Locking and unlocking need to use the same connection conn *sql.Conn db *sql.DB + exec Execer isLocked atomic.Bool // Open and WithInstance need to guarantee that config is never nil @@ -128,6 +137,7 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { px := &Postgres{ conn: conn, + exec: conn, db: instance, config: config, } @@ -194,6 +204,13 @@ func (p *Postgres) Open(url string) (database.Driver, error) { return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err) } } + transactionLockParam := false + if s := purl.Query().Get("x-transaction-lock"); len(s) > 0 { + transactionLockParam, err = strconv.ParseBool(s) + if err != nil { + return nil, fmt.Errorf("Unable to parse option x-transaction-lock: %w", err) + } + } px, err := WithInstance(db, &Config{ DatabaseName: purl.Path, @@ -202,6 +219,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { StatementTimeout: time.Duration(statementTimeout) * time.Millisecond, MultiStatementEnabled: multiStatementEnabled, MultiStatementMaxSize: multiStatementMaxSize, + TransactionLock: transactionLockParam, }) if err != nil { @@ -227,10 +245,18 @@ func (p *Postgres) Lock() error { if err != nil { return err } - // This will wait indefinitely until the lock can be acquired. query := `SELECT pg_advisory_lock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + + if p.config.TransactionLock { + p.exec, err = p.conn.BeginTx(context.Background(), nil) + if err != nil { + return err + } + // This will be unlocked on transaction end + query = `SELECT pg_advisory_xact_lock($1)` + } + if _, err := p.exec.ExecContext(context.Background(), query, aid); err != nil { return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } return nil @@ -239,6 +265,10 @@ func (p *Postgres) Lock() error { func (p *Postgres) Unlock() error { return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { + tx, ok := p.exec.(*sql.Tx) + if ok { // if in transaction lock, simply commit + return tx.Commit() + } aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) if err != nil { return err @@ -283,7 +313,7 @@ func (p *Postgres) runStatement(statement []byte) error { if strings.TrimSpace(query) == "" { return nil } - if _, err := p.conn.ExecContext(ctx, query); err != nil { + if _, err := p.exec.ExecContext(ctx, query); err != nil { if pgErr, ok := err.(*pgconn.PgError); ok { var line uint @@ -340,9 +370,13 @@ func runesLastIndex(input []rune, target rune) int { } func (p *Postgres) SetVersion(version int, dirty bool) error { - tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{}) - if err != nil { - return &database.Error{OrigErr: err, Err: "transaction start failed"} + var err error + tx, isTxglobal := p.exec.(*sql.Tx) + if !isTxglobal { + tx, err = p.conn.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + return &database.Error{OrigErr: err, Err: "transaction start failed"} + } } query := `TRUNCATE ` + quoteIdentifier(p.config.migrationsSchemaName) + `.` + quoteIdentifier(p.config.migrationsTableName) @@ -365,9 +399,10 @@ func (p *Postgres) SetVersion(version int, dirty bool) error { return &database.Error{OrigErr: err, Query: []byte(query)} } } - - if err := tx.Commit(); err != nil { - return &database.Error{OrigErr: err, Err: "transaction commit failed"} + if !isTxglobal { + if err := tx.Commit(); err != nil { + return &database.Error{OrigErr: err, Err: "transaction commit failed"} + } } return nil @@ -375,7 +410,7 @@ func (p *Postgres) SetVersion(version int, dirty bool) error { func (p *Postgres) Version() (version int, dirty bool, err error) { query := `SELECT version, dirty FROM ` + quoteIdentifier(p.config.migrationsSchemaName) + `.` + quoteIdentifier(p.config.migrationsTableName) + ` LIMIT 1` - err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty) + err = p.exec.QueryRowContext(context.Background(), query).Scan(&version, &dirty) switch { case err == sql.ErrNoRows: return database.NilVersion, false, nil @@ -396,7 +431,7 @@ func (p *Postgres) Version() (version int, dirty bool, err error) { func (p *Postgres) Drop() (err error) { // select all tables in current schema query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema()) AND table_type='BASE TABLE'` - tables, err := p.conn.QueryContext(context.Background(), query) + tables, err := p.exec.QueryContext(context.Background(), query) if err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } @@ -425,7 +460,7 @@ func (p *Postgres) Drop() (err error) { // delete one by one ... for _, t := range tableNames { query = `DROP TABLE IF EXISTS ` + quoteIdentifier(t) + ` CASCADE` - if _, err := p.conn.ExecContext(context.Background(), query); err != nil { + if _, err := p.exec.ExecContext(context.Background(), query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } } @@ -457,7 +492,7 @@ func (p *Postgres) ensureVersionTable() (err error) { // `CREATE TABLE IF NOT EXISTS...` query would fail because the user does not have the CREATE permission. // Taken from https://github.com/mattes/migrate/blob/master/database/postgres/postgres.go#L258 query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2 LIMIT 1` - row := p.conn.QueryRowContext(context.Background(), query, p.config.migrationsSchemaName, p.config.migrationsTableName) + row := p.exec.QueryRowContext(context.Background(), query, p.config.migrationsSchemaName, p.config.migrationsTableName) var count int err = row.Scan(&count) @@ -470,7 +505,7 @@ func (p *Postgres) ensureVersionTable() (err error) { } query = `CREATE TABLE IF NOT EXISTS ` + quoteIdentifier(p.config.migrationsSchemaName) + `.` + quoteIdentifier(p.config.migrationsTableName) + ` (version bigint not null primary key, dirty boolean not null)` - if _, err = p.conn.ExecContext(context.Background(), query); err != nil { + if _, err = p.exec.ExecContext(context.Background(), query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } From 21f72ad387902333f253bc9235f41896cae34f8e Mon Sep 17 00:00:00 2001 From: shipulya Date: Thu, 30 Mar 2023 15:22:50 +0500 Subject: [PATCH 2/2] tests --- database/pgx/README.md | 39 ++-- .../1085649617_create_users_table.down.sql | 1 + .../1085649617_create_users_table.up.sql | 5 + .../1185749658_add_city_to_users.down.sql | 1 + .../1185749658_add_city_to_users.up.sql | 3 + ...85849751_add_index_on_user_emails.down.sql | 1 + .../1385949617_create_books_table.down.sql | 1 + .../1385949617_create_books_table.up.sql | 5 + .../1485949617_create_movies_table.down.sql | 1 + .../1485949617_create_movies_table.up.sql | 5 + .../1585849751_just_a_comment.up.sql | 1 + .../1685849751_another_comment.up.sql | 1 + .../1785849751_another_comment.up.sql | 1 + .../1885849751_another_comment.up.sql | 1 + database/pgx/pgx.go | 1 + database/pgx/pgx_test.go | 207 ++++++++++-------- 16 files changed, 167 insertions(+), 107 deletions(-) create mode 100644 database/pgx/examples/transact_migrations/1085649617_create_users_table.down.sql create mode 100644 database/pgx/examples/transact_migrations/1085649617_create_users_table.up.sql create mode 100644 database/pgx/examples/transact_migrations/1185749658_add_city_to_users.down.sql create mode 100644 database/pgx/examples/transact_migrations/1185749658_add_city_to_users.up.sql create mode 100644 database/pgx/examples/transact_migrations/1285849751_add_index_on_user_emails.down.sql create mode 100644 database/pgx/examples/transact_migrations/1385949617_create_books_table.down.sql create mode 100644 database/pgx/examples/transact_migrations/1385949617_create_books_table.up.sql create mode 100644 database/pgx/examples/transact_migrations/1485949617_create_movies_table.down.sql create mode 100644 database/pgx/examples/transact_migrations/1485949617_create_movies_table.up.sql create mode 100644 database/pgx/examples/transact_migrations/1585849751_just_a_comment.up.sql create mode 100644 database/pgx/examples/transact_migrations/1685849751_another_comment.up.sql create mode 100644 database/pgx/examples/transact_migrations/1785849751_another_comment.up.sql create mode 100644 database/pgx/examples/transact_migrations/1885849751_another_comment.up.sql diff --git a/database/pgx/README.md b/database/pgx/README.md index dfe150a72..bbb31ac54 100644 --- a/database/pgx/README.md +++ b/database/pgx/README.md @@ -2,25 +2,26 @@ `pgx://user:password@host:port/dbname?query` -| URL Query | WithInstance Config | Description | -|------------|---------------------|-------------| -| `x-migrations-table` | `MigrationsTable` | Name of the migrations table | -| `x-migrations-table-quoted` | `MigrationsTableQuoted` | By default, migrate quotes the migration table for SQL injection safety reasons. This option disable quoting and naively checks that you have quoted the migration table name. e.g. `"my_schema"."schema_migrations"` | -| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds | -| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) | -| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) | -| `dbname` | `DatabaseName` | The name of the database to connect to | -| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. | -| `user` | | The user to sign in as | -| `password` | | The user's password | -| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) | -| `port` | | The port to bind to. (default is 5432) | -| `fallback_application_name` | | An application_name to fall back to if one isn't provided. | -| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. | -| `sslcert` | | Cert file location. The file must contain PEM encoded data. | -| `sslkey` | | Key file location. The file must contain PEM encoded data. | -| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. | -| `sslmode` | | Whether or not to use SSL (disable\|require\|verify-ca\|verify-full) | +| URL Query | WithInstance Config | Description | +|------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `x-migrations-table` | `MigrationsTable` | Name of the migrations table | +| `x-migrations-table-quoted` | `MigrationsTableQuoted` | By default, migrate quotes the migration table for SQL injection safety reasons. This option disable quoting and naively checks that you have quoted the migration table name. e.g. `"my_schema"."schema_migrations"` | +| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds | +| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) | +| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) | +| `x-transaction-lock` | `TransactionLock` | Migrate using transaction with exclusive transaction level advisory lock (default: false). It helps when PgBouncer is used to manage connections on multi clustered db and default unlock fails. But 'create index concurrently' can not be used in migrations. | +| `dbname` | `DatabaseName` | The name of the database to connect to | +| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. | +| `user` | | The user to sign in as | +| `password` | | The user's password | +| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) | +| `port` | | The port to bind to. (default is 5432) | +| `fallback_application_name` | | An application_name to fall back to if one isn't provided. | +| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. | +| `sslcert` | | Cert file location. The file must contain PEM encoded data. | +| `sslkey` | | Key file location. The file must contain PEM encoded data. | +| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. | +| `sslmode` | | Whether or not to use SSL (disable\ |require\|verify-ca\|verify-full) | ## Upgrading from v1 diff --git a/database/pgx/examples/transact_migrations/1085649617_create_users_table.down.sql b/database/pgx/examples/transact_migrations/1085649617_create_users_table.down.sql new file mode 100644 index 000000000..c99ddcdc8 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1085649617_create_users_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS users; diff --git a/database/pgx/examples/transact_migrations/1085649617_create_users_table.up.sql b/database/pgx/examples/transact_migrations/1085649617_create_users_table.up.sql new file mode 100644 index 000000000..92897dcab --- /dev/null +++ b/database/pgx/examples/transact_migrations/1085649617_create_users_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE users ( + user_id integer unique, + name varchar(40), + email varchar(40) +); diff --git a/database/pgx/examples/transact_migrations/1185749658_add_city_to_users.down.sql b/database/pgx/examples/transact_migrations/1185749658_add_city_to_users.down.sql new file mode 100644 index 000000000..940c60712 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1185749658_add_city_to_users.down.sql @@ -0,0 +1 @@ +ALTER TABLE users DROP COLUMN IF EXISTS city; diff --git a/database/pgx/examples/transact_migrations/1185749658_add_city_to_users.up.sql b/database/pgx/examples/transact_migrations/1185749658_add_city_to_users.up.sql new file mode 100644 index 000000000..67823edc9 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1185749658_add_city_to_users.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE users ADD COLUMN city varchar(100); + + diff --git a/database/pgx/examples/transact_migrations/1285849751_add_index_on_user_emails.down.sql b/database/pgx/examples/transact_migrations/1285849751_add_index_on_user_emails.down.sql new file mode 100644 index 000000000..3e87dd229 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1285849751_add_index_on_user_emails.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS users_email_index; diff --git a/database/pgx/examples/transact_migrations/1385949617_create_books_table.down.sql b/database/pgx/examples/transact_migrations/1385949617_create_books_table.down.sql new file mode 100644 index 000000000..1a0b1a214 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1385949617_create_books_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS books; diff --git a/database/pgx/examples/transact_migrations/1385949617_create_books_table.up.sql b/database/pgx/examples/transact_migrations/1385949617_create_books_table.up.sql new file mode 100644 index 000000000..f1503b518 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1385949617_create_books_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE books ( + user_id integer, + name varchar(40), + author varchar(40) +); diff --git a/database/pgx/examples/transact_migrations/1485949617_create_movies_table.down.sql b/database/pgx/examples/transact_migrations/1485949617_create_movies_table.down.sql new file mode 100644 index 000000000..3a5187689 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1485949617_create_movies_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS movies; diff --git a/database/pgx/examples/transact_migrations/1485949617_create_movies_table.up.sql b/database/pgx/examples/transact_migrations/1485949617_create_movies_table.up.sql new file mode 100644 index 000000000..f0ef5943b --- /dev/null +++ b/database/pgx/examples/transact_migrations/1485949617_create_movies_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE movies ( + user_id integer, + name varchar(40), + director varchar(40) +); diff --git a/database/pgx/examples/transact_migrations/1585849751_just_a_comment.up.sql b/database/pgx/examples/transact_migrations/1585849751_just_a_comment.up.sql new file mode 100644 index 000000000..9b6b57a61 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1585849751_just_a_comment.up.sql @@ -0,0 +1 @@ +-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere. diff --git a/database/pgx/examples/transact_migrations/1685849751_another_comment.up.sql b/database/pgx/examples/transact_migrations/1685849751_another_comment.up.sql new file mode 100644 index 000000000..9b6b57a61 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1685849751_another_comment.up.sql @@ -0,0 +1 @@ +-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere. diff --git a/database/pgx/examples/transact_migrations/1785849751_another_comment.up.sql b/database/pgx/examples/transact_migrations/1785849751_another_comment.up.sql new file mode 100644 index 000000000..9b6b57a61 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1785849751_another_comment.up.sql @@ -0,0 +1 @@ +-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere. diff --git a/database/pgx/examples/transact_migrations/1885849751_another_comment.up.sql b/database/pgx/examples/transact_migrations/1885849751_another_comment.up.sql new file mode 100644 index 000000000..9b6b57a61 --- /dev/null +++ b/database/pgx/examples/transact_migrations/1885849751_another_comment.up.sql @@ -0,0 +1 @@ +-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere. diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index da626756d..986d29b61 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -267,6 +267,7 @@ func (p *Postgres) Unlock() error { return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { tx, ok := p.exec.(*sql.Tx) if ok { // if in transaction lock, simply commit + p.exec = p.conn return tx.Commit() } aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) diff --git a/database/pgx/pgx_test.go b/database/pgx/pgx_test.go index 9d55c4106..9a494eb9d 100644 --- a/database/pgx/pgx_test.go +++ b/database/pgx/pgx_test.go @@ -8,9 +8,8 @@ import ( sqldriver "database/sql/driver" "errors" "fmt" - "log" - "io" + "log" "strconv" "strings" "sync" @@ -86,6 +85,30 @@ func mustRun(t *testing.T, d database.Driver, statements []string) { } func Test(t *testing.T) { + for _, opt := range []string{"", "x-transaction-lock=true"} { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := pgConnectionString(ip, port, opt) + p := &Postgres{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + dt.Test(t, d, []byte("SELECT 1")) + }) + } +} + +func TestMigrate(t *testing.T) { dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { ip, port, err := c.FirstPort() if err != nil { @@ -103,18 +126,22 @@ func Test(t *testing.T) { t.Error(err) } }() - dt.Test(t, d, []byte("SELECT 1")) + m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "pgx", d) + if err != nil { + t.Fatal(err) + } + dt.TestMigrate(t, m) }) } -func TestMigrate(t *testing.T) { +func TestMigrateTrnsactionLock(t *testing.T) { dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { ip, port, err := c.FirstPort() if err != nil { t.Fatal(err) } - addr := pgConnectionString(ip, port) + addr := pgConnectionString(ip, port, "x-transaction-lock=true") p := &Postgres{} d, err := p.Open(addr) if err != nil { @@ -125,7 +152,7 @@ func TestMigrate(t *testing.T) { t.Error(err) } }() - m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "pgx", d) + m, err := migrate.NewWithDatabaseInstance("file://./examples/transact_migrations", "pgx", d) if err != nil { t.Fatal(err) } @@ -534,109 +561,113 @@ func TestCheckBeforeCreateTable(t *testing.T) { } func TestParallelSchema(t *testing.T) { - dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { - ip, port, err := c.FirstPort() - if err != nil { - t.Fatal(err) - } + for _, opt := range []string{"", "&x-transaction-lock=true"} { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } - addr := pgConnectionString(ip, port) - p := &Postgres{} - d, err := p.Open(addr) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := d.Close(); err != nil { - t.Error(err) + addr := pgConnectionString(ip, port, opt) + p := &Postgres{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) } - }() + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() - // create foo and bar schemas - if err := d.Run(strings.NewReader("CREATE SCHEMA foo AUTHORIZATION postgres")); err != nil { - t.Fatal(err) - } - if err := d.Run(strings.NewReader("CREATE SCHEMA bar AUTHORIZATION postgres")); err != nil { - t.Fatal(err) - } + // create foo and bar schemas + if err := d.Run(strings.NewReader("CREATE SCHEMA foo AUTHORIZATION postgres")); err != nil { + t.Fatal(err) + } + if err := d.Run(strings.NewReader("CREATE SCHEMA bar AUTHORIZATION postgres")); err != nil { + t.Fatal(err) + } - // re-connect using that schemas - dfoo, err := p.Open(pgConnectionString(ip, port, "search_path=foo")) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := dfoo.Close(); err != nil { - t.Error(err) + // re-connect using that schemas + dfoo, err := p.Open(pgConnectionString(ip, port, "search_path=foo")) + if err != nil { + t.Fatal(err) } - }() + defer func() { + if err := dfoo.Close(); err != nil { + t.Error(err) + } + }() - dbar, err := p.Open(pgConnectionString(ip, port, "search_path=bar")) - if err != nil { - t.Fatal(err) - } - defer func() { - if err := dbar.Close(); err != nil { - t.Error(err) + dbar, err := p.Open(pgConnectionString(ip, port, "search_path=bar")) + if err != nil { + t.Fatal(err) } - }() + defer func() { + if err := dbar.Close(); err != nil { + t.Error(err) + } + }() - if err := dfoo.Lock(); err != nil { - t.Fatal(err) - } + if err := dfoo.Lock(); err != nil { + t.Fatal(err) + } - if err := dbar.Lock(); err != nil { - t.Fatal(err) - } + if err := dbar.Lock(); err != nil { + t.Fatal(err) + } - if err := dbar.Unlock(); err != nil { - t.Fatal(err) - } + if err := dbar.Unlock(); err != nil { + t.Fatal(err) + } - if err := dfoo.Unlock(); err != nil { - t.Fatal(err) - } - }) + if err := dfoo.Unlock(); err != nil { + t.Fatal(err) + } + }) + } } func TestPostgres_Lock(t *testing.T) { - dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { - ip, port, err := c.FirstPort() - if err != nil { - t.Fatal(err) - } + for _, opt := range []string{"", "&x-transaction-lock=true"} { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } - addr := pgConnectionString(ip, port) - p := &Postgres{} - d, err := p.Open(addr) - if err != nil { - t.Fatal(err) - } + addr := pgConnectionString(ip, port, opt) + p := &Postgres{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } - dt.Test(t, d, []byte("SELECT 1")) + dt.Test(t, d, []byte("SELECT 1")) - ps := d.(*Postgres) + ps := d.(*Postgres) - err = ps.Lock() - if err != nil { - t.Fatal(err) - } + err = ps.Lock() + if err != nil { + t.Fatal(err) + } - err = ps.Unlock() - if err != nil { - t.Fatal(err) - } + err = ps.Unlock() + if err != nil { + t.Fatal(err) + } - err = ps.Lock() - if err != nil { - t.Fatal(err) - } + err = ps.Lock() + if err != nil { + t.Fatal(err) + } - err = ps.Unlock() - if err != nil { - t.Fatal(err) - } - }) + err = ps.Unlock() + if err != nil { + t.Fatal(err) + } + }) + } } func TestWithInstance_Concurrent(t *testing.T) {