Skip to content

Commit

Permalink
replicator-aws-sns: dump pre-image and post image
Browse files Browse the repository at this point in the history
  • Loading branch information
dkropachev committed Dec 12, 2024
1 parent b6f0202 commit 03c3f30
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 28 deletions.
10 changes: 9 additions & 1 deletion examples/replicator-aws-sns/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ go 1.22

require (
github.com/aws/aws-sdk-go-v2 v1.32.6
github.com/aws/aws-sdk-go-v2/config v1.28.6
github.com/aws/aws-sdk-go-v2/service/sns v1.33.7
github.com/aws/aws-sdk-go-v2/service/sqs v1.37.2
github.com/gocql/gocql v0.0.0-20201215165327-e49edf966d90
github.com/scylladb/scylla-cdc-go v0.0.0-20201215165327-e49edf966d90
)

require (
github.com/aws/aws-sdk-go-v2/credentials v1.17.47 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 // indirect
github.com/aws/smithy-go v1.22.1 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
Expand Down
20 changes: 18 additions & 2 deletions examples/replicator-aws-sns/go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
github.com/aws/aws-sdk-go-v2 v1.32.6 h1:7BokKRgRPuGmKkFMhEg/jSul+tB9VvXhcViILtfG8b4=
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo=
github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 h1:s/fF4+yDQDoElYhfIVvSNyeCydfbuTKzhxSXDXCPasU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 h1:ZntTCl5EsYnhN/IygQEUugpdwbhdkom9uHcbCftiGgA=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 h1:50+XsN70RS7dwJ2CkVNXzj7U2L1HKP8nqTd3XWEXBN4=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug=
github.com/aws/aws-sdk-go-v2/service/sns v1.33.7 h1:N3o8mXK6/MP24BtD9sb51omEO9J9cgPM3Ughc293dZc=
github.com/aws/aws-sdk-go-v2/service/sns v1.33.7/go.mod h1:AAHZydTB8/V2zn3WNwjLXBK1RAcSEpDNmFfrmjvrJQg=
github.com/aws/aws-sdk-go-v2/service/sqs v1.37.2 h1:mFLfxLZB/TVQwNJAYox4WaxpIu+dFVIcExrmRmRCOhw=
github.com/aws/aws-sdk-go-v2/service/sqs v1.37.2/go.mod h1:GnvfTdlvcpD+or3oslHPOn4Mu6KaCwlCp+0p0oqWnrM=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 h1:s4074ZO1Hk8qv65GqNXqDjmkf4HSQqJukaLuuW0TpDA=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8=
github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
Expand Down
100 changes: 75 additions & 25 deletions examples/replicator-aws-sns/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
scyllacdc "github.com/scylladb/scylla-cdc-go"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sns"
)

Expand All @@ -33,6 +34,10 @@ func main() {
readConsistency string
writeConsistency string

snsTopic string
snsRegion string
snsSubject string

progressTable string
)

Expand All @@ -43,6 +48,11 @@ func main() {
flag.StringVar(&readConsistency, "read-consistency", "", "consistency level used to read from cdc log (one, quorum, all)")
flag.StringVar(&writeConsistency, "write-consistency", "", "consistency level used to write to the destination cluster (one, quorum, all)")
flag.StringVar(&progressTable, "progress-table", "", "fully-qualified name of the table in the destination cluster to use for saving progress; if omitted, the progress won't be saved")

flag.StringVar(&snsTopic, "sns-topic", "", "SNS Topic ARN")
flag.StringVar(&snsSubject, "sns-subject", "", "SNS Subject")
flag.StringVar(&snsRegion, "sns-region", "", "AWS region where SNS topic is deployed")

flag.String("mode", "", "mode (ignored)")
flag.Parse()

Expand Down Expand Up @@ -85,6 +95,7 @@ func main() {
context.Background(),
source, progressNode,
fullyQualifiedTables,
snsTopic, snsSubject, snsRegion,
&adv,
clRead,
clWrite,
Expand Down Expand Up @@ -158,6 +169,7 @@ func newReplicator(
ctx context.Context,
source, destination string,
tableNames []string,
topic, subject, region string,
advancedParams *scyllacdc.AdvancedReaderConfig,
readConsistency gocql.Consistency,
progressConsistency gocql.Consistency,
Expand Down Expand Up @@ -186,6 +198,9 @@ func newReplicator(

factory := &replicatorFactory{
rowsRead: rowsRead,
topic: topic,
subject: subject,
region: region,
logger: logger,
}

Expand Down Expand Up @@ -251,6 +266,9 @@ func (repl *replicator) GetReadRowsCount() int64 {

type replicatorFactory struct {
rowsRead *int64
topic string
subject string
region string
logger scyllacdc.Logger
}

Expand All @@ -262,7 +280,7 @@ func (rf *replicatorFactory) CreateChangeConsumer(
if len(splitTableName) < 2 {
return nil, fmt.Errorf("table name is not fully qualified: %s", input.TableName)
}
return NewSNSReplicator(ctx, rf.rowsRead, input.StreamID, input.ProgressReporter, rf.logger)
return NewSNSReplicator(ctx, rf.topic, rf.subject, rf.region, rf.rowsRead, input.StreamID, input.ProgressReporter, rf.logger)
}

type SNSReplicator struct {
Expand Down Expand Up @@ -290,12 +308,26 @@ type SNSReplicator struct {

func NewSNSReplicator(
ctx context.Context,
topic, subject, region string,
count *int64,
streamID scyllacdc.StreamID,
reporter *scyllacdc.ProgressReporter,
logger scyllacdc.Logger,
) (*SNSReplicator, error) {
var opts [](func(*config.LoadOptions) error)
if region != "" {
opts = append(opts, config.WithRegion(region))
}

awsCfg, err := config.LoadDefaultConfig(ctx, opts...)
if err != nil {
return nil, err
}

dr := &SNSReplicator{
snsClient: sns.NewFromConfig(awsCfg),
snsTopic: topic,
snsSubject: subject,
totalCount: count,
streamID: streamID,
reporter: scyllacdc.NewPeriodicProgressReporter(logger, reportPeriod, reporter),
Expand All @@ -307,36 +339,25 @@ func NewSNSReplicator(

func (r *SNSReplicator) Consume(ctx context.Context, c scyllacdc.Change) error {
timestamp := c.GetCassandraTimestamp()
pos := 0

if showTimestamps {
log.Printf("[%s] Processing timestamp: %s (%s)\n", c.StreamID, c.Time, c.Time.Time())
}

for pos < len(c.Delta) {
change := c.Delta[pos]
change.GetRawData()
change.GetOperation()

msg, err := json.Marshal(map[string]interface{}{
"timestamp": timestamp,
"ttl": change.GetTTL(),
"seq_no": change.GetSeqNo(),
"operation": change.GetOperation(),
"end_of_batch": change.GetEndOfBatch(),
"data": change.GetRawData(),
})
if err != nil {
return fmt.Errorf("failed to serialize message: %w", err)
for _, change := range c.Delta {
if err := r.sendChangeToSNS(ctx, change, timestamp, "Delta"); err != nil {
return err
}
}

_, err = r.snsClient.Publish(ctx, &sns.PublishInput{
TopicArn: aws.String(r.snsTopic),
Message: aws.String(string(msg)),
Subject: aws.String(r.snsSubject),
})
if err != nil {
return fmt.Errorf("failed to publish message to topic %q: %w", r.snsTopic, err)
for _, change := range c.PreImage {
if err := r.sendChangeToSNS(ctx, change, timestamp, "PreImage"); err != nil {
return err
}
}

for _, change := range c.PostImage {
if err := r.sendChangeToSNS(ctx, change, timestamp, "PostImage"); err != nil {
return err
}
}

Expand All @@ -346,6 +367,35 @@ func (r *SNSReplicator) Consume(ctx context.Context, c scyllacdc.Change) error {
return nil
}

func (r *SNSReplicator) sendChangeToSNS(ctx context.Context, change *scyllacdc.ChangeRow, timestamp int64, recType string) error {
change.GetRawData()
change.GetOperation()

msg, err := json.Marshal(map[string]interface{}{
"type": recType,
"operation": change.GetOperation(),
"timestamp": timestamp,
"ttl": change.GetTTL(),
"seq_no": change.GetSeqNo(),
"end_of_batch": change.GetEndOfBatch(),
"data": change.GetRawData(),
})
if err != nil {
return fmt.Errorf("failed to serialize message: %w", err)
}

_, err = r.snsClient.Publish(ctx, &sns.PublishInput{
TopicArn: aws.String(r.snsTopic),
Message: aws.String(string(msg)),
Subject: aws.String(r.snsSubject),
})

if err != nil {
return fmt.Errorf("failed to send message to SNS: %w", err)
}
return nil
}

func (r *SNSReplicator) End() error {
log.Printf("Streams [%s]: processed %d changes in total", r.streamID, r.localCount)
atomic.AddInt64(r.totalCount, r.localCount)
Expand Down

0 comments on commit 03c3f30

Please sign in to comment.