Skip to content

Commit

Permalink
sql/migrate: emit LogError when failed to write revisions state
Browse files Browse the repository at this point in the history
  • Loading branch information
a8m committed Dec 30, 2024
1 parent 5f66e5d commit 6f099b2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 4 deletions.
28 changes: 24 additions & 4 deletions sql/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,12 +834,16 @@ func (e *Executor) Execute(ctx context.Context, m File) (err error) {
}
// Save once to mark as started in the database.
if err = e.writeRevision(ctx, r); err != nil {
e.log.Log(LogError{Error: err})
return err
}
// Make sure to store the Revision information.
// Make sure to store the Revision information,
// if the executor was not failed to store it.
defer func(ctx context.Context, e *Executor, r *Revision) {
if err2 := e.writeRevision(ctx, r); err2 != nil {
err = errors.Join(err, err2)
if !errors.As(err, new(*WriteRevisionError)) {
if err2 := e.writeRevision(ctx, r); err2 != nil {
err = errors.Join(err, err2)
}
}
}(ctx, e, r)
if r.Applied > 0 {
Expand Down Expand Up @@ -872,6 +876,7 @@ func (e *Executor) Execute(ctx context.Context, m File) (err error) {
r.PartialHashes = append(r.PartialHashes, "h1:"+sums[r.Applied])
r.Applied++
if err = e.writeRevision(ctx, r); err != nil {
e.log.Log(LogError{Error: err})
return err
}
}
Expand All @@ -883,11 +888,26 @@ func (e *Executor) writeRevision(ctx context.Context, r *Revision) error {
r.ExecutedAt = time.Now()
r.OperatorVersion = e.operator
if err := e.rrw.WriteRevision(ctx, r); err != nil {
return fmt.Errorf("sql/migrate: write revision: %w", err)
return &WriteRevisionError{Err: err, Revision: r}
}
return nil
}

// WriteRevisionError is reported when writing a
// revision to the RevisionReadWriter fails.
type WriteRevisionError struct {
Err error
Revision *Revision
}

func (e WriteRevisionError) Error() string {
return "sql/migrate: write revision: " + e.Err.Error()
}

func (e WriteRevisionError) Unwrap() error {
return e.Err
}

// HistoryChangedError is returned if between two execution attempts already applied statements of a file have changed.
type HistoryChangedError struct {
File string
Expand Down
61 changes: 61 additions & 0 deletions sql/migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,51 @@ func TestExecutor(t *testing.T) {
require.EqualError(t, ex.ExecuteTo(context.Background(), ""), "sql/migrate: migration with version \"\" not found")
require.NoError(t, ex.ExecuteTo(context.Background(), "2.10.x-20"))
requireEqualRevisions(t, []*migrate.Revision{rev1, rev2}, *rrw)

// Failed storing initial revision state in the database.
log = &mockLogger{}
*rrw = []*migrate.Revision{}
ex, err = migrate.NewExecutor(
&mockDriver{}, dir,
&mockWriteRevisionError{
mockRevisionReadWriter: *rrw,
errinit: errors.New("init error"),
},
migrate.WithLogger(log),
)
require.NoError(t, err)
err = ex.ExecuteTo(context.Background(), "2.10.x-20")
require.EqualError(t, err, `sql/migrate: write revision: init error`)
require.Len(t, *log, 2, "fail on init")
require.IsType(t, migrate.LogExecution{}, (*log)[0])
require.IsType(t, migrate.LogError{}, (*log)[1])
e1 := (*log)[1].(migrate.LogError)
require.EqualError(t, e1.Error, `sql/migrate: write revision: init error`)

// Failed storing applied revision state in the database.
log = &mockLogger{}
*rrw = []*migrate.Revision{}
ex, err = migrate.NewExecutor(
&mockDriver{}, dir,
&mockWriteRevisionError{
mockRevisionReadWriter: *rrw,
errdone: errors.New("done error"),
},
migrate.WithLogger(log),
)
require.NoError(t, err)
err = ex.ExecuteTo(context.Background(), "2.10.x-20")
require.EqualError(t, err, `sql/migrate: write revision: done error`)
// Logs are: Intro/Execution, File, 2 Stmts (1.a_sub.up.sql),
// and Error when writing the revision of the first file.
require.Len(t, *log, 5, "expect 5 logs to be fired")
require.IsType(t, migrate.LogExecution{}, (*log)[0])
require.IsType(t, migrate.LogFile{}, (*log)[1])
require.IsType(t, migrate.LogStmt{}, (*log)[2])
require.IsType(t, migrate.LogStmt{}, (*log)[3])
e1 = (*log)[4].(migrate.LogError)
require.EqualError(t, e1.Error, `sql/migrate: write revision: done error`)
require.EqualError(t, errors.Unwrap(e1.Error), `done error`)
}

func TestExecutor_Baseline(t *testing.T) {
Expand Down Expand Up @@ -890,6 +935,22 @@ func (rrw *mockRevisionReadWriter) clean() {
*rrw = []*migrate.Revision{}
}

type mockWriteRevisionError struct {
mockRevisionReadWriter
errinit, errdone error // error on init and done
}

func (m *mockWriteRevisionError) WriteRevision(ctx context.Context, r *migrate.Revision) error {
switch {
case r.Applied == 0 && m.errinit != nil:
return m.errinit
case r.Applied == r.Total && m.errdone != nil:
return m.errdone
default:
return m.mockRevisionReadWriter.WriteRevision(ctx, r)
}
}

type mockLogger []migrate.LogEntry

func (m *mockLogger) Log(e migrate.LogEntry) { *m = append(*m, e) }
Expand Down

0 comments on commit 6f099b2

Please sign in to comment.