Skip to content

Commit

Permalink
Add more logs for clickhouse migrate
Browse files Browse the repository at this point in the history
  • Loading branch information
jhwangalauda committed Sep 30, 2024
1 parent c378583 commit f8b1003
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
15 changes: 15 additions & 0 deletions database/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,42 +134,53 @@ func (ch *ClickHouse) init() error {
}

func (ch *ClickHouse) Run(r io.Reader) error {
fmt.Println(fmt.Sprintf("Try to run clickhouse. MultiStatementEnabled:%+v", ch.config.MultiStatementEnabled))

Check failure on line 137 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
if ch.config.MultiStatementEnabled {
var err error
if e := multistmt.Parse(r, multiStmtDelimiter, ch.config.MultiStatementMaxSize, func(m []byte) bool {
fmt.Println(fmt.Sprintf("Try to exec query clickhouse. query:%+v", string(m)))

Check failure on line 141 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
tq := strings.TrimSpace(string(m))
if tq == "" {
return true
}
if _, e := ch.conn.Exec(string(m)); e != nil {
fmt.Println(fmt.Errorf("exec query clickhouse error:%+v", err))
err = database.Error{OrigErr: e, Err: "migration failed", Query: m}
return false
}
return true
}); e != nil {
fmt.Println(fmt.Errorf("multistmt parse error:%+v", err))
return e
}
return err
}

fmt.Println(fmt.Sprintf("Try to get clickhouse migration."))

Check failure on line 159 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
migration, err := io.ReadAll(r)
if err != nil {
fmt.Println(fmt.Errorf("get clickhouse migration error:%+v", err))
return err
}

fmt.Println(fmt.Sprintf("Try to exec migration clickhouse. migration:%+v", string(migration)))

Check failure on line 166 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
if _, err := ch.conn.Exec(string(migration)); err != nil {
fmt.Println(fmt.Errorf("exec migration clickhouse error:%+v", err))
return database.Error{OrigErr: err, Err: "migration failed", Query: migration}
}

return nil
}
func (ch *ClickHouse) Version() (int, bool, error) {

var (
version int
dirty uint8
query = "SELECT version, dirty FROM `" + ch.config.MigrationsTable + "` ORDER BY sequence DESC LIMIT 1"
)
fmt.Println(fmt.Sprintf("Try to get version clickhouse. query:%+v", query))

Check failure on line 181 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
if err := ch.conn.QueryRow(query).Scan(&version, &dirty); err != nil {
fmt.Println(fmt.Errorf("get version clickhouse err:%+v", err))
if err == sql.ErrNoRows {
return database.NilVersion, false, nil
}
Expand All @@ -179,6 +190,7 @@ func (ch *ClickHouse) Version() (int, bool, error) {
}

func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
fmt.Println(fmt.Sprintf("Try to set version clickhouse. version:%+v dirty:%+v", version, dirty))

Check failure on line 193 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
var (
bool = func(v bool) uint8 {
if v {
Expand All @@ -189,11 +201,14 @@ func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
tx, err = ch.conn.Begin()
)
if err != nil {
fmt.Println(fmt.Errorf("set version clickhouse begin error:%+v", err))
return err
}

query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)"
fmt.Println(fmt.Sprintf("Try to set version clickhouse exec. query:%+v", query))

Check failure on line 209 in database/clickhouse/clickhouse.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil {
fmt.Println(fmt.Errorf("set version clickhouse exec error:%+v", err))
return &database.Error{OrigErr: err, Query: []byte(query)}
}

Expand Down
26 changes: 25 additions & 1 deletion migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func (m *Migrate) Up() error {
if err != nil {
return m.unlockErr(err)
}
fmt.Println(fmt.Sprintf("Try to do up. curVersion:%+v dirty:%+v", curVersion, dirty))

Check failure on line 276 in migrate.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)

if dirty {
return m.unlockErr(ErrDirty{curVersion})
Expand Down Expand Up @@ -534,9 +535,11 @@ func (m *Migrate) read(from int, to int, ret chan<- interface{}) {
func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {
defer close(ret)

fmt.Println(fmt.Sprintf("Try to read up. from:%+v limit:%+v", from, limit))

Check failure on line 538 in migrate.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
// check if from version exists
if from >= 0 {
if err := m.versionExists(suint(from)); err != nil {
fmt.Println(fmt.Errorf("version exists error:%+v", err))
ret <- err
return
}
Expand All @@ -549,27 +552,34 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {

count := 0
for count < limit || limit == -1 {
fmt.Println(fmt.Sprintf("Try to loop read up. count:%+v from:%+v", count, from))

Check failure on line 555 in migrate.go

View workflow job for this annotation

GitHub Actions / lint

S1038: should use fmt.Printf instead of fmt.Println(fmt.Sprintf(...)) (but don't forget the newline) (gosimple)
if m.stop() {
fmt.Println(fmt.Sprintf("Exit loop read up."))
return
}

// apply first migration if from is nil version
if from == -1 {
fmt.Println(fmt.Sprintf("Try to get first version."))
firstVersion, err := m.sourceDrv.First()
if err != nil {
fmt.Println(fmt.Errorf("get first version error:%+v", err))
ret <- err
return
}

fmt.Println(fmt.Sprintf("Try to new first migration. firstVersion: %+v", firstVersion))
migr, err := m.newMigration(firstVersion, int(firstVersion))
if err != nil {
fmt.Println(fmt.Errorf("new first migration error:%+v", err))
ret <- err
return
}

ret <- migr
go func() {
if err := migr.Buffer(); err != nil {
fmt.Println(fmt.Errorf("buffer migr error:%+v", err))
m.logErr(err)
}
}()
Expand All @@ -578,6 +588,7 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {
continue
}

fmt.Println(fmt.Sprintf("Try to get next version."))
// apply next migration
next, err := m.sourceDrv.Next(suint(from))
if errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -605,19 +616,23 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {
}
}
if err != nil {
fmt.Println(fmt.Errorf("get next version error:%+v", err))
ret <- err
return
}

fmt.Println(fmt.Sprintf("Try to new next migration. nextVersion: %+v", next))
migr, err := m.newMigration(next, int(next))
if err != nil {
fmt.Println(fmt.Errorf("new next migration error:%+v", err))
ret <- err
return
}

ret <- migr
go func() {
if err := migr.Buffer(); err != nil {
fmt.Println(fmt.Errorf("buffer migr error:%+v", err))
m.logErr(err)
}
}()
Expand Down Expand Up @@ -723,39 +738,48 @@ func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) {
// to stop execution because it might have received a stop signal on the
// GracefulStop channel.
func (m *Migrate) runMigrations(ret <-chan interface{}) error {
fmt.Println(fmt.Sprintf("Try to run migrations."))
for r := range ret {

if m.stop() {
fmt.Println(fmt.Sprintf("Exit loop run migration."))
return nil
}

switch r := r.(type) {
case error:
fmt.Println(fmt.Errorf("run migration error:%+v", r))
return r

case *Migration:
migr := r

fmt.Println(fmt.Sprintf("Try to set dirty version. TargetVersion:%+v", migr.TargetVersion))
// set version with dirty state
if err := m.databaseDrv.SetVersion(migr.TargetVersion, true); err != nil {
fmt.Println(fmt.Errorf("set dirty version error:%+v", err))
return err
}

if migr.Body != nil {
fmt.Println(fmt.Sprintf("Try to loop run migrations. LogString:%+v", migr.LogString()))
m.logVerbosePrintf("Read and execute %v\n", migr.LogString())
if err := m.databaseDrv.Run(migr.BufferedBody); err != nil {
fmt.Println(fmt.Errorf("run error:%+v", err))
return err
}
}

fmt.Println(fmt.Sprintf("Try to set clenn version. TargetVersion:%+v", migr.TargetVersion))
// set clean state
if err := m.databaseDrv.SetVersion(migr.TargetVersion, false); err != nil {
fmt.Println(fmt.Errorf("set clean version error:%+v", err))
return err
}

endTime := time.Now()
readTime := migr.FinishedReading.Sub(migr.StartedBuffering)
runTime := endTime.Sub(migr.FinishedReading)
fmt.Println(fmt.Sprintf("Finished %+v (read %+v, ran %+v)", migr.LogString(), readTime, runTime))

// log either verbose or normal
if m.Log != nil {
Expand Down

0 comments on commit f8b1003

Please sign in to comment.