diff --git a/examples/printer/main.go b/examples/printer/main.go new file mode 100644 index 0000000..c5c9e8f --- /dev/null +++ b/examples/printer/main.go @@ -0,0 +1,90 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "os/signal" + + "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" +) + +func main() { + var ( + keyspace string + table string + source string + ) + + flag.StringVar(&keyspace, "keyspace", "", "keyspace name") + flag.StringVar(&table, "table", "", "table name") + flag.StringVar(&source, "source", "127.0.0.1", "address of a node in the cluster") + flag.Parse() + + // Configure a session first + cluster := gocql.NewCluster(source) + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc")) + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + } + defer session.Close() + + cfg := &scyllacdc.ReaderConfig{ + Session: session, + ChangeConsumerFactory: changeConsumerFactory, + TableNames: []string{keyspace + "." + table}, + Logger: log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile), + } + + reader, err := scyllacdc.NewReader(context.Background(), cfg) + if err != nil { + log.Fatal(err) + } + + // React to Ctrl+C signal, and stop gracefully after the first signal + // Second signal exits the process + signalC := make(chan os.Signal, 2) + go func() { + <-signalC + reader.Stop() + + <-signalC + os.Exit(1) + }() + signal.Notify(signalC, os.Interrupt) + + if err := reader.Run(context.Background()); err != nil { + log.Fatal(err) + } +} + +func printerConsumer(ctx context.Context, tableName string, c scyllacdc.Change) error { + fmt.Printf("[%s %s]:\n", c.StreamID, c.Time.String()) + if len(c.PreImage) > 0 { + fmt.Println(" PREIMAGE:") + for _, r := range c.PreImage { + fmt.Printf(" %s\n", r) + } + } + if len(c.Delta) > 0 { + fmt.Println(" DELTA:") + for _, r := range c.Delta { + fmt.Printf(" %s\n", r) + } + } + if len(c.PostImage) > 0 { + fmt.Println(" POSTIMAGE:") + for _, r := range c.PostImage { + fmt.Printf(" %s\n", r) + } + } + fmt.Println() + + return nil +} + +var changeConsumerFactory = scyllacdc.MakeChangeConsumerFactoryFromFunc(printerConsumer) diff --git a/examples/replicator/.gitignore b/examples/replicator/.gitignore new file mode 100644 index 0000000..a22508d --- /dev/null +++ b/examples/replicator/.gitignore @@ -0,0 +1 @@ +replicator diff --git a/examples/replicator/main.go b/examples/replicator/main.go new file mode 100644 index 0000000..e58105d --- /dev/null +++ b/examples/replicator/main.go @@ -0,0 +1,957 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log" + "math/rand" + "os" + "os/signal" + "reflect" + "strings" + "sync/atomic" + "time" + + "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" +) + +// TODO: Escape field names? +var showTimestamps = false +var debugQueries = false +var maxWaitBetweenRetries = 5 * time.Second + +var reportPeriod = 1 * time.Minute + +func main() { + var ( + keyspace string + table string + source string + destination string + readConsistency string + writeConsistency string + + progressTable string + ) + + flag.StringVar(&keyspace, "keyspace", "", "keyspace name") + flag.StringVar(&table, "table", "", "table name; you can specify multiple table by separating them with a comma") + flag.StringVar(&source, "source", "", "address of a node in source cluster") + flag.StringVar(&destination, "destination", "", "address of a node in destination cluster") + flag.StringVar(&readConsistency, "read-consistency", "", "consistency level used to read from cdc log (one, quorum, all)") + flag.StringVar(&writeConsistency, "write-consistency", "", "consistency level used to write to the destination cluster (one, quorum, all)") + flag.StringVar(&progressTable, "progress-table", "", "fully-qualified name of the table in the destination cluster to use for saving progress; if omitted, the progress won't be saved") + flag.String("mode", "", "mode (ignored)") + flag.Parse() + + clRead := parseConsistency(readConsistency) + clWrite := parseConsistency(writeConsistency) + + adv := scyllacdc.AdvancedReaderConfig{ + ConfidenceWindowSize: 30 * time.Second, + ChangeAgeLimit: 10 * time.Minute, + QueryTimeWindowSize: 60 * time.Second, + PostEmptyQueryDelay: 30 * time.Second, + PostNonEmptyQueryDelay: 10 * time.Second, + PostFailedQueryDelay: 1 * time.Second, + } + + fmt.Println("Parameters:") + fmt.Printf(" Keyspace: %s\n", keyspace) + fmt.Printf(" Table: %s\n", table) + fmt.Printf(" Source cluster IP: %s\n", source) + fmt.Printf(" Destination cluster IP: %s\n", destination) + fmt.Printf(" Consistency for reads: %s\n", clRead) + fmt.Printf(" Consistency for writes: %s\n", clWrite) + fmt.Printf(" Table to use for saving progress: %s\n", progressTable) + fmt.Println("Advanced reader parameters:") + fmt.Printf(" Confidence window size: %s\n", adv.ConfidenceWindowSize) + fmt.Printf(" Change age limit: %s\n", adv.ChangeAgeLimit) + fmt.Printf(" Query window size: %s\n", adv.QueryTimeWindowSize) + fmt.Printf(" Delay after poll with empty results: %s\n", adv.PostEmptyQueryDelay) + fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostNonEmptyQueryDelay) + fmt.Printf(" Delay after failed poll: %s\n", adv.PostFailedQueryDelay) + + var fullyQualifiedTables []string + + for _, t := range strings.Split(table, ",") { + fullyQualifiedTables = append(fullyQualifiedTables, keyspace+"."+t) + } + + logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) + repl, err := newReplicator( + context.Background(), + source, destination, + fullyQualifiedTables, + &adv, + clRead, + clWrite, + progressTable, + logger, + ) + if err != nil { + log.Fatalln(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + // React to Ctrl+C signal. + // + // 1st signal will cause the replicator to read changes up until + // the moment the signal was received, and then it will stop the replicator. + // This is the "most graceful" way of stopping the replicator. + // + // 2nd signal will cancel the context. This should stop all operations + // done by the replicator ASAP and stop it. + // + // 3rd signal will exit the process immediately with error code 1. + signalC := make(chan os.Signal, 3) + go func() { + <-signalC + now := time.Now() + log.Printf("stopping at %v", now) + repl.StopAt(now) + + <-signalC + log.Printf("stopping now") + cancel() + + <-signalC + log.Printf("killing") + os.Exit(1) + }() + signal.Notify(signalC, os.Interrupt) + + if err := repl.Run(ctx); err != nil { + log.Fatalln(err) + } + + log.Printf("quitting, rows read: %d", repl.GetReadRowsCount()) +} + +func parseConsistency(s string) gocql.Consistency { + switch strings.ToLower(s) { + case "one": + return gocql.One + case "quorum": + return gocql.Quorum + case "all": + return gocql.All + default: + log.Printf("warning: got unsupported consistency level \"%s\", will use \"one\" instead", s) + return gocql.One + } +} + +type replicator struct { + reader *scyllacdc.Reader + + sourceSession *gocql.Session + destinationSession *gocql.Session + + rowsRead *int64 +} + +func newReplicator( + ctx context.Context, + source, destination string, + tableNames []string, + advancedParams *scyllacdc.AdvancedReaderConfig, + readConsistency gocql.Consistency, + writeConsistency gocql.Consistency, + progressTable string, + logger scyllacdc.Logger, +) (*replicator, error) { + destinationCluster := gocql.NewCluster(destination) + destinationCluster.Timeout = 10 * time.Second + destinationSession, err := destinationCluster.CreateSession() + if err != nil { + return nil, err + } + + // Configure a session + cluster := gocql.NewCluster(source) + cluster.Timeout = 10 * time.Second + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + sourceSession, err := cluster.CreateSession() + if err != nil { + destinationSession.Close() + return nil, err + } + + rowsRead := new(int64) + + factory := &replicatorFactory{ + destinationSession: destinationSession, + consistency: writeConsistency, + + rowsRead: rowsRead, + logger: logger, + } + + var progressManager scyllacdc.ProgressManager + if progressTable != "" { + progressManager, err = scyllacdc.NewTableBackedProgressManager(destinationSession, progressTable, "cdc-replicator") + if err != nil { + destinationSession.Close() + return nil, err + } + } + + cfg := &scyllacdc.ReaderConfig{ + Session: sourceSession, + ChangeConsumerFactory: factory, + ProgressManager: progressManager, + TableNames: tableNames, + Consistency: readConsistency, + } + + if advancedParams != nil { + cfg.Advanced = *advancedParams + } + cfg.Consistency = readConsistency + cfg.Logger = logger + + reader, err := scyllacdc.NewReader(ctx, cfg) + if err != nil { + sourceSession.Close() + destinationSession.Close() + return nil, err + } + + repl := &replicator{ + reader: reader, + + sourceSession: sourceSession, + destinationSession: destinationSession, + + rowsRead: rowsRead, + } + + return repl, nil +} + +func (repl *replicator) Run(ctx context.Context) error { + defer repl.destinationSession.Close() + defer repl.sourceSession.Close() + return repl.reader.Run(ctx) +} + +func (repl *replicator) StopAt(at time.Time) { + repl.reader.StopAt(at) +} + +func (repl *replicator) Stop() { + repl.reader.Stop() +} + +func (repl *replicator) GetReadRowsCount() int64 { + return atomic.LoadInt64(repl.rowsRead) +} + +type replicatorFactory struct { + destinationSession *gocql.Session + consistency gocql.Consistency + + rowsRead *int64 + logger scyllacdc.Logger +} + +func (rf *replicatorFactory) CreateChangeConsumer( + ctx context.Context, + input scyllacdc.CreateChangeConsumerInput, +) (scyllacdc.ChangeConsumer, error) { + splitTableName := strings.SplitN(input.TableName, ".", 2) + if len(splitTableName) < 2 { + return nil, fmt.Errorf("table name is not fully qualified: %s", input.TableName) + } + + kmeta, err := rf.destinationSession.KeyspaceMetadata(splitTableName[0]) + if err != nil { + rf.destinationSession.Close() + return nil, err + } + tmeta, ok := kmeta.Tables[splitTableName[1]] + if !ok { + rf.destinationSession.Close() + return nil, fmt.Errorf("table %s does not exist", input.TableName) + } + + return NewDeltaReplicator(ctx, rf.destinationSession, kmeta, tmeta, rf.consistency, rf.rowsRead, input.StreamID, input.ProgressReporter, rf.logger) +} + +type DeltaReplicator struct { + session *gocql.Session + tableName string + consistency gocql.Consistency + + pkColumns []string + ckColumns []string + otherColumns []string + columnTypes map[string]TypeInfo + allColumns []string + + insertStr string + rowDeleteQueryStr string + partitionDeleteQueryStr string + + localCount int64 + totalCount *int64 + + streamID scyllacdc.StreamID + reporter *scyllacdc.PeriodicProgressReporter +} + +type updateQuerySet struct { + add string + remove string +} + +type udtInfo struct { + setterQuery string + fields []string +} + +func NewDeltaReplicator( + ctx context.Context, + session *gocql.Session, + kmeta *gocql.KeyspaceMetadata, + meta *gocql.TableMetadata, + consistency gocql.Consistency, + count *int64, + streamID scyllacdc.StreamID, + reporter *scyllacdc.ProgressReporter, + logger scyllacdc.Logger, +) (*DeltaReplicator, error) { + var ( + pkColumns []string + ckColumns []string + otherColumns []string + ) + + for _, name := range meta.OrderedColumns { + colDesc := meta.Columns[name] + switch colDesc.Kind { + case gocql.ColumnPartitionKey: + pkColumns = append(pkColumns, name) + case gocql.ColumnClusteringKey: + ckColumns = append(ckColumns, name) + default: + otherColumns = append(otherColumns, name) + } + } + + columnTypes := make(map[string]TypeInfo, len(meta.Columns)) + for colName, colMeta := range meta.Columns { + info := parseType(colMeta.Type) + columnTypes[colName] = info + } + + dr := &DeltaReplicator{ + session: session, + tableName: meta.Keyspace + "." + meta.Name, + consistency: consistency, + + pkColumns: pkColumns, + ckColumns: ckColumns, + otherColumns: otherColumns, + columnTypes: columnTypes, + allColumns: append(append(append([]string{}, otherColumns...), pkColumns...), ckColumns...), + + totalCount: count, + + streamID: streamID, + reporter: scyllacdc.NewPeriodicProgressReporter(logger, reportPeriod, reporter), + } + + dr.precomputeQueries() + + dr.reporter.Start(ctx) + return dr, nil +} + +func (r *DeltaReplicator) precomputeQueries() { + keyColumns := append(append([]string{}, r.pkColumns...), r.ckColumns...) + + var bindMarkers []string + for _, columnName := range keyColumns { + bindMarkers = append(bindMarkers, makeBindMarkerForType(r.columnTypes[columnName])) + } + + r.insertStr = fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s) USING TTL ?", + r.tableName, strings.Join(keyColumns, ", "), strings.Join(bindMarkers, ", "), + ) + + r.rowDeleteQueryStr = fmt.Sprintf( + "DELETE FROM %s WHERE %s", + r.tableName, + r.makeBindMarkerAssignments(keyColumns, " AND "), + ) + + r.partitionDeleteQueryStr = fmt.Sprintf( + "DELETE FROM %s WHERE %s", + r.tableName, + r.makeBindMarkerAssignments(r.pkColumns, " AND "), + ) +} + +func (r *DeltaReplicator) Consume(ctx context.Context, c scyllacdc.Change) error { + timestamp := c.GetCassandraTimestamp() + pos := 0 + + if showTimestamps { + log.Printf("[%s] Processing timestamp: %s (%s)\n", c.StreamID, c.Time, c.Time.Time()) + } + + for pos < len(c.Delta) { + change := c.Delta[pos] + var err error + switch change.GetOperation() { + case scyllacdc.Update: + err = r.processUpdate(ctx, timestamp, change) + pos++ + + case scyllacdc.Insert: + err = r.processInsert(ctx, timestamp, change) + pos++ + + case scyllacdc.RowDelete: + err = r.processRowDelete(ctx, timestamp, change) + pos++ + + case scyllacdc.PartitionDelete: + err = r.processPartitionDelete(ctx, timestamp, change) + pos++ + + case scyllacdc.RangeDeleteStartInclusive, scyllacdc.RangeDeleteStartExclusive: + // Range delete start row should always be followed by a range delete end row. + // They should always come in pairs. + if pos+2 > len(c.Delta) { + return errors.New("invalid change: range delete start row without corresponding end row") + } + start := change + end := c.Delta[pos+1] + if end.GetOperation() != scyllacdc.RangeDeleteEndInclusive && end.GetOperation() != scyllacdc.RangeDeleteEndExclusive { + return errors.New("invalid change: range delete start row without corresponding end row") + } + err = r.processRangeDelete(ctx, timestamp, start, end) + pos += 2 + + case scyllacdc.RangeDeleteEndInclusive, scyllacdc.RangeDeleteEndExclusive: + // This should not happen and indicates some kind of inconsistency. + // Every RangeDeleteEnd... row should be preceded by a RangeDeleteStart... row. + return errors.New("invalid change: range delete end row does not have a corresponding start row") + + default: + return errors.New("unsupported operation: " + change.GetOperation().String()) + } + + if err != nil { + return err + } + } + + r.reporter.Update(c.Time) + r.localCount += int64(len(c.Delta)) + + return nil +} + +func (r *DeltaReplicator) End() error { + log.Printf("Streams [%s]: processed %d changes in total", r.streamID, r.localCount) + atomic.AddInt64(r.totalCount, r.localCount) + _ = r.reporter.SaveAndStop(context.Background()) + return nil +} + +func (r *DeltaReplicator) Empty(ctx context.Context, ackTime gocql.UUID) error { + log.Printf("Streams [%s]: saw no changes up to %s", r.streamID, ackTime.Time()) + r.reporter.Update(ackTime) + return nil +} + +// Make sure that DeltaReplicator supports the ChangeOrEmptyNotificationConsumer interface +var _ scyllacdc.ChangeOrEmptyNotificationConsumer = (*DeltaReplicator)(nil) + +func (r *DeltaReplicator) processUpdate(ctx context.Context, timestamp int64, c *scyllacdc.ChangeRow) error { + return r.processInsertOrUpdate(ctx, timestamp, false, c) +} + +func (r *DeltaReplicator) processInsert(ctx context.Context, timestamp int64, c *scyllacdc.ChangeRow) error { + return r.processInsertOrUpdate(ctx, timestamp, true, c) +} + +func (r *DeltaReplicator) processInsertOrUpdate(ctx context.Context, timestamp int64, isInsert bool, c *scyllacdc.ChangeRow) error { + runQuery := func(q string, vals []interface{}) error { + if debugQueries { + fmt.Println(q) + fmt.Println(vals...) + } + + return tryWithExponentialBackoff(ctx, func() error { + return r.session. + Query(q, vals...). + Consistency(r.consistency). + Idempotent(true). + WithTimestamp(timestamp). + Exec() + }) + } + + keyColumns := append(r.pkColumns, r.ckColumns...) + + if isInsert { + // Insert row to make a row marker + // The rest of the columns will be set by using UPDATE queries + var vals []interface{} + vals = appendKeyValuesToBind(vals, keyColumns, c) + vals = append(vals, c.GetTTL()) + if err := runQuery(r.insertStr, vals); err != nil { + return err + } + } + + // Precompute the WHERE x = ? AND y = ? ... part + pkConditions := r.makeBindMarkerAssignments(keyColumns, " AND ") + + for _, colName := range r.otherColumns { + typ := r.columnTypes[colName] + isNonFrozenCollection := !typ.IsFrozen() && typ.Type().IsCollection() + + if !isNonFrozenCollection { + atomicChange := c.GetAtomicChange(colName) + if atomicChange.IsDeleted { + // Delete the value from the column + deleteStr := fmt.Sprintf( + "DELETE %s FROM %s WHERE %s", + colName, r.tableName, pkConditions, + ) + + var vals []interface{} + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(deleteStr, vals); err != nil { + return err + } + } else if !reflect.ValueOf(atomicChange.Value).IsNil() { + // The column was overwritten + updateStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s = %s WHERE %s", + r.tableName, colName, makeBindMarkerForType(typ), pkConditions, + ) + + var vals []interface{} + vals = append(vals, c.GetTTL()) + vals = appendValueByType(vals, atomicChange.Value, typ) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(updateStr, vals); err != nil { + return err + } + } + } else if typ.Type() == TypeList { + listChange := c.GetListChange(colName) + if listChange.IsReset { + // We can't just do UPDATE SET l = [...], + // because we need to precisely control timestamps + // of the list cells. This can be done only by + // UPDATE tbl SET l[SCYLLA_TIMEUUID_LIST_INDEX(?)] = ?, + // which is equivalent to an append of one cell. + // Hence, the need for clear + append. + // + // We clear using a timestamp one-less-than the real + // timestamp of the write. This is what Cassandra/Scylla + // does internally, so it's OK to for us to do that. + deleteStr := fmt.Sprintf( + "DELETE %s FROM %s USING TIMESTAMP ? WHERE %s", + colName, r.tableName, pkConditions, + ) + + var vals []interface{} + vals = append(vals, timestamp-1) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(deleteStr, vals); err != nil { + return err + } + } + if !reflect.ValueOf(listChange.AppendedElements).IsNil() { + // TODO: Explain + setStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s[SCYLLA_TIMEUUID_LIST_INDEX(?)] = %s WHERE %s", + r.tableName, colName, makeBindMarkerForType(typ), pkConditions, + ) + + rAppendedElements := reflect.ValueOf(listChange.AppendedElements) + r := rAppendedElements.MapRange() + for r.Next() { + k := r.Key().Interface() + v := r.Value().Interface() + + var vals []interface{} + vals = append(vals, c.GetTTL()) + vals = append(vals, k) + vals = appendValueByType(vals, v, typ) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(setStr, vals); err != nil { + return err + } + } + } + if !reflect.ValueOf(listChange.RemovedElements).IsNil() { + // TODO: Explain + clearStr := fmt.Sprintf( + "UPDATE %s SET %s[SCYLLA_TIMEUUID_LIST_INDEX(?)] = null WHERE %s", + r.tableName, colName, pkConditions, + ) + + rRemovedElements := reflect.ValueOf(listChange.RemovedElements) + elsLen := rRemovedElements.Len() + for i := 0; i < elsLen; i++ { + k := rRemovedElements.Index(i).Interface() + + var vals []interface{} + vals = append(vals, k) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(clearStr, vals); err != nil { + return err + } + } + } + } else if typ.Type() == TypeSet || typ.Type() == TypeMap { + // TODO: Better comment + // Fortunately, both cases can be handled by the same code + // by using reflection. We are forced to use reflection anyway. + var ( + added, removed interface{} + isReset bool + ) + + if typ.Type() == TypeSet { + setChange := c.GetSetChange(colName) + added = setChange.AddedElements + removed = setChange.RemovedElements + isReset = setChange.IsReset + } else { + mapChange := c.GetSetChange(colName) + added = mapChange.AddedElements + removed = mapChange.RemovedElements + isReset = mapChange.IsReset + } + + if isReset { + // Overwrite the existing value + setStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s = ? WHERE %s", + r.tableName, colName, pkConditions, + ) + + var vals []interface{} + vals = append(vals, c.GetTTL()) + vals = append(vals, added) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(setStr, vals); err != nil { + return err + } + } else { + if !reflect.ValueOf(added).IsNil() { + // Add elements + addStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s = %s + ? WHERE %s", + r.tableName, colName, colName, pkConditions, + ) + + var vals []interface{} + vals = append(vals, c.GetTTL()) + vals = append(vals, added) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(addStr, vals); err != nil { + return err + } + } + if !reflect.ValueOf(removed).IsNil() { + // Removed elements + remStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s = %s - ? WHERE %s", + r.tableName, colName, colName, pkConditions, + ) + + var vals []interface{} + vals = append(vals, c.GetTTL()) + vals = append(vals, removed) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(remStr, vals); err != nil { + return err + } + } + } + } else if typ.Type() == TypeUDT { + udtChange := c.GetUDTChange(colName) + if udtChange.IsReset { + // The column was overwritten + updateStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s = %s WHERE %s", + r.tableName, colName, makeBindMarkerForType(typ), pkConditions, + ) + + var vals []interface{} + vals = append(vals, c.GetTTL()) + vals = appendValueByType(vals, udtChange.AddedFields, typ) + vals = appendKeyValuesToBind(vals, keyColumns, c) + if err := runQuery(updateStr, vals); err != nil { + return err + } + } else { + // Overwrite those columns which are non-null in AddedFields, + // and remove those which are listed in RemovedFields. + var vals []interface{} + vals = append(vals, c.GetTTL()) + fieldAssignments := make([]string, 0, len(udtChange.AddedFields)+len(udtChange.RemovedFields)) + + // Overwrites + for fieldName, fieldValue := range udtChange.AddedFields { + if reflect.ValueOf(fieldValue).IsNil() { + continue + } + + // TODO: Properly create a bind marker, tuples nested in udts may cause problems + fieldAssignments = append(fieldAssignments, fmt.Sprintf("%s.%s = ?", colName, fieldName)) + vals = append(vals, fieldValue) + } + + // Clears + for _, fieldName := range udtChange.RemovedFields { + fieldAssignments = append(fieldAssignments, fmt.Sprintf("%s.%s = ?", colName, fieldName)) + vals = append(vals, nil) + } + + vals = appendKeyValuesToBind(vals, keyColumns, c) + + updateUDTStr := fmt.Sprintf( + "UPDATE %s USING TTL ? SET %s WHERE %s", + r.tableName, + strings.Join(fieldAssignments, ", "), + pkConditions, + ) + + if err := runQuery(updateUDTStr, vals); err != nil { + return err + } + } + } + } + + return nil +} + +func (r *DeltaReplicator) processRowDelete(ctx context.Context, timestamp int64, c *scyllacdc.ChangeRow) error { + vals := make([]interface{}, 0, len(r.pkColumns)+len(r.ckColumns)) + vals = appendKeyValuesToBind(vals, r.pkColumns, c) + vals = appendKeyValuesToBind(vals, r.ckColumns, c) + + if debugQueries { + fmt.Println(r.rowDeleteQueryStr) + fmt.Println(vals...) + } + + return tryWithExponentialBackoff(ctx, func() error { + return r.session. + Query(r.rowDeleteQueryStr, vals...). + Consistency(r.consistency). + Idempotent(true). + WithTimestamp(timestamp). + Exec() + }) +} + +func (r *DeltaReplicator) processPartitionDelete(ctx context.Context, timestamp int64, c *scyllacdc.ChangeRow) error { + vals := make([]interface{}, 0, len(r.pkColumns)) + vals = appendKeyValuesToBind(vals, r.pkColumns, c) + + if debugQueries { + fmt.Println(r.partitionDeleteQueryStr) + fmt.Println(vals...) + } + + return tryWithExponentialBackoff(ctx, func() error { + return r.session. + Query(r.partitionDeleteQueryStr, vals...). + Consistency(r.consistency). + Idempotent(true). + WithTimestamp(timestamp). + Exec() + }) +} + +func (r *DeltaReplicator) processRangeDelete(ctx context.Context, timestamp int64, start, end *scyllacdc.ChangeRow) error { + vals := make([]interface{}, 0) + vals = appendKeyValuesToBind(vals, r.pkColumns, start) + + conditions := r.makeBindMarkerAssignmentList(r.pkColumns) + + addConditions := func(c *scyllacdc.ChangeRow, cmpOp string) { + ckNames := make([]string, 0) + markers := make([]string, 0) + for _, ckCol := range r.ckColumns { + + // Clustering key values are always atomic + ckVal, _ := c.GetValue(ckCol) + if reflect.ValueOf(ckVal).IsNil() { + break + } + ckNames = append(ckNames, ckCol) + vals = append(vals, ckVal) + markers = append(markers, makeBindMarkerForType(r.columnTypes[ckCol])) + } + + if len(ckNames) > 0 { + conditions = append(conditions, fmt.Sprintf( + "(%s) %s (%s)", + strings.Join(ckNames, ", "), + cmpOp, + strings.Join(markers, ", "), + )) + } + } + + if start.GetOperation() == scyllacdc.RangeDeleteStartInclusive { + addConditions(start, ">=") + } else { + addConditions(start, ">") + } + + if end.GetOperation() == scyllacdc.RangeDeleteEndInclusive { + addConditions(end, "<=") + } else { + addConditions(end, "<") + } + + deleteStr := fmt.Sprintf( + "DELETE FROM %s WHERE %s", + r.tableName, + strings.Join(conditions, " AND "), + ) + + if debugQueries { + fmt.Println(deleteStr) + fmt.Println(vals...) + } + + return tryWithExponentialBackoff(ctx, func() error { + return r.session. + Query(deleteStr, vals...). + Consistency(r.consistency). + Idempotent(true). + WithTimestamp(timestamp). + Exec() + }) +} + +func (r *DeltaReplicator) makeBindMarkerAssignmentList(columnNames []string) []string { + assignments := make([]string, 0, len(columnNames)) + for _, name := range columnNames { + assignments = append(assignments, name+" = "+makeBindMarkerForType(r.columnTypes[name])) + } + return assignments +} + +func (r *DeltaReplicator) makeBindMarkerAssignments(columnNames []string, sep string) string { + assignments := r.makeBindMarkerAssignmentList(columnNames) + return strings.Join(assignments, sep) +} + +func makeBindMarkerForType(typ TypeInfo) string { + if typ.Type() != TypeTuple { + return "?" + } + tupleTyp := typ.Unfrozen().(*TupleType) + vals := make([]string, 0, len(tupleTyp.Elements)) + for _, el := range tupleTyp.Elements { + vals = append(vals, makeBindMarkerForType(el)) + } + return "(" + strings.Join(vals, ", ") + ")" +} + +func appendValueByType(vals []interface{}, v interface{}, typ TypeInfo) []interface{} { + if typ.Type() == TypeTuple { + tupTyp := typ.Unfrozen().(*TupleType) + + var vTup []interface{} + switch v := v.(type) { + case []interface{}: + if v != nil { + vTup = v + } else { + vTup = make([]interface{}, len(tupTyp.Elements)) + } + case *[]interface{}: + if v != nil { + vTup = *v + } else { + vTup = make([]interface{}, len(tupTyp.Elements)) + } + case nil: + vTup = make([]interface{}, len(tupTyp.Elements)) + default: + panic(fmt.Sprintf("unhandled tuple type: %t", v)) + } + + for i, vEl := range vTup { + elTyp := tupTyp.Elements[i] + vals = appendValueByType(vals, vEl, elTyp) + } + } else { + vals = append(vals, v) + } + return vals +} + +func appendKeyValuesToBind( + vals []interface{}, + names []string, + c *scyllacdc.ChangeRow, +) []interface{} { + // No need to handle non-frozen lists here, because they can't appear + // in either partition or clustering key + // TODO: Support tuples here, too + for _, name := range names { + v, _ := c.GetValue(name) + if reflect.ValueOf(v).IsNil() { + v = gocql.UnsetValue + } + vals = append(vals, v) + } + return vals +} + +func tryWithExponentialBackoff(ctx context.Context, f func() error) error { + dur := 50 * time.Millisecond + var err error + // TODO: Make it stop when the replicator is shut down + i := 0 + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + err = f() + if err == nil { + return nil + } + + log.Printf("ERROR (try #%d): %s", i+1, err) + i++ + + <-time.After(dur) + + // Increase backoff duration randomly - between 2 to 3 times + factor := 2.0 + rand.Float64()*3.0 + dur = time.Duration(float64(dur) * factor) + if dur > maxWaitBetweenRetries { + dur = maxWaitBetweenRetries + } + } +} diff --git a/examples/replicator/replicator_test.go b/examples/replicator/replicator_test.go new file mode 100644 index 0000000..777581b --- /dev/null +++ b/examples/replicator/replicator_test.go @@ -0,0 +1,527 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "reflect" + "regexp" + "testing" + "time" + + "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" + "github.com/scylladb/scylla-cdc-go/internal/testutils" +) + +type schema struct { + tableName string + createQuery string +} + +var udts = []string{ + "CREATE TYPE udt_simple (a int, b int, c text)", +} + +var ( + schemaSimple = schema{ + "tbl_simple", + "CREATE TABLE tbl_simple (pk text, ck int, v1 int, v2 text, PRIMARY KEY (pk, ck))", + } + schemaMultipleClusteringKeys = schema{ + "tbl_multiple_clustering_keys", + "CREATE TABLE tbl_multiple_clustering_keys (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))", + } + schemaBlobs = schema{ + "tbl_blobs", + "CREATE TABLE tbl_blobs (pk text, ck int, v blob, PRIMARY KEY (pk, ck))", + } + schemaLists = schema{ + "tbl_lists", + "CREATE TABLE tbl_lists (pk text, ck int, v list, PRIMARY KEY(pk, ck))", + } + schemaSets = schema{ + "tbl_sets", + "CREATE TABLE tbl_sets (pk text, ck int, v set, PRIMARY KEY (pk, ck))", + } + schemaMaps = schema{ + "tbl_maps", + "CREATE TABLE tbl_maps (pk text, ck int, v map, PRIMARY KEY (pk, ck))", + } + schemaTuples = schema{ + "tbl_tuples", + "CREATE TABLE tbl_tuples (pk text, ck int, v tuple, PRIMARY KEY (pk, ck))", + } + schemaTuplesInTuples = schema{ + "tbl_tuples_in_tuples", + "CREATE TABLE tbl_tuples_in_tuples (pk text, ck int, v tuple, int>, PRIMARY KEY (pk, ck))", + } + schemaTuplesInTuplesInTuples = schema{ + "tbl_tuples_in_tuples_in_tuples", + "CREATE TABLE tbl_tuples_in_tuples_in_tuples (pk text, ck int, v tuple, text>, int>, PRIMARY KEY (pk, ck))", + } + schemaUDTs = schema{ + "tbl_udts", + "CREATE TABLE tbl_udts (pk text, ck int, v udt_simple, PRIMARY KEY (pk, ck))", + } +) + +var testCases = []struct { + schema schema + pk string + queries []string +}{ + // Operations test cases + { + schemaSimple, + "simpleInserts", + []string{ + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('simpleInserts', 1, 2, 'abc')", + "INSERT INTO %s (pk, ck, v1) VALUES ('simpleInserts', 2, 3)", + "INSERT INTO %s (pk, ck, v2) VALUES ('simpleInserts', 2, 'def')", + }, + }, + { + schemaSimple, + "simpleUpdates", + []string{ + "UPDATE %s SET v1 = 1 WHERE pk = 'simpleUpdates' AND ck = 1", + "UPDATE %s SET v2 = 'abc' WHERE pk = 'simpleUpdates' AND ck = 2", + "UPDATE %s SET v1 = 5, v2 = 'def' WHERE pk = 'simpleUpdates' AND ck = 3", + }, + }, + { + schemaSimple, + "rowDeletes", + []string{ + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('rowDeletes', 1, 2, 'abc')", + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('rowDeletes', 2, 3, 'def')", + "DELETE FROM %s WHERE pk = 'rowDeletes' AND ck = 1", + }, + }, + { + schemaSimple, + "partitionDeletes", + []string{ + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('partitionDeletes', 1, 2, 'abc')", + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('partitionDeletes', 2, 3, 'def')", + "DELETE FROM %s WHERE pk = 'partitionDeletes'", + // Insert one more row, just to check if replication works at all + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('partitionDeletes', 4, 5, 'def')", + }, + }, + { + schemaMultipleClusteringKeys, + "rangeDeletes", + []string{ + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 4, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 4, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 4, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 4, 0)", + "DELETE FROM %s WHERE pk = 'rangeDeletes' AND ck1 > 3", + "DELETE FROM %s WHERE pk = 'rangeDeletes' AND ck1 <= 1", + "DELETE FROM %s WHERE pk = 'rangeDeletes' AND ck1 = 2 AND ck2 > 1 AND ck2 < 4", + }, + }, + + // Blob test cases + { + schemaBlobs, + "blobs", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 1, 0x1234)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 2, 0x)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 3, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 4, 0x4321)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 5, 0x00)", + "UPDATE %s SET v = null WHERE pk = 'blobs' AND ck = 4", + "UPDATE %s SET v = 0x WHERE pk = 'blobs' AND ck = 5", + }, + }, + + // Lists test cases + { + schemaLists, + "listOverwrites", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 1, [1, 2, 3])", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 1, [4, 5, 6, 7])", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 2, [6, 5, 4, 3, 2, 1])", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 2, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 3, [1, 11, 111])", + "UPDATE %s SET v = [2, 22, 222] WHERE pk = 'listOverwrites' AND ck = 3", + }, + }, + { + schemaLists, + "listAppends", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('listAppends', 1, [1, 2, 3])", + "UPDATE %s SET v = v + [4, 5, 6] WHERE pk = 'listAppends' AND ck = 1", + "UPDATE %s SET v = [-2, -1, 0] + v WHERE pk = 'listAppends' AND ck = 1", + }, + }, + { + schemaLists, + "listRemoves", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('listRemoves', 1, [1, 2, 3])", + "UPDATE %s SET v = v + [4, 5, 6] WHERE pk = 'listRemoves' AND ck = 1", + "UPDATE %s SET v = v - [1, 2, 3] WHERE pk = 'listRemoves' AND ck = 1", + }, + }, + + // Set test cases + { + schemaSets, + "setOverwrites", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 1, {1, 2, 3, 4})", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 1, {4, 5, 6, 7})", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 2, {8, 9, 10, 11})", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 2, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 3, {12, 13, 14, 15})", + "UPDATE %s SET v = null WHERE pk = 'setOverwrites' AND ck = 3", + }, + }, + { + schemaSets, + "setAppends", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('setAppends', 1, {1, 2, 3, 4})", + "UPDATE %s SET v = v + {5, 6} WHERE pk = 'setAppends' AND ck = 1", + "UPDATE %s SET v = v + {5, 6} WHERE pk = 'setAppends' AND ck = 2", + }, + }, + { + schemaSets, + "setRemovals", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('setRemovals', 1, {1, 2, 3, 4})", + "UPDATE %s SET v = v - {1, 3} WHERE pk = 'setRemovals' AND ck = 1", + "UPDATE %s SET v = v - {1138} WHERE pk = 'setRemovals' AND ck = 2", + }, + }, + + // Map test cases + { + schemaMaps, + "mapOverwrites", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 1, {1: 2, 3: 4})", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 1, {5: 6, 7: 8})", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 2, {9: 10, 11: 12})", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 2, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 3, {13: 14, 15: 16})", + "UPDATE %s SET v = null WHERE pk = 'mapOverwrites' AND ck = 3", + }, + }, + { + schemaMaps, + "mapSets", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapSets', 1, {1: 2, 3: 4, 5: 6})", + "UPDATE %s SET v[1] = 42 WHERE pk = 'mapSets' AND ck = 1", + "UPDATE %s SET v[3] = null WHERE pk = 'mapSets' AND ck = 1", + "UPDATE %s SET v[3] = 123 WHERE pk = 'mapSets' AND ck = 1", + "UPDATE %s SET v[5] = 321 WHERE pk = 'mapSets' AND ck = 2", + }, + }, + { + schemaMaps, + "mapAppends", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapAppends', 1, {1: 2, 3: 4})", + "UPDATE %s SET v = v + {5: 6} WHERE pk = 'mapAppends' AND ck = 1", + "UPDATE %s SET v = v + {5: 6} WHERE pk = 'mapAppends' AND ck = 2", + }, + }, + { + schemaMaps, + "mapRemovals", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapRemovals', 1, {1: 2, 3: 4})", + "UPDATE %s SET v = v - {1} WHERE pk = 'mapRemovals' AND ck = 1", + "UPDATE %s SET v = v - {1138} WHERE pk = 'mapRemovals' AND ck = 2", + }, + }, + + // Tuple test cases + { + schemaTuples, + "tupleInserts", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tupleInserts', 1, (7, 'abc'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleInserts', 2, (9, 'def'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleInserts', 2, null)", + }, + }, + { + schemaTuples, + "tupleUpdates", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 1, (7, 'abc'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 2, (9, 'def'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 3, (11, 'ghi'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 4, (13, 'jkl'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 5, (15, 'mno'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 6, (17, 'pqr'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 7, (19, 'stu'))", + "UPDATE %s SET v = (111, 'zyx') WHERE pk = 'tupleUpdates' AND ck = 1", + "UPDATE %s SET v = null WHERE pk = 'tupleUpdates' AND ck = 2", + "INSERT INTO %s (pk, ck) VALUES ('tupleUpdates', 3)", + "UPDATE %s SET v = (null, null) WHERE pk = 'tupleUpdates' AND ck = 4", + "UPDATE %s SET v = (null, 'asdf') WHERE pk = 'tupleUpdates' AND ck = 5", + "UPDATE %s SET v = (123, null) WHERE pk = 'tupleUpdates' AND ck = 6", + "UPDATE %s SET v = (null, '') WHERE pk = 'tupleUpdates' AND ck = 7", + }, + }, + { + schemaTuplesInTuples, + "tuplesInTuples", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 1, ((1, 'abc'), 7))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 2, ((3, 'def'), 9))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 3, ((3, 'ghi'), 9))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 4, ((3, 'jkl'), 9))", + "UPDATE %s SET v = ((100, 'zyx'), 111) WHERE pk = 'tuplesInTuples' AND ck = 1", + "UPDATE %s SET v = null WHERE pk = 'tuplesInTuples' AND ck = 2", + "UPDATE %s SET v = ((200, null), 999) WHERE pk = 'tuplesInTuples' AND ck = 3", + "UPDATE %s SET v = ((300, ''), 333) WHERE pk = 'tuplesInTuples' AND ck = 4", + }, + }, + { + schemaTuplesInTuplesInTuples, + "tuplesInTuplesInTuples", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuplesInTuples', 1, (((1, 9), 'abc'), 7))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuplesInTuples', 2, (((3, 8), 'def'), 9))", + "UPDATE %s SET v = (((100, 200), 'zyx'), 111) WHERE pk = 'tuplesInTuplesInTuples' AND ck = 1", + "UPDATE %s SET v = null WHERE pk = 'tuplesInTuplesInTuples' AND ck = 2", + "UPDATE %s SET v = (null, 123) WHERE pk = 'tuplesInTuplesInTuples' AND ck = 3", + "UPDATE %s SET v = ((null, 'xyz'), 321) WHERE pk = 'tuplesInTuplesInTuples' AND ck = 4", + }, + }, + + // UDT test cases + { + schemaUDTs, + "udt", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 1, (2, 3, 'abc'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 2, {a: 6, c: 'zxcv'})", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 3, (9, 4, 'def'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 4, (123, 321, 'ghi'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 5, (333, 222, 'jkl'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 6, (432, 678, 'mno'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 7, (765, 345, 'pqr'))", + "UPDATE %s SET v.b = 41414 WHERE pk = 'udt' AND ck = 2", + "UPDATE %s SET v = null WHERE pk = 'udt' AND ck = 3", + "UPDATE %s SET v = {b: 123456, c: 'tyu'} WHERE pk = 'udt' AND ck = 4", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 5, (999, 888, 'zxc'))", + "UPDATE %s SET v.c = null WHERE pk = 'udt' AND ck = 6", + "UPDATE %s SET v = {a: 923, b: 123456, c: ''} WHERE pk = 'udt' AND ck = 7", + }, + }, +} + +func TestReplicator(t *testing.T) { + filter := os.Getenv("REPLICATOR_TEST_FILTER") + if filter == "" { + filter = ".*" + } + re := regexp.MustCompile(filter) + + // Collect all schemas + schemas := make(map[string]string) + for _, tc := range testCases { + schemas[tc.schema.tableName] = tc.schema.createQuery + } + + sourceAddress := testutils.GetSourceClusterContactPoint() + destinationAddress := testutils.GetDestinationClusterContactPoint() + keyspaceName := testutils.GetUniqueName("test_keyspace") + + sourceSession := createSessionAndSetupSchema(t, sourceAddress, keyspaceName, true, schemas) + defer sourceSession.Close() + + destinationSession := createSessionAndSetupSchema(t, destinationAddress, keyspaceName, false, schemas) + defer destinationSession.Close() + + // Execute all of the queries + for _, tc := range testCases { + if !re.MatchString(tc.pk) { + continue + } + for _, qStr := range tc.queries { + execQuery(t, sourceSession, fmt.Sprintf(qStr, tc.schema.tableName)) + } + } + + t.Log("running replicators") + + adv := scyllacdc.AdvancedReaderConfig{ + ChangeAgeLimit: time.Minute, + PostNonEmptyQueryDelay: 3 * time.Second, + PostEmptyQueryDelay: 3 * time.Second, + PostFailedQueryDelay: 3 * time.Second, + QueryTimeWindowSize: 5 * time.Minute, + ConfidenceWindowSize: time.Millisecond, + } + + schemaNames := make([]string, 0) + for tbl := range schemas { + schemaNames = append(schemaNames, fmt.Sprintf("%s.%s", keyspaceName, tbl)) + } + + logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) + replicator, err := newReplicator( + context.Background(), + sourceAddress, + destinationAddress, + schemaNames, + &adv, gocql.Quorum, + gocql.Quorum, + "", + logger, + ) + + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + errC := make(chan error) + go func() { errC <- replicator.Run(ctx) }() + + time.Sleep(time.Second) + + replicator.StopAt(time.Now().Add(time.Second)) + if err := <-errC; err != nil { + t.Fatal(err) + } + + t.Log("validating results") + + // Compare + sourceSet := fetchFullSet(t, sourceSession, schemas) + destinationSet := fetchFullSet(t, destinationSession, schemas) + + failedCount := 0 + + for _, tc := range testCases { + sourceData := sourceSet[tc.pk] + destinationData := destinationSet[tc.pk] + + if len(sourceData) != len(destinationData) { + t.Logf( + "%s: source len %d, destination len %d\n", + tc.pk, + len(sourceData), + len(destinationData), + ) + t.Log(" source:") + for _, row := range sourceData { + t.Logf(" %v", row) + } + t.Log(" dest:") + for _, row := range destinationData { + t.Logf(" %v", row) + } + t.Fail() + failedCount++ + continue + } + + failed := false + for i := 0; i < len(sourceData); i++ { + if !reflect.DeepEqual(sourceData[i], destinationData[i]) { + t.Logf("%s: mismatch", tc.pk) + t.Logf(" source: %v", sourceData[i]) + t.Logf(" dest: %v", destinationData[i]) + failed = true + } + } + + if failed { + t.Fail() + failedCount++ + } else { + t.Logf("%s: OK", tc.pk) + } + } + + if failedCount > 0 { + t.Logf("failed %d/%d test cases", failedCount, len(testCases)) + } +} + +func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session { + testutils.CreateKeyspace(t, addr, keyspaceName) + + cfg := gocql.NewCluster(addr) + cfg.Keyspace = keyspaceName + session, err := cfg.CreateSession() + if err != nil { + t.Fatal(err) + } + + for _, udt := range udts { + execQuery(t, session, udt) + } + + for _, tbl := range schemas { + tblQuery := tbl + if withCdc { + tblQuery += " WITH cdc = {'enabled': true, 'preimage': true, 'postimage': true}" + } + execQuery(t, session, tblQuery) + } + + err = session.AwaitSchemaAgreement(context.Background()) + if err != nil { + t.Fatal(err) + } + + return session +} + +func execQuery(t *testing.T, session *gocql.Session, query string) { + t.Logf("executing query %s", query) + err := session.Query(query).Exec() + if err != nil { + t.Fatal(err) + } +} + +func fetchFullSet(t *testing.T, session *gocql.Session, schemas map[string]string) map[string][]map[string]interface{} { + groups := make(map[string][]map[string]interface{}) + + for tbl := range schemas { + data, err := session.Query("SELECT * FROM " + tbl).Iter().SliceMap() + if err != nil { + t.Fatal(err) + } + + for _, row := range data { + pk := row["pk"].(string) + groups[pk] = append(groups[pk], row) + } + } + + return groups +} diff --git a/examples/replicator/utils.go b/examples/replicator/utils.go new file mode 100644 index 0000000..dcf5ec0 --- /dev/null +++ b/examples/replicator/utils.go @@ -0,0 +1,273 @@ +package main + +import "strings" + +// Re-implementation of the type parsing logic from the driver. +// Unlike the driver, this implementation differentiates frozen types +// from non-frozen ones. + +type Type int + +const ( + TypeCustom Type = 0x0000 + TypeAscii Type = 0x0001 + TypeBigInt Type = 0x0002 + TypeBlob Type = 0x0003 + TypeBoolean Type = 0x0004 + TypeCounter Type = 0x0005 + TypeDecimal Type = 0x0006 + TypeDouble Type = 0x0007 + TypeFloat Type = 0x0008 + TypeInt Type = 0x0009 + TypeText Type = 0x000A + TypeTimestamp Type = 0x000B + TypeUUID Type = 0x000C + TypeVarchar Type = 0x000D + TypeVarint Type = 0x000E + TypeTimeUUID Type = 0x000F + TypeInet Type = 0x0010 + TypeDate Type = 0x0011 + TypeTime Type = 0x0012 + TypeSmallInt Type = 0x0013 + TypeTinyInt Type = 0x0014 + TypeDuration Type = 0x0015 + TypeList Type = 0x0020 + TypeMap Type = 0x0021 + TypeSet Type = 0x0022 + TypeUDT Type = 0x0030 + TypeTuple Type = 0x0031 +) + +func (t Type) IsCollection() bool { + switch t { + case TypeList, TypeMap, TypeSet, TypeUDT: + return true + default: + return false + } +} + +type TypeInfo interface { + Type() Type + IsFrozen() bool + Unfrozen() TypeInfo +} + +type FrozenType struct { + Inner TypeInfo +} + +func (ft *FrozenType) Type() Type { + return ft.Inner.Type() +} + +func (ft *FrozenType) IsFrozen() bool { + return true +} + +func (ft *FrozenType) Unfrozen() TypeInfo { + return ft.Inner +} + +type MapType struct { + Key TypeInfo + Value TypeInfo +} + +func (mt *MapType) Type() Type { + return TypeMap +} + +func (mt *MapType) IsFrozen() bool { + return false +} + +func (mt *MapType) Unfrozen() TypeInfo { + return mt +} + +type ListType struct { + Element TypeInfo +} + +func (lt *ListType) Type() Type { + return TypeList +} + +func (lt *ListType) IsFrozen() bool { + return false +} + +func (lt *ListType) Unfrozen() TypeInfo { + return lt +} + +type SetType struct { + Element TypeInfo +} + +func (st *SetType) Type() Type { + return TypeSet +} + +func (st *SetType) IsFrozen() bool { + return false +} + +func (st *SetType) Unfrozen() TypeInfo { + return st +} + +type TupleType struct { + Elements []TypeInfo +} + +func (tt *TupleType) Type() Type { + return TypeTuple +} + +func (tt *TupleType) IsFrozen() bool { + return false +} + +func (tt *TupleType) Unfrozen() TypeInfo { + return tt +} + +type NativeType struct { + RealType Type +} + +func (nt *NativeType) Type() Type { + return nt.RealType +} + +func (nt *NativeType) IsFrozen() bool { + return false +} + +func (nt *NativeType) Unfrozen() TypeInfo { + return nt +} + +type UDTType struct { + Name string +} + +func (ut *UDTType) Type() Type { + return TypeUDT +} + +func (ut *UDTType) IsFrozen() bool { + return false +} + +func (ut *UDTType) Unfrozen() TypeInfo { + return ut +} + +func parseType(str string) TypeInfo { + if strings.HasPrefix(str, "frozen<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "frozen<"), ">") + return &FrozenType{parseType(innerStr)} + } + if strings.HasPrefix(str, "list<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "list<"), ">") + return &ListType{parseType(innerStr)} + } + if strings.HasPrefix(str, "set<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "set<"), ">") + return &SetType{parseType(innerStr)} + } + if strings.HasPrefix(str, "map<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "map<"), ">") + list := parseTypeList(innerStr) + return &MapType{Key: list[0], Value: list[1]} + } + if strings.HasPrefix(str, "tuple<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "tuple<"), ">") + list := parseTypeList(innerStr) + return &TupleType{Elements: list} + } + typ := parseNativeType(str) + if typ == TypeUDT { + return &UDTType{Name: str} + } + return &NativeType{RealType: typ} +} + +func parseTypeList(str string) []TypeInfo { + var ret []TypeInfo + var level int + var builder strings.Builder + for _, r := range str { + if r == ',' && level == 0 { + s := strings.TrimSpace(builder.String()) + ret = append(ret, parseType(s)) + builder.Reset() + continue + } + + if r == '<' { + level++ + } else if r == '>' { + level-- + } + builder.WriteRune(r) + } + if builder.Len() != 0 { + s := strings.TrimSpace(builder.String()) + ret = append(ret, parseType(s)) + } + return ret +} + +func parseNativeType(str string) Type { + switch str { + case "ascii": + return TypeAscii + case "bigint": + return TypeBigInt + case "blob": + return TypeBlob + case "boolean": + return TypeBoolean + case "counter": + return TypeCounter + case "date": + return TypeDate + case "decimal": + return TypeDecimal + case "double": + return TypeDouble + case "duration": + return TypeDuration + case "float": + return TypeFloat + case "int": + return TypeInt + case "smallint": + return TypeSmallInt + case "tinyint": + return TypeTinyInt + case "time": + return TypeTime + case "timestamp": + return TypeTimestamp + case "uuid": + return TypeUUID + case "varchar": + return TypeVarchar + case "text": + return TypeText + case "varint": + return TypeVarint + case "timeuuid": + return TypeTimeUUID + case "inet": + return TypeInet + default: + // Assume it's a UDT + return TypeUDT + } +} diff --git a/examples/simple-printer/main.go b/examples/simple-printer/main.go new file mode 100644 index 0000000..8ee1230 --- /dev/null +++ b/examples/simple-printer/main.go @@ -0,0 +1,76 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + + "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" +) + +// Make sure you create the following table before you run this example: +// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'}; + +func main() { + cluster := gocql.NewCluster("127.0.0.1") + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy("local-dc")) + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + } + defer session.Close() + + cfg := &scyllacdc.ReaderConfig{ + Session: session, + TableNames: []string{"ks.tbl"}, + ChangeConsumerFactory: changeConsumerFactory, + Logger: log.New(os.Stderr, "", log.Ldate|log.Lshortfile), + } + + reader, err := scyllacdc.NewReader(context.Background(), cfg) + if err != nil { + log.Fatal(err) + } + + if err := reader.Run(context.Background()); err != nil { + log.Fatal(err) + } +} + +func consumeChange(ctx context.Context, tableName string, c scyllacdc.Change) error { + for _, changeRow := range c.Delta { + pkRaw, _ := changeRow.GetValue("pk") + ckRaw, _ := changeRow.GetValue("ck") + v := changeRow.GetAtomicChange("v") + + pk := pkRaw.(*int) + ck := ckRaw.(*int) + + fmt.Printf("Operation: %s, pk: %s, ck: %s\n", changeRow.GetOperation(), + nullableIntToStr(pk), nullableIntToStr(ck)) + + if v.IsDeleted { + fmt.Printf(" Column v was set to null/deleted\n") + } else { + vInt := v.Value.(*int) + if vInt != nil { + fmt.Printf(" Column v was set to %d\n", *vInt) + } else { + fmt.Print(" Column v was not changed\n") + } + } + } + + return nil +} + +func nullableIntToStr(i *int) string { + if i == nil { + return "null" + } + return fmt.Sprintf("%d", *i) +} + +var changeConsumerFactory = scyllacdc.MakeChangeConsumerFactoryFromFunc(consumeChange)