Skip to content

Commit

Permalink
examples: fix linter complaints
Browse files Browse the repository at this point in the history
  • Loading branch information
dkropachev committed Dec 13, 2024
1 parent cb76fb1 commit 9592390
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 37 deletions.
18 changes: 11 additions & 7 deletions examples/printer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"

"github.com/gocql/gocql"

scyllacdc "github.com/scylladb/scylla-cdc-go"
)

Expand All @@ -24,12 +25,17 @@ func main() {
flag.StringVar(&source, "source", "127.0.0.1", "address of a node in the cluster")
flag.Parse()

// Configure a session first
if err := run(context.Background(), keyspace, table, source); err != nil {
log.Fatal(err)
}
}

func run(ctx context.Context, source, keyspace, table string) error {
cluster := gocql.NewCluster(source)
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc"))
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
return err
}
defer session.Close()

Expand All @@ -40,9 +46,9 @@ func main() {
Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile),
}

reader, err := scyllacdc.NewReader(context.Background(), cfg)
reader, err := scyllacdc.NewReader(ctx, cfg)
if err != nil {
log.Fatal(err)
return err
}

// React to Ctrl+C signal, and stop gracefully after the first signal
Expand All @@ -57,9 +63,7 @@ func main() {
}()
signal.Notify(signalC, os.Interrupt)

if err := reader.Run(context.Background()); err != nil {
log.Fatal(err)
}
return reader.Run(ctx)
}

func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error {
Expand Down
32 changes: 13 additions & 19 deletions examples/replicator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ import (
"time"

"github.com/gocql/gocql"

scyllacdc "github.com/scylladb/scylla-cdc-go"
)

// TODO: Escape field names?
var showTimestamps = false
var debugQueries = false
var maxWaitBetweenRetries = 5 * time.Second
var (
showTimestamps = false
debugQueries = false
maxWaitBetweenRetries = 5 * time.Second

var reportPeriod = 1 * time.Minute
reportPeriod = 1 * time.Minute
)

func main() {
var (
Expand Down Expand Up @@ -305,16 +308,6 @@ type DeltaReplicator struct {
reporter *scyllacdc.PeriodicProgressReporter
}

type updateQuerySet struct {
add string
remove string
}

type udtInfo struct {
setterQuery string
fields []string
}

func NewDeltaReplicator(
ctx context.Context,
session *gocql.Session,
Expand Down Expand Up @@ -502,7 +495,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i
})
}

keyColumns := append(r.pkColumns, r.ckColumns...)
keyColumns := append(r.pkColumns, r.ckColumns...) //nolint:gocritic

if isInsert {
// Insert row to make a row marker
Expand All @@ -522,7 +515,8 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i
typ := r.columnTypes[colName]
isNonFrozenCollection := !typ.IsFrozen() && typ.Type().IsCollection()

if !isNonFrozenCollection {
switch {
case !isNonFrozenCollection:
atomicChange := c.GetAtomicChange(colName)
if atomicChange.IsDeleted {
// Delete the value from the column
Expand Down Expand Up @@ -551,7 +545,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i
return err
}
}
} else if typ.Type() == TypeList {
case typ.Type() == TypeList:
listChange := c.GetListChange(colName)
if listChange.IsReset {
// We can't just do UPDATE SET l = [...],
Expand Down Expand Up @@ -619,7 +613,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i
}
}
}
} else if typ.Type() == TypeSet || typ.Type() == TypeMap {
case typ.Type() == TypeSet || typ.Type() == TypeMap:
// TODO: Better comment
// Fortunately, both cases can be handled by the same code
// by using reflection. We are forced to use reflection anyway.
Expand Down Expand Up @@ -686,7 +680,7 @@ func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp i
}
}
}
} else if typ.Type() == TypeUDT {
case typ.Type() == TypeUDT:
udtChange := c.GetUDTChange(colName)
if udtChange.IsReset {
// The column was overwritten
Expand Down
13 changes: 10 additions & 3 deletions examples/replicator/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/gocql/gocql"

scyllacdc "github.com/scylladb/scylla-cdc-go"
"github.com/scylladb/scylla-cdc-go/internal/testutils"
)
Expand Down Expand Up @@ -393,12 +394,12 @@ func TestReplicator(t *testing.T) {
sourceAddress,
destinationAddress,
schemaNames,
&adv, gocql.Quorum,
&adv,
gocql.Quorum,
gocql.Quorum,
"",
logger,
)

if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -470,7 +471,9 @@ func TestReplicator(t *testing.T) {
}
}

func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session {
func createSessionAndSetupSchema(t *testing.T, addr, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session {
t.Helper()

testutils.CreateKeyspace(t, addr, keyspaceName)

cfg := gocql.NewCluster(addr)
Expand Down Expand Up @@ -501,6 +504,8 @@ func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string,
}

func execQuery(t *testing.T, session *gocql.Session, query string) {
t.Helper()

t.Logf("executing query %s", query)
err := session.Query(query).Exec()
if err != nil {
Expand All @@ -509,6 +514,8 @@ func execQuery(t *testing.T, session *gocql.Session, query string) {
}

func fetchFullSet(t *testing.T, session *gocql.Session, schemas map[string]string) map[string][]map[string]interface{} {
t.Helper()

groups := make(map[string][]map[string]interface{})

for tbl := range schemas {
Expand Down
21 changes: 13 additions & 8 deletions examples/simple-printer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@ import (
"os"

"github.com/gocql/gocql"

scyllacdc "github.com/scylladb/scylla-cdc-go"
)

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {
cluster := gocql.NewCluster("127.0.0.1")
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc"))
if err := run(context.Background(), []string{"127.0.0.1"}, "local-dc", "ks.tbl"); err != nil {
log.Fatal(err)
}
}

func run(ctx context.Context, hosts []string, localDC, tableName string) error {
cluster := gocql.NewCluster(hosts...)
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(localDC))
session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
Expand All @@ -24,19 +31,17 @@ func main() {

cfg := &scyllacdc.ReaderConfig{
Session: session,
TableNames: []string{"ks.tbl"},
TableNames: []string{tableName},
ChangeConsumerFactory: changeConsumerFactory,
Logger: log.New(os.Stderr, "", log.Ldate|log.Lshortfile),
}

reader, err := scyllacdc.NewReader(context.Background(), cfg)
reader, err := scyllacdc.NewReader(ctx, cfg)
if err != nil {
log.Fatal(err)
return err
}

if err := reader.Run(context.Background()); err != nil {
log.Fatal(err)
}
return reader.Run(ctx)
}

func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) error {
Expand Down

0 comments on commit 9592390

Please sign in to comment.