From 95923900f3662bcded83164a2d0c32f8149b871a Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Fri, 13 Dec 2024 13:08:38 -0400 Subject: [PATCH] examples: fix linter complaints --- examples/printer/main.go | 18 +++++++++------ examples/replicator/main.go | 32 +++++++++++--------------- examples/replicator/replicator_test.go | 13 ++++++++--- examples/simple-printer/main.go | 21 ++++++++++------- 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/examples/printer/main.go b/examples/printer/main.go index c5c9e8f..c8e8c57 100644 --- a/examples/printer/main.go +++ b/examples/printer/main.go @@ -9,6 +9,7 @@ import ( "os/signal" "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" ) @@ -24,12 +25,17 @@ func main() { flag.StringVar(&source, "source", "127.0.0.1", "address of a node in the cluster") flag.Parse() - // Configure a session first + if err := run(context.Background(), keyspace, table, source); err != nil { + log.Fatal(err) + } +} + +func run(ctx context.Context, source, keyspace, table string) error { cluster := gocql.NewCluster(source) cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc")) session, err := cluster.CreateSession() if err != nil { - log.Fatal(err) + return err } defer session.Close() @@ -40,9 +46,9 @@ func main() { Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile), } - reader, err := scyllacdc.NewReader(context.Background(), cfg) + reader, err := scyllacdc.NewReader(ctx, cfg) if err != nil { - log.Fatal(err) + return err } // React to Ctrl+C signal, and stop gracefully after the first signal @@ -57,9 +63,7 @@ func main() { }() signal.Notify(signalC, os.Interrupt) - if err := reader.Run(context.Background()); err != nil { - log.Fatal(err) - } + return reader.Run(ctx) } func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error { diff --git a/examples/replicator/main.go b/examples/replicator/main.go index e58105d..3647377 100644 --- a/examples/replicator/main.go +++ b/examples/replicator/main.go @@ -15,15 +15,18 @@ import ( "time" "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" ) // TODO: Escape field names? -var showTimestamps = false -var debugQueries = false -var maxWaitBetweenRetries = 5 * time.Second +var ( + showTimestamps = false + debugQueries = false + maxWaitBetweenRetries = 5 * time.Second -var reportPeriod = 1 * time.Minute + reportPeriod = 1 * time.Minute +) func main() { var ( @@ -305,16 +308,6 @@ type DeltaReplicator struct { reporter *scyllacdc.PeriodicProgressReporter } -type updateQuerySet struct { - add string - remove string -} - -type udtInfo struct { - setterQuery string - fields []string -} - func NewDeltaReplicator( ctx context.Context, session *gocql.Session, @@ -502,7 +495,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i }) } - keyColumns := append(r.pkColumns, r.ckColumns...) + keyColumns := append(r.pkColumns, r.ckColumns...) //nolint:gocritic if isInsert { // Insert row to make a row marker @@ -522,7 +515,8 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i typ := r.columnTypes[colName] isNonFrozenCollection := !typ.IsFrozen() && typ.Type().IsCollection() - if !isNonFrozenCollection { + switch { + case !isNonFrozenCollection: atomicChange := c.GetAtomicChange(colName) if atomicChange.IsDeleted { // Delete the value from the column @@ -551,7 +545,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i return err } } - } else if typ.Type() == TypeList { + case typ.Type() == TypeList: listChange := c.GetListChange(colName) if listChange.IsReset { // We can't just do UPDATE SET l = [...], @@ -619,7 +613,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i } } } - } else if typ.Type() == TypeSet || typ.Type() == TypeMap { + case typ.Type() == TypeSet || typ.Type() == TypeMap: // TODO: Better comment // Fortunately, both cases can be handled by the same code // by using reflection. We are forced to use reflection anyway. @@ -686,7 +680,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i } } } - } else if typ.Type() == TypeUDT { + case typ.Type() == TypeUDT: udtChange := c.GetUDTChange(colName) if udtChange.IsReset { // The column was overwritten diff --git a/examples/replicator/replicator_test.go b/examples/replicator/replicator_test.go index 777581b..dc3205f 100644 --- a/examples/replicator/replicator_test.go +++ b/examples/replicator/replicator_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" "github.com/scylladb/scylla-cdc-go/internal/testutils" ) @@ -393,12 +394,12 @@ func TestReplicator(t *testing.T) { sourceAddress, destinationAddress, schemaNames, - &adv, gocql.Quorum, + &adv, + gocql.Quorum, gocql.Quorum, "", logger, ) - if err != nil { t.Fatal(err) } @@ -470,7 +471,9 @@ func TestReplicator(t *testing.T) { } } -func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session { +func createSessionAndSetupSchema(t *testing.T, addr, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session { + t.Helper() + testutils.CreateKeyspace(t, addr, keyspaceName) cfg := gocql.NewCluster(addr) @@ -501,6 +504,8 @@ func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string, } func execQuery(t *testing.T, session *gocql.Session, query string) { + t.Helper() + t.Logf("executing query %s", query) err := session.Query(query).Exec() if err != nil { @@ -509,6 +514,8 @@ func execQuery(t *testing.T, session *gocql.Session, query string) { } func fetchFullSet(t *testing.T, session *gocql.Session, schemas map[string]string) map[string][]map[string]interface{} { + t.Helper() + groups := make(map[string][]map[string]interface{}) for tbl := range schemas { diff --git a/examples/simple-printer/main.go b/examples/simple-printer/main.go index 8ee1230..3e95049 100644 --- a/examples/simple-printer/main.go +++ b/examples/simple-printer/main.go @@ -7,6 +7,7 @@ import ( "os" "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" ) @@ -14,8 +15,14 @@ import ( // CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'}; func main() { - cluster := gocql.NewCluster("127.0.0.1") - cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc")) + if err := run(context.Background(), []string{"127.0.0.1"}, "local-dc", "ks.tbl"); err != nil { + log.Fatal(err) + } +} + +func run(ctx context.Context, hosts []string, localDC, tableName string) error { + cluster := gocql.NewCluster(hosts...) + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(localDC)) session, err := cluster.CreateSession() if err != nil { log.Fatal(err) @@ -24,19 +31,17 @@ func main() { cfg := &scyllacdc.ReaderConfig{ Session: session, - TableNames: []string{"ks.tbl"}, + TableNames: []string{tableName}, ChangeConsumerFactory: changeConsumerFactory, Logger: log.New(os.Stderr, "", log.Ldate|log.Lshortfile), } - reader, err := scyllacdc.NewReader(context.Background(), cfg) + reader, err := scyllacdc.NewReader(ctx, cfg) if err != nil { - log.Fatal(err) + return err } - if err := reader.Run(context.Background()); err != nil { - log.Fatal(err) - } + return reader.Run(ctx) } func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) error {