diff --git a/examples/replicator-aws-sns/go.mod b/examples/replicator-aws-sns/go.mod index 8b5b525..26b3549 100644 --- a/examples/replicator-aws-sns/go.mod +++ b/examples/replicator-aws-sns/go.mod @@ -4,15 +4,23 @@ go 1.22 require ( github.com/aws/aws-sdk-go-v2 v1.32.6 + github.com/aws/aws-sdk-go-v2/config v1.28.6 github.com/aws/aws-sdk-go-v2/service/sns v1.33.7 - github.com/aws/aws-sdk-go-v2/service/sqs v1.37.2 github.com/gocql/gocql v0.0.0-20201215165327-e49edf966d90 github.com/scylladb/scylla-cdc-go v0.0.0-20201215165327-e49edf966d90 ) require ( + github.com/aws/aws-sdk-go-v2/credentials v1.17.47 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 // indirect github.com/aws/smithy-go v1.22.1 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect diff --git a/examples/replicator-aws-sns/go.sum b/examples/replicator-aws-sns/go.sum index 8240f44..3c44a96 100644 --- a/examples/replicator-aws-sns/go.sum +++ b/examples/replicator-aws-sns/go.sum @@ -1,13 +1,29 @@ github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4= github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo= +github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 h1:ZntTCl5EsYnhN/IygQEUugpdwbhdkom9uHcbCftiGgA= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 h1:50+XsN70RS7dwJ2CkVNXzj7U2L1HKP8nqTd3XWEXBN4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug= github.com/aws/aws-sdk-go-v2/service/sns v1.33.7 h1:N3o8mXK6/MP24BtD9sb51omEO9J9cgPM3Ughc293dZc= github.com/aws/aws-sdk-go-v2/service/sns v1.33.7/go.mod h1:AAHZydTB8/V2zn3WNwjLXBK1RAcSEpDNmFfrmjvrJQg= -github.com/aws/aws-sdk-go-v2/service/sqs v1.37.2 h1:mFLfxLZB/TVQwNJAYox4WaxpIu+dFVIcExrmRmRCOhw= -github.com/aws/aws-sdk-go-v2/service/sqs v1.37.2/go.mod h1:GnvfTdlvcpD+or3oslHPOn4Mu6KaCwlCp+0p0oqWnrM= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 h1:s4074ZO1Hk8qv65GqNXqDjmkf4HSQqJukaLuuW0TpDA= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8= github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= diff --git a/examples/replicator-aws-sns/main.go b/examples/replicator-aws-sns/main.go index 58d5958..c7930ed 100644 --- a/examples/replicator-aws-sns/main.go +++ b/examples/replicator-aws-sns/main.go @@ -16,6 +16,7 @@ import ( scyllacdc "github.com/scylladb/scylla-cdc-go" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" ) @@ -33,6 +34,10 @@ func main() { readConsistency string writeConsistency string + snsTopic string + snsRegion string + snsSubject string + progressTable string ) @@ -43,6 +48,11 @@ func main() { flag.StringVar(&readConsistency, "read-consistency", "", "consistency level used to read from cdc log (one, quorum, all)") flag.StringVar(&writeConsistency, "write-consistency", "", "consistency level used to write to the destination cluster (one, quorum, all)") flag.StringVar(&progressTable, "progress-table", "", "fully-qualified name of the table in the destination cluster to use for saving progress; if omitted, the progress won't be saved") + + flag.StringVar(&snsTopic, "sns-topic", "", "SNS Topic ARN") + flag.StringVar(&snsSubject, "sns-subject", "", "SNS Subject") + flag.StringVar(&snsRegion, "sns-region", "", "AWS region where SNS topic is deployed") + flag.String("mode", "", "mode (ignored)") flag.Parse() @@ -85,6 +95,7 @@ func main() { context.Background(), source, progressNode, fullyQualifiedTables, + snsTopic, snsSubject, snsRegion, &adv, clRead, clWrite, @@ -158,6 +169,7 @@ func newReplicator( ctx context.Context, source, destination string, tableNames []string, + topic, subject, region string, advancedParams *scyllacdc.AdvancedReaderConfig, readConsistency gocql.Consistency, progressConsistency gocql.Consistency, @@ -186,6 +198,9 @@ func newReplicator( factory := &replicatorFactory{ rowsRead: rowsRead, + topic: topic, + subject: subject, + region: region, logger: logger, } @@ -251,6 +266,9 @@ func (repl *replicator) GetReadRowsCount() int64 { type replicatorFactory struct { rowsRead *int64 + topic string + subject string + region string logger scyllacdc.Logger } @@ -262,7 +280,7 @@ func (rf *replicatorFactory) CreateChangeConsumer( if len(splitTableName) < 2 { return nil, fmt.Errorf("table name is not fully qualified: %s", input.TableName) } - return NewSNSReplicator(ctx, rf.rowsRead, input.StreamID, input.ProgressReporter, rf.logger) + return NewSNSReplicator(ctx, rf.topic, rf.subject, rf.region, rf.rowsRead, input.StreamID, input.ProgressReporter, rf.logger) } type SNSReplicator struct { @@ -290,12 +308,26 @@ type SNSReplicator struct { func NewSNSReplicator( ctx context.Context, + topic, subject, region string, count *int64, streamID scyllacdc.StreamID, reporter *scyllacdc.ProgressReporter, logger scyllacdc.Logger, ) (*SNSReplicator, error) { + var opts [](func(*config.LoadOptions) error) + if region != "" { + opts = append(opts, config.WithRegion(region)) + } + + awsCfg, err := config.LoadDefaultConfig(ctx, opts...) + if err != nil { + return nil, err + } + dr := &SNSReplicator{ + snsClient: sns.NewFromConfig(awsCfg), + snsTopic: topic, + snsSubject: subject, totalCount: count, streamID: streamID, reporter: scyllacdc.NewPeriodicProgressReporter(logger, reportPeriod, reporter), @@ -307,36 +339,25 @@ func NewSNSReplicator( func (r *SNSReplicator) Consume(ctx context.Context, c scyllacdc.Change) error { timestamp := c.GetCassandraTimestamp() - pos := 0 - if showTimestamps { log.Printf("[%s] Processing timestamp: %s (%s)\n", c.StreamID, c.Time, c.Time.Time()) } - for pos < len(c.Delta) { - change := c.Delta[pos] - change.GetRawData() - change.GetOperation() - - msg, err := json.Marshal(map[string]interface{}{ - "timestamp": timestamp, - "ttl": change.GetTTL(), - "seq_no": change.GetSeqNo(), - "operation": change.GetOperation(), - "end_of_batch": change.GetEndOfBatch(), - "data": change.GetRawData(), - }) - if err != nil { - return fmt.Errorf("failed to serialize message: %w", err) + for _, change := range c.Delta { + if err := r.sendChangeToSNS(ctx, change, timestamp, "Delta"); err != nil { + return err } + } - _, err = r.snsClient.Publish(ctx, &sns.PublishInput{ - TopicArn: aws.String(r.snsTopic), - Message: aws.String(string(msg)), - Subject: aws.String(r.snsSubject), - }) - if err != nil { - return fmt.Errorf("failed to publish message to topic %q: %w", r.snsTopic, err) + for _, change := range c.PreImage { + if err := r.sendChangeToSNS(ctx, change, timestamp, "PreImage"); err != nil { + return err + } + } + + for _, change := range c.PostImage { + if err := r.sendChangeToSNS(ctx, change, timestamp, "PostImage"); err != nil { + return err } } @@ -346,6 +367,35 @@ func (r *SNSReplicator) Consume(ctx context.Context, c scyllacdc.Change) error { return nil } +func (r *SNSReplicator) sendChangeToSNS(ctx context.Context, change *scyllacdc.ChangeRow, timestamp int64, recType string) error { + change.GetRawData() + change.GetOperation() + + msg, err := json.Marshal(map[string]interface{}{ + "type": recType, + "operation": change.GetOperation(), + "timestamp": timestamp, + "ttl": change.GetTTL(), + "seq_no": change.GetSeqNo(), + "end_of_batch": change.GetEndOfBatch(), + "data": change.GetRawData(), + }) + if err != nil { + return fmt.Errorf("failed to serialize message: %w", err) + } + + _, err = r.snsClient.Publish(ctx, &sns.PublishInput{ + TopicArn: aws.String(r.snsTopic), + Message: aws.String(string(msg)), + Subject: aws.String(r.snsSubject), + }) + + if err != nil { + return fmt.Errorf("failed to send message to SNS: %w", err) + } + return nil +} + func (r *SNSReplicator) End() error { log.Printf("Streams [%s]: processed %d changes in total", r.streamID, r.localCount) atomic.AddInt64(r.totalCount, r.localCount)