From 505ac38f35b4528b4420b4622f9883dbf9f0602a Mon Sep 17 00:00:00 2001 From: Gowa2017 <33367355+Gowa2017@users.noreply.github.com> Date: Thu, 1 Aug 2024 15:50:39 +0800 Subject: [PATCH 1/6] fix: recognize QualifiedRef of column condition --- sharding.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sharding.go b/sharding.go index 85b2738..2f29d18 100644 --- a/sharding.go +++ b/sharding.go @@ -500,7 +500,14 @@ func (s *Sharding) insertValue(key string, names []*sqlparser.Ident, exprs []sql func (s *Sharding) nonInsertValue(key string, condition sqlparser.Expr, args ...any) (value any, id int64, keyFind bool, err error) { err = sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error { if n, ok := node.(*sqlparser.BinaryExpr); ok { - if x, ok := n.X.(*sqlparser.Ident); ok { + x, ok := n.X.(*sqlparser.Ident) + if !ok { + if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 { + x = q.Column + ok = true + } + } + if ok { if x.Name == key && n.Op == sqlparser.EQ { keyFind = true switch expr := n.Y.(type) { From 472f994d22e0f874947c17425e6490bd0719a19f Mon Sep 17 00:00:00 2001 From: Gowa2017 <33367355+Gowa2017@users.noreply.github.com> Date: Fri, 2 Aug 2024 09:16:22 +0800 Subject: [PATCH 2/6] fix: replace condition which with QualifyRef --- sharding.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sharding.go b/sharding.go index 2f29d18..9cc31a7 100644 --- a/sharding.go +++ b/sharding.go @@ -295,6 +295,20 @@ func (s *Sharding) switchConn(db *gorm.DB) { s.mutex.Unlock() } } +func replaceConditionTableName(key, tableName string, expr sqlparser.Expr) error { + + err := sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error { + if n, ok := node.(*sqlparser.BinaryExpr); ok { + if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 { + if q.Column.Name == key { + n.X.(*sqlparser.QualifiedRef).Table.Name = tableName + } + } + } + return nil + }), expr) + return err +} // resolve split the old query to full table query and sharding table query func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) { @@ -438,14 +452,17 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa ftQuery = stmt.String() stmt.FromItems = newTable stmt.OrderBy = replaceOrderByTableName(stmt.OrderBy, tableName, newTable.Name.Name) + replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() case *sqlparser.UpdateStatement: ftQuery = stmt.String() stmt.TableName = newTable + replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() case *sqlparser.DeleteStatement: ftQuery = stmt.String() stmt.TableName = newTable + replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() } } From b5a435bf24626c470eb7778ab6aba711e5115d9e Mon Sep 17 00:00:00 2001 From: Gowa2017 <33367355+Gowa2017@users.noreply.github.com> Date: Thu, 8 Aug 2024 10:17:45 +0800 Subject: [PATCH 3/6] fix: debug output replaced sql --- sharding.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sharding.go b/sharding.go index 9cc31a7..612db87 100644 --- a/sharding.go +++ b/sharding.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "hash/crc32" + "log/slog" "strconv" "strings" "sync" @@ -430,6 +431,7 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa ftQuery = insertStmt.String() insertStmt.TableName = newTable stQuery = insertStmt.String() + slog.Debug(stQuery) } else { var value any @@ -465,6 +467,7 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() } + slog.Debug(stQuery) } return From 04c39bdb38296b298bb0be9d92e91ce1bbc7dab8 Mon Sep 17 00:00:00 2001 From: Gowa2017 <33367355+Gowa2017@users.noreply.github.com> Date: Wed, 14 Aug 2024 12:20:17 +0800 Subject: [PATCH 4/6] fix: shading with multi config --- conn_pool.go | 2 +- dialector.go | 2 +- sharding.go | 20 ++++++++++---------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/conn_pool.go b/conn_pool.go index c83adb6..32c8f59 100644 --- a/conn_pool.go +++ b/conn_pool.go @@ -36,7 +36,7 @@ func (pool ConnPool) ExecContext(ctx context.Context, query string, args ...any) pool.sharding.querys.Store("last_query", stQuery) if table != "" { - if r, ok := pool.sharding.configs[table]; ok { + if r, ok := pool.sharding.Configs[table]; ok { if r.DoubleWrite { pool.sharding.Logger.Trace(ctx, curTime, func() (sql string, rowsAffected int64) { result, _ := pool.ConnPool.ExecContext(ctx, ftQuery, args...) diff --git a/dialector.go b/dialector.go index 455ab4c..383bba1 100644 --- a/dialector.go +++ b/dialector.go @@ -107,7 +107,7 @@ func (m ShardingMigrator) splitShardingDsts(dsts ...any) (shardingDsts []shardin return } - if cfg, ok := m.sharding.configs[stmt.Table]; ok { + if cfg, ok := m.sharding.Configs[stmt.Table]; ok { // support sharding table suffixs := cfg.ShardingSuffixs() if len(suffixs) == 0 { diff --git a/sharding.go b/sharding.go index 612db87..6f369ed 100644 --- a/sharding.go +++ b/sharding.go @@ -28,7 +28,7 @@ var ( type Sharding struct { *gorm.DB ConnPool *ConnPool - configs map[string]Config + Configs map[string]Config querys sync.Map snowflakeNodes []*snowflake.Node @@ -112,23 +112,23 @@ func Register(config Config, tables ...any) *Sharding { } func (s *Sharding) compile() error { - if s.configs == nil { - s.configs = make(map[string]Config) + if s.Configs == nil { + s.Configs = make(map[string]Config) } for _, table := range s._tables { if t, ok := table.(string); ok { - s.configs[t] = s._config + s.Configs[t] = s._config } else { stmt := &gorm.Statement{DB: s.DB} if err := stmt.Parse(table); err == nil { - s.configs[stmt.Table] = s._config + s.Configs[stmt.Table] = s._config } else { return err } } } - for t, c := range s.configs { + for t, c := range s.Configs { if c.NumberOfShards > 1024 && c.PrimaryKeyGenerator == PKSnowflake { panic("Snowflake NumberOfShards should less than 1024") } @@ -217,7 +217,7 @@ func (s *Sharding) compile() error { } } } - s.configs[t] = c + s.Configs[t] = c } return nil @@ -243,7 +243,7 @@ func (s *Sharding) Initialize(db *gorm.DB) error { s.DB = db s.registerCallbacks(db) - for t, c := range s.configs { + for t, c := range s.Configs { if c.PrimaryKeyGenerator == PKPGSequence { err := s.DB.Exec("CREATE SEQUENCE IF NOT EXISTS " + pgSeqName(t)).Error if err != nil { @@ -315,7 +315,7 @@ func replaceConditionTableName(key, tableName string, expr sqlparser.Expr) error func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) { ftQuery = query stQuery = query - if len(s.configs) == 0 { + if len(s.Configs) == 0 { return } @@ -359,7 +359,7 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa } tableName = table.Name.Name - r, ok := s.configs[tableName] + r, ok := s.Configs[tableName] if !ok { return } From e436477907953fc3073a3a491303c704f84d0538 Mon Sep 17 00:00:00 2001 From: Gowa2017 <33367355+Gowa2017@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:45:31 +0800 Subject: [PATCH 5/6] fix: replace hashing select field qulify table name --- sharding.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/sharding.go b/sharding.go index 6f369ed..4153d8f 100644 --- a/sharding.go +++ b/sharding.go @@ -296,12 +296,12 @@ func (s *Sharding) switchConn(db *gorm.DB) { s.mutex.Unlock() } } -func replaceConditionTableName(key, tableName string, expr sqlparser.Expr) error { +func replaceConditionTableName(oldTableName, tableName string, expr sqlparser.Expr) error { err := sqlparser.Walk(sqlparser.VisitFunc(func(node sqlparser.Node) error { if n, ok := node.(*sqlparser.BinaryExpr); ok { if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 { - if q.Column.Name == key { + if q.Table.Name == oldTableName { n.X.(*sqlparser.QualifiedRef).Table.Name = tableName } } @@ -310,6 +310,22 @@ func replaceConditionTableName(key, tableName string, expr sqlparser.Expr) error }), expr) return err } +func replaceSelectFieldTableName(oldTableName, tableName string, columns *sqlparser.OutputNames) *sqlparser.OutputNames { + rcs := []*sqlparser.ResultColumn(*columns) + for i := 0; i < len(rcs); i++ { + rc := rcs[i] + if n, ok := rc.Expr.(*sqlparser.BinaryExpr); ok { + if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 { + if q.Table.Name == oldTableName { + n.X.(*sqlparser.QualifiedRef).Table.Name = tableName + } + } + } + + } + r := sqlparser.OutputNames(rcs) + return &r +} // resolve split the old query to full table query and sharding table query func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableName string, err error) { @@ -454,20 +470,20 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa ftQuery = stmt.String() stmt.FromItems = newTable stmt.OrderBy = replaceOrderByTableName(stmt.OrderBy, tableName, newTable.Name.Name) - replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) + replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition) + stmt.Columns = replaceSelectFieldTableName(tableName, newTable.Name.Name, stmt.Columns) stQuery = stmt.String() case *sqlparser.UpdateStatement: ftQuery = stmt.String() stmt.TableName = newTable - replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) + replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() case *sqlparser.DeleteStatement: ftQuery = stmt.String() stmt.TableName = newTable - replaceConditionTableName(r.ShardingKey, newTable.Name.Name, stmt.Condition) + replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() } - slog.Debug(stQuery) } return From aff971a67ddfa91fcd773b5ba349e8b1cb8a7e0d Mon Sep 17 00:00:00 2001 From: Gowa2017 <33367355+Gowa2017@users.noreply.github.com> Date: Tue, 29 Oct 2024 16:45:31 +0800 Subject: [PATCH 6/6] fix: replace hashing select field qulify table name --- sharding.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sharding.go b/sharding.go index 4153d8f..1d5c3c2 100644 --- a/sharding.go +++ b/sharding.go @@ -314,14 +314,11 @@ func replaceSelectFieldTableName(oldTableName, tableName string, columns *sqlpar rcs := []*sqlparser.ResultColumn(*columns) for i := 0; i < len(rcs); i++ { rc := rcs[i] - if n, ok := rc.Expr.(*sqlparser.BinaryExpr); ok { - if q, ok2 := n.X.(*sqlparser.QualifiedRef); ok2 { - if q.Table.Name == oldTableName { - n.X.(*sqlparser.QualifiedRef).Table.Name = tableName - } + if n, ok := rc.Expr.(*sqlparser.QualifiedRef); ok { + if n.Table.Name == oldTableName { + n.Table.Name = tableName } } - } r := sqlparser.OutputNames(rcs) return &r @@ -484,6 +481,7 @@ func (s *Sharding) resolve(query string, args ...any) (ftQuery, stQuery, tableNa replaceConditionTableName(tableName, newTable.Name.Name, stmt.Condition) stQuery = stmt.String() } + slog.Debug(stQuery) } return