From 6f099b22ed82b1c45800f026592e15a576d0a894 Mon Sep 17 00:00:00 2001 From: Ariel Mashraki Date: Mon, 30 Dec 2024 15:05:21 +0200 Subject: [PATCH] sql/migrate: emit LogError when failed to write revisions state --- sql/migrate/migrate.go | 28 ++++++++++++++--- sql/migrate/migrate_test.go | 61 +++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/sql/migrate/migrate.go b/sql/migrate/migrate.go index 47255ec84aa..9595aa0eabb 100644 --- a/sql/migrate/migrate.go +++ b/sql/migrate/migrate.go @@ -834,12 +834,16 @@ func (e *Executor) Execute(ctx context.Context, m File) (err error) { } // Save once to mark as started in the database. if err = e.writeRevision(ctx, r); err != nil { + e.log.Log(LogError{Error: err}) return err } - // Make sure to store the Revision information. + // Make sure to store the Revision information, + // if the executor was not failed to store it. defer func(ctx context.Context, e *Executor, r *Revision) { - if err2 := e.writeRevision(ctx, r); err2 != nil { - err = errors.Join(err, err2) + if !errors.As(err, new(*WriteRevisionError)) { + if err2 := e.writeRevision(ctx, r); err2 != nil { + err = errors.Join(err, err2) + } } }(ctx, e, r) if r.Applied > 0 { @@ -872,6 +876,7 @@ func (e *Executor) Execute(ctx context.Context, m File) (err error) { r.PartialHashes = append(r.PartialHashes, "h1:"+sums[r.Applied]) r.Applied++ if err = e.writeRevision(ctx, r); err != nil { + e.log.Log(LogError{Error: err}) return err } } @@ -883,11 +888,26 @@ func (e *Executor) writeRevision(ctx context.Context, r *Revision) error { r.ExecutedAt = time.Now() r.OperatorVersion = e.operator if err := e.rrw.WriteRevision(ctx, r); err != nil { - return fmt.Errorf("sql/migrate: write revision: %w", err) + return &WriteRevisionError{Err: err, Revision: r} } return nil } +// WriteRevisionError is reported when writing a +// revision to the RevisionReadWriter fails. +type WriteRevisionError struct { + Err error + Revision *Revision +} + +func (e WriteRevisionError) Error() string { + return "sql/migrate: write revision: " + e.Err.Error() +} + +func (e WriteRevisionError) Unwrap() error { + return e.Err +} + // HistoryChangedError is returned if between two execution attempts already applied statements of a file have changed. type HistoryChangedError struct { File string diff --git a/sql/migrate/migrate_test.go b/sql/migrate/migrate_test.go index ee2f503bf0a..d4890152617 100644 --- a/sql/migrate/migrate_test.go +++ b/sql/migrate/migrate_test.go @@ -708,6 +708,51 @@ func TestExecutor(t *testing.T) { require.EqualError(t, ex.ExecuteTo(context.Background(), ""), "sql/migrate: migration with version \"\" not found") require.NoError(t, ex.ExecuteTo(context.Background(), "2.10.x-20")) requireEqualRevisions(t, []*migrate.Revision{rev1, rev2}, *rrw) + + // Failed storing initial revision state in the database. + log = &mockLogger{} + *rrw = []*migrate.Revision{} + ex, err = migrate.NewExecutor( + &mockDriver{}, dir, + &mockWriteRevisionError{ + mockRevisionReadWriter: *rrw, + errinit: errors.New("init error"), + }, + migrate.WithLogger(log), + ) + require.NoError(t, err) + err = ex.ExecuteTo(context.Background(), "2.10.x-20") + require.EqualError(t, err, `sql/migrate: write revision: init error`) + require.Len(t, *log, 2, "fail on init") + require.IsType(t, migrate.LogExecution{}, (*log)[0]) + require.IsType(t, migrate.LogError{}, (*log)[1]) + e1 := (*log)[1].(migrate.LogError) + require.EqualError(t, e1.Error, `sql/migrate: write revision: init error`) + + // Failed storing applied revision state in the database. + log = &mockLogger{} + *rrw = []*migrate.Revision{} + ex, err = migrate.NewExecutor( + &mockDriver{}, dir, + &mockWriteRevisionError{ + mockRevisionReadWriter: *rrw, + errdone: errors.New("done error"), + }, + migrate.WithLogger(log), + ) + require.NoError(t, err) + err = ex.ExecuteTo(context.Background(), "2.10.x-20") + require.EqualError(t, err, `sql/migrate: write revision: done error`) + // Logs are: Intro/Execution, File, 2 Stmts (1.a_sub.up.sql), + // and Error when writing the revision of the first file. + require.Len(t, *log, 5, "expect 5 logs to be fired") + require.IsType(t, migrate.LogExecution{}, (*log)[0]) + require.IsType(t, migrate.LogFile{}, (*log)[1]) + require.IsType(t, migrate.LogStmt{}, (*log)[2]) + require.IsType(t, migrate.LogStmt{}, (*log)[3]) + e1 = (*log)[4].(migrate.LogError) + require.EqualError(t, e1.Error, `sql/migrate: write revision: done error`) + require.EqualError(t, errors.Unwrap(e1.Error), `done error`) } func TestExecutor_Baseline(t *testing.T) { @@ -890,6 +935,22 @@ func (rrw *mockRevisionReadWriter) clean() { *rrw = []*migrate.Revision{} } +type mockWriteRevisionError struct { + mockRevisionReadWriter + errinit, errdone error // error on init and done +} + +func (m *mockWriteRevisionError) WriteRevision(ctx context.Context, r *migrate.Revision) error { + switch { + case r.Applied == 0 && m.errinit != nil: + return m.errinit + case r.Applied == r.Total && m.errdone != nil: + return m.errdone + default: + return m.mockRevisionReadWriter.WriteRevision(ctx, r) + } +} + type mockLogger []migrate.LogEntry func (m *mockLogger) Log(e migrate.LogEntry) { *m = append(*m, e) }