Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction lock #908

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 20 additions & 19 deletions database/pgx/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,26 @@ This package is for [pgx/v4](https://pkg.go.dev/github.com/jackc/pgx/v4). A back

`pgx://user:password@host:port/dbname?query`

| URL Query | WithInstance Config | Description |
|------------|---------------------|-------------|
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `x-migrations-table-quoted` | `MigrationsTableQuoted` | By default, migrate quotes the migration table for SQL injection safety reasons. This option disable quoting and naively checks that you have quoted the migration table name. e.g. `"my_schema"."schema_migrations"` |
| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds |
| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) |
| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. |
| `user` | | The user to sign in as |
| `password` | | The user's password |
| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) |
| `port` | | The port to bind to. (default is 5432) |
| `fallback_application_name` | | An application_name to fall back to if one isn't provided. |
| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. |
| `sslcert` | | Cert file location. The file must contain PEM encoded data. |
| `sslkey` | | Key file location. The file must contain PEM encoded data. |
| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. |
| `sslmode` | | Whether or not to use SSL (disable\|require\|verify-ca\|verify-full) |
| URL Query | WithInstance Config | Description |
|------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `x-migrations-table-quoted` | `MigrationsTableQuoted` | By default, migrate quotes the migration table for SQL injection safety reasons. This option disable quoting and naively checks that you have quoted the migration table name. e.g. `"my_schema"."schema_migrations"` |
| `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds |
| `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) |
| `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) |
| `x-transaction-lock` | `TransactionLock` | Migrate using transaction with exclusive transaction level advisory lock (default: false). It helps when PgBouncer is used to manage connections on multi clustered db and default unlock fails. But 'create index concurrently' can not be used in migrations. |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. |
| `user` | | The user to sign in as |
| `password` | | The user's password |
| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) |
| `port` | | The port to bind to. (default is 5432) |
| `fallback_application_name` | | An application_name to fall back to if one isn't provided. |
| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. |
| `sslcert` | | Cert file location. The file must contain PEM encoded data. |
| `sslkey` | | Key file location. The file must contain PEM encoded data. |
| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. |
| `sslmode` | | Whether or not to use SSL (disable\ |require\|verify-ca\|verify-full) |


## Upgrading from v1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE users (
user_id integer unique,
name varchar(40),
email varchar(40)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE users DROP COLUMN IF EXISTS city;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE users ADD COLUMN city varchar(100);


Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX IF EXISTS users_email_index;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS books;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE books (
user_id integer,
name varchar(40),
author varchar(40)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS movies;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE movies (
user_id integer,
name varchar(40),
director varchar(40)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.
70 changes: 53 additions & 17 deletions database/pgx/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (

"go.uber.org/atomic"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
"github.com/hashicorp/go-multierror"
"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
_ "github.com/jackc/pgx/v4/stdlib"

"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/multistmt"
)

func init() {
Expand Down Expand Up @@ -55,12 +56,20 @@ type Config struct {
MigrationsTableQuoted bool
MultiStatementEnabled bool
MultiStatementMaxSize int
TransactionLock bool
}

type Execer interface {
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
}

type Postgres struct {
// Locking and unlocking need to use the same connection
conn *sql.Conn
db *sql.DB
exec Execer
isLocked atomic.Bool

// Open and WithInstance need to guarantee that config is never nil
Expand Down Expand Up @@ -129,6 +138,7 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {

px := &Postgres{
conn: conn,
exec: conn,
db: instance,
config: config,
}
Expand Down Expand Up @@ -195,6 +205,13 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err)
}
}
transactionLockParam := false
if s := purl.Query().Get("x-transaction-lock"); len(s) > 0 {
transactionLockParam, err = strconv.ParseBool(s)
if err != nil {
return nil, fmt.Errorf("Unable to parse option x-transaction-lock: %w", err)
}
}

px, err := WithInstance(db, &Config{
DatabaseName: purl.Path,
Expand All @@ -203,6 +220,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
MultiStatementEnabled: multiStatementEnabled,
MultiStatementMaxSize: multiStatementMaxSize,
TransactionLock: transactionLockParam,
})

if err != nil {
Expand All @@ -228,10 +246,18 @@ func (p *Postgres) Lock() error {
if err != nil {
return err
}

// This will wait indefinitely until the lock can be acquired.
query := `SELECT pg_advisory_lock($1)`
if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {

if p.config.TransactionLock {
p.exec, err = p.conn.BeginTx(context.Background(), nil)
if err != nil {
return err
}
// This will be unlocked on transaction end
query = `SELECT pg_advisory_xact_lock($1)`
}
if _, err := p.exec.ExecContext(context.Background(), query, aid); err != nil {
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
}
return nil
Expand All @@ -240,6 +266,11 @@ func (p *Postgres) Lock() error {

func (p *Postgres) Unlock() error {
return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
tx, ok := p.exec.(*sql.Tx)
if ok { // if in transaction lock, simply commit
p.exec = p.conn
return tx.Commit()
}
aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
if err != nil {
return err
Expand Down Expand Up @@ -284,7 +315,7 @@ func (p *Postgres) runStatement(statement []byte) error {
if strings.TrimSpace(query) == "" {
return nil
}
if _, err := p.conn.ExecContext(ctx, query); err != nil {
if _, err := p.exec.ExecContext(ctx, query); err != nil {

if pgErr, ok := err.(*pgconn.PgError); ok {
var line uint
Expand Down Expand Up @@ -341,9 +372,13 @@ func runesLastIndex(input []rune, target rune) int {
}

func (p *Postgres) SetVersion(version int, dirty bool) error {
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return &database.Error{OrigErr: err, Err: "transaction start failed"}
var err error
tx, isTxglobal := p.exec.(*sql.Tx)
if !isTxglobal {
tx, err = p.conn.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return &database.Error{OrigErr: err, Err: "transaction start failed"}
}
}

query := `TRUNCATE ` + quoteIdentifier(p.config.migrationsSchemaName) + `.` + quoteIdentifier(p.config.migrationsTableName)
Expand All @@ -366,17 +401,18 @@ func (p *Postgres) SetVersion(version int, dirty bool) error {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}

if err := tx.Commit(); err != nil {
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
if !isTxglobal {
if err := tx.Commit(); err != nil {
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
}
}

return nil
}

func (p *Postgres) Version() (version int, dirty bool, err error) {
query := `SELECT version, dirty FROM ` + quoteIdentifier(p.config.migrationsSchemaName) + `.` + quoteIdentifier(p.config.migrationsTableName) + ` LIMIT 1`
err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
err = p.exec.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
switch {
case err == sql.ErrNoRows:
return database.NilVersion, false, nil
Expand All @@ -397,7 +433,7 @@ func (p *Postgres) Version() (version int, dirty bool, err error) {
func (p *Postgres) Drop() (err error) {
// select all tables in current schema
query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema()) AND table_type='BASE TABLE'`
tables, err := p.conn.QueryContext(context.Background(), query)
tables, err := p.exec.QueryContext(context.Background(), query)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
Expand Down Expand Up @@ -426,7 +462,7 @@ func (p *Postgres) Drop() (err error) {
// delete one by one ...
for _, t := range tableNames {
query = `DROP TABLE IF EXISTS ` + quoteIdentifier(t) + ` CASCADE`
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
if _, err := p.exec.ExecContext(context.Background(), query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}
Expand Down Expand Up @@ -458,7 +494,7 @@ func (p *Postgres) ensureVersionTable() (err error) {
// `CREATE TABLE IF NOT EXISTS...` query would fail because the user does not have the CREATE permission.
// Taken from https://github.com/mattes/migrate/blob/master/database/postgres/postgres.go#L258
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2 LIMIT 1`
row := p.conn.QueryRowContext(context.Background(), query, p.config.migrationsSchemaName, p.config.migrationsTableName)
row := p.exec.QueryRowContext(context.Background(), query, p.config.migrationsSchemaName, p.config.migrationsTableName)

var count int
err = row.Scan(&count)
Expand All @@ -471,7 +507,7 @@ func (p *Postgres) ensureVersionTable() (err error) {
}

query = `CREATE TABLE IF NOT EXISTS ` + quoteIdentifier(p.config.migrationsSchemaName) + `.` + quoteIdentifier(p.config.migrationsTableName) + ` (version bigint not null primary key, dirty boolean not null)`
if _, err = p.conn.ExecContext(context.Background(), query); err != nil {
if _, err = p.exec.ExecContext(context.Background(), query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

Expand Down
Loading
Loading