diff --git a/examples/replicator-gcp-pub/go.mod b/examples/replicator-gcp-pub/go.mod new file mode 100644 index 0000000..6bbdbae --- /dev/null +++ b/examples/replicator-gcp-pub/go.mod @@ -0,0 +1,66 @@ +module github.com/scylladb/replicator-gcp-pub + +go 1.22 + +require ( + cloud.google.com/go/pubsub v1.45.3 + github.com/aws/aws-sdk-go-v2 v1.32.7 + github.com/aws/aws-sdk-go-v2/config v1.28.7 + github.com/aws/aws-sdk-go-v2/service/sns v1.33.8 + github.com/gocql/gocql v0.0.0-20201215165327-e49edf966d90 + github.com/scylladb/scylla-cdc-go v0.0.0-20201215165327-e49edf966d90 +) + +require ( + cloud.google.com/go v0.116.0 // indirect + cloud.google.com/go/auth v0.11.0 // indirect + cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect + cloud.google.com/go/compute/metadata v0.5.2 // indirect + cloud.google.com/go/iam v1.2.2 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.48 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 // indirect + github.com/aws/smithy-go v1.22.1 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/google/s2a-go v0.1.8 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect + github.com/googleapis/gax-go/v2 v2.14.0 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect + go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect + golang.org/x/crypto v0.29.0 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/oauth2 v0.24.0 // indirect + golang.org/x/sync v0.9.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect + golang.org/x/time v0.8.0 // indirect + google.golang.org/api v0.210.0 // indirect + google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20241113202542-65e8d215514f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect + google.golang.org/grpc v1.67.1 // indirect + google.golang.org/protobuf v1.35.2 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect +) + +replace ( + github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4 + github.com/scylladb/scylla-cdc-go => ../../ +) diff --git a/examples/replicator-gcp-pub/go.sum b/examples/replicator-gcp-pub/go.sum new file mode 100644 index 0000000..2aff3c1 --- /dev/null +++ b/examples/replicator-gcp-pub/go.sum @@ -0,0 +1,231 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE= +cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U= +cloud.google.com/go/auth v0.11.0 h1:Ic5SZz2lsvbYcWT5dfjNWgw6tTlGi2Wc8hyQSC9BstA= +cloud.google.com/go/auth v0.11.0/go.mod h1:xxA5AqpDrvS+Gkmo9RqrGGRh6WSNKKOXhY3zNOr38tI= +cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU= +cloud.google.com/go/auth/oauth2adapt v0.2.6/go.mod h1:AlmsELtlEBnaNTL7jCj8VQFLy6mbZv0s4Q7NGBeQ5E8= +cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= +cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= +cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA= +cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY= +cloud.google.com/go/kms v1.20.1 h1:og29Wv59uf2FVaZlesaiDAqHFzHaoUyHI3HYp9VUHVg= +cloud.google.com/go/kms v1.20.1/go.mod h1:LywpNiVCvzYNJWS9JUcGJSVTNSwPwi0vBAotzDqn2nc= +cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc= +cloud.google.com/go/longrunning v0.6.2/go.mod h1:k/vIs83RN4bE3YCswdXC5PFfWVILjm3hpEUlSko4PiI= +cloud.google.com/go/pubsub v1.45.3 h1:prYj8EEAAAwkp6WNoGTE4ahe0DgHoyJd5Pbop931zow= +cloud.google.com/go/pubsub v1.45.3/go.mod h1:cGyloK/hXC4at7smAtxFnXprKEFTqmMXNNd9w+bd94Q= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go-v2 v1.32.7 h1:ky5o35oENWi0JYWUZkB7WYvVPP+bcRF5/Iq7JWSb5Rw= +github.com/aws/aws-sdk-go-v2 v1.32.7/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= +github.com/aws/aws-sdk-go-v2/config v1.28.7 h1:GduUnoTXlhkgnxTD93g1nv4tVPILbdNQOzav+Wpg7AE= +github.com/aws/aws-sdk-go-v2/config v1.28.7/go.mod h1:vZGX6GVkIE8uECSUHB6MWAUsd4ZcG2Yq/dMa4refR3M= +github.com/aws/aws-sdk-go-v2/credentials v1.17.48 h1:IYdLD1qTJ0zanRavulofmqut4afs45mOWEI+MzZtTfQ= +github.com/aws/aws-sdk-go-v2/credentials v1.17.48/go.mod h1:tOscxHN3CGmuX9idQ3+qbkzrjVIx32lqDSU1/0d/qXs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22 h1:kqOrpojG71DxJm/KDPO+Z/y1phm1JlC8/iT+5XRmAn8= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22/go.mod h1:NtSFajXVVL8TA2QNngagVZmUtXciyrHOt7xgz4faS/M= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26 h1:I/5wmGMffY4happ8NOCuIUEWGUvvFp5NSeQcXl9RHcI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.26/go.mod h1:FR8f4turZtNy6baO0KJ5FJUmXH/cSkI9fOngs0yl6mA= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26 h1:zXFLuEuMMUOvEARXFUVJdfqZ4bvvSgdGRq/ATcrQxzM= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.26/go.mod h1:3o2Wpy0bogG1kyOPrgkXA8pgIfEEv0+m19O9D5+W8y8= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7 h1:8eUsivBQzZHqe/3FE+cqwfH+0p5Jo8PFM/QYQSmeZ+M= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.7/go.mod h1:kLPQvGUmxn/fqiCrDeohwG33bq2pQpGeY62yRO6Nrh0= +github.com/aws/aws-sdk-go-v2/service/sns v1.33.8 h1:zKokiUMOfbZSrAUVqw+bSjr6gl9u/JcvPzHTmL+tmdQ= +github.com/aws/aws-sdk-go-v2/service/sns v1.33.8/go.mod h1:Nf9YEyqE51C+Dyj0DWSATxvsr39jBFIss6Jee9Hyqx4= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.8 h1:CvuUmnXI7ebaUAhbJcDy9YQx8wHR69eZ9I7q5hszt/g= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.8/go.mod h1:XDeGv1opzwm8ubxddF0cgqkZWsyOtw4lr6dxwmb6YQg= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7 h1:F2rBfNAL5UyswqoeWv9zs74N/NanhK16ydHW1pahX6E= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.7/go.mod h1:JfyQ0g2JG8+Krq0EuZNnRwX0mU0HrwY/tG6JNfcqh4k= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.3 h1:Xgv/hyNgvLda/M9l9qxXc4UFSgppnRczLxlMs5Ae/QY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.3/go.mod h1:5Gn+d+VaaRgsjewpMvGazt0WfcFO+Md4wLOuBfGR9Bc= +github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= +github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM= +github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw= +github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= +github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o= +github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA= +github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.einride.tech/aip v0.68.0 h1:4seM66oLzTpz50u4K1zlJyOXQ3tCzcJN7I22tKkjipw= +go.einride.tech/aip v0.68.0/go.mod h1:7y9FF8VtPWqpxuAxl0KQWqaULxW4zFIesD6zF5RIHHg= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= +golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/api v0.210.0 h1:HMNffZ57OoZCRYSbdWVRoqOa8V8NIHLL0CzdBPLztWk= +google.golang.org/api v0.210.0/go.mod h1:B9XDZGnx2NtyjzVkOVTGrFSAVZgPcbedzKg/gTLwqBs= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD4Q5w+vfEnPvPpuTwedCNVohYJfNk= +google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= +google.golang.org/genproto/googleapis/api v0.0.0-20241113202542-65e8d215514f h1:M65LEviCfuZTfrfzwwEoxVtgvfkFkBUbFnRbxCXuXhU= +google.golang.org/genproto/googleapis/api v0.0.0-20241113202542-65e8d215514f/go.mod h1:Yo94eF2nj7igQt+TiJ49KxjIH8ndLYPZMIRSiRcEbg0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 h1:LWZqQOEjDyONlF1H6afSWpAL/znlREo2tHfLoe+8LMA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= +sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= diff --git a/examples/replicator-gcp-pub/main.go b/examples/replicator-gcp-pub/main.go new file mode 100644 index 0000000..2772a50 --- /dev/null +++ b/examples/replicator-gcp-pub/main.go @@ -0,0 +1,429 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "strings" + "sync" + "sync/atomic" + "time" + + "cloud.google.com/go/pubsub" + "github.com/gocql/gocql" + + scyllacdc "github.com/scylladb/scylla-cdc-go" +) + +// TODO: Escape field names? +var showTimestamps = false + +var reportPeriod = 1 * time.Minute + +func main() { + var ( + keyspace string + table string + source string + progressNode string + readConsistency string + writeConsistency string + + pubTopic string + + progressTable string + ) + + flag.StringVar(&keyspace, "keyspace", "", "keyspace name") + flag.StringVar(&table, "table", "", "table name; you can specify multiple table by separating them with a comma") + flag.StringVar(&source, "source", "", "address of a node in source cluster") + flag.StringVar(&progressNode, "progress-node", "", "address of a node in progress cluster") + flag.StringVar(&readConsistency, "read-consistency", "", "consistency level used to read from cdc log (one, quorum, all)") + flag.StringVar(&writeConsistency, "write-consistency", "", "consistency level used to write to the destination cluster (one, quorum, all)") + flag.StringVar(&progressTable, "progress-table", "", "fully-qualified name of the table in the destination cluster to use for saving progress; if omitted, the progress won't be saved") + + flag.StringVar(&pubTopic, "topic", "", "GCP PUB/SUB Topic") + + flag.String("mode", "", "mode (ignored)") + + adv := scyllacdc.AdvancedReaderConfig{} + flag.DurationVar(&adv.ConfidenceWindowSize, "polling-confidence-window-size", 30*time.Second, "defines a minimal age a change must have in order to be read.") + flag.DurationVar(&adv.ChangeAgeLimit, "polling-change-age-limit", 10*time.Minute, "When the library starts for the first time it has to start consuming\nchanges from some point in time. This parameter defines how far in the\npast it needs to look. If the value of the parameter is set to an hour,\nthen the library will only read historical changes that are no older than\nan hour.") + flag.DurationVar(&adv.QueryTimeWindowSize, "pooling-query-time-window-size", 1*time.Minute, "Changes are queried using select statements with restriction on the time\nthose changes appeared. The restriction is bounding the time from both\nlower and upper bounds. This parameter defines the width of the time\nwindow used for the restriction.") + flag.DurationVar(&adv.PostEmptyQueryDelay, "polling-post-empty-query-delay", 30*time.Second, "The library uses select statements to fetch changes from CDC Log tables.\nEach select fetches changes from a single table and fetches only changes\nfrom a limited set of CDC streams. If such select returns no changes then\nnext select to this table and set of CDC streams will be issued after\na delay. This parameter specifies the length of the delay") + flag.DurationVar(&adv.PostNonEmptyQueryDelay, "polling-post-non-empty-query-delay", 10*time.Second, "The library uses select statements to fetch changes from CDC Log tables.\nEach select fetches changes from a single table and fetches only changes\nfrom a limited set of CDC streams. If such select returns one or more\nchanges then next select to this table and set of CDC streams will be\nissued after a delay. This parameter specifies the length of the delay") + flag.DurationVar(&adv.PostFailedQueryDelay, "pooling-post-failed-query-delay", 1*time.Second, "If the library tries to read from the CDC log and the read operation\nfails, it will wait some time before attempting to read again. This\nparameter specifies the length of the delay.") + + flag.Parse() + + clRead := parseConsistency(readConsistency) + clWrite := parseConsistency(writeConsistency) + + fmt.Println("Parameters:") + fmt.Printf(" Keyspace: %s\n", keyspace) + fmt.Printf(" Table: %s\n", table) + fmt.Printf(" Source cluster IP: %s\n", source) + fmt.Printf(" Destination cluster IP: %s\n", progressNode) + fmt.Printf(" Consistency for reads: %s\n", clRead) + fmt.Printf(" Consistency for writes: %s\n", clWrite) + fmt.Printf(" Table to use for saving progress: %s\n", progressTable) + fmt.Println("Advanced reader parameters:") + fmt.Printf(" Confidence window size: %s\n", adv.ConfidenceWindowSize) + fmt.Printf(" Change age limit: %s\n", adv.ChangeAgeLimit) + fmt.Printf(" Query window size: %s\n", adv.QueryTimeWindowSize) + fmt.Printf(" Delay after poll with empty results: %s\n", adv.PostEmptyQueryDelay) + fmt.Printf(" Delay after poll with non-empty results: %s\n", adv.PostNonEmptyQueryDelay) + fmt.Printf(" Delay after failed poll: %s\n", adv.PostFailedQueryDelay) + + var fullyQualifiedTables []string + + for _, t := range strings.Split(table, ",") { + fullyQualifiedTables = append(fullyQualifiedTables, keyspace+"."+t) + } + + logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) + repl, err := newReplicator( + context.Background(), + source, progressNode, + fullyQualifiedTables, + pubTopic, + &adv, + clRead, + clWrite, + progressTable, + logger, + ) + if err != nil { + log.Fatalln(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + // React to Ctrl+C signal. + // + // 1st signal will cause the replicator to read changes up until + // the moment the signal was received, and then it will stop the replicator. + // This is the "most graceful" way of stopping the replicator. + // + // 2nd signal will cancel the context. This should stop all operations + // done by the replicator ASAP and stop it. + // + // 3rd signal will exit the process immediately with error code 1. + signalC := make(chan os.Signal, 3) + go func() { + <-signalC + now := time.Now() + log.Printf("stopping at %v", now) + repl.StopAt(now) + + <-signalC + log.Printf("stopping now") + cancel() + + <-signalC + log.Printf("killing") + os.Exit(1) + }() + signal.Notify(signalC, os.Interrupt) + + if err := repl.Run(ctx); err != nil { + log.Fatalln(err) + } + + log.Printf("quitting, rows read: %d", repl.GetReadRowsCount()) +} + +func parseConsistency(s string) gocql.Consistency { + switch strings.ToLower(s) { + case "one": + return gocql.One + case "quorum": + return gocql.Quorum + case "all": + return gocql.All + default: + log.Printf("warning: got unsupported consistency level \"%s\", will use \"one\" instead", s) + return gocql.One + } +} + +type replicator struct { + reader *scyllacdc.Reader + + readerSession *gocql.Session + progressSession *gocql.Session + + rowsRead *int64 +} + +func newReplicator( + ctx context.Context, + source, destination string, + tableNames []string, + topic string, + advancedParams *scyllacdc.AdvancedReaderConfig, + readConsistency gocql.Consistency, + progressConsistency gocql.Consistency, + progressTable string, + logger scyllacdc.Logger, +) (*replicator, error) { + ptCluster := gocql.NewCluster(destination) + ptCluster.Timeout = 10 * time.Second + ptCluster.Consistency = progressConsistency + progressSession, err := ptCluster.CreateSession() + if err != nil { + return nil, err + } + + // Configure a session + readerCluster := gocql.NewCluster(source) + readerCluster.Timeout = 10 * time.Second + readerCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) + readerSession, err := readerCluster.CreateSession() + if err != nil { + progressSession.Close() + return nil, err + } + + rowsRead := new(int64) + + factory := &replicatorFactory{ + rowsRead: rowsRead, + topic: topic, + logger: logger, + } + + var progressManager scyllacdc.ProgressManager + if progressTable != "" { + progressManager, err = scyllacdc.NewTableBackedProgressManager(progressSession, progressTable, "cdc-replicator") + if err != nil { + progressSession.Close() + return nil, err + } + } + + cfg := &scyllacdc.ReaderConfig{ + Session: readerSession, + ChangeConsumerFactory: factory, + ProgressManager: progressManager, + TableNames: tableNames, + Consistency: readConsistency, + } + + if advancedParams != nil { + cfg.Advanced = *advancedParams + } + cfg.Consistency = readConsistency + cfg.Logger = logger + + reader, err := scyllacdc.NewReader(ctx, cfg) + if err != nil { + readerSession.Close() + progressSession.Close() + return nil, err + } + + repl := &replicator{ + reader: reader, + + readerSession: readerSession, + progressSession: progressSession, + + rowsRead: rowsRead, + } + + return repl, nil +} + +func (repl *replicator) Run(ctx context.Context) error { + defer repl.progressSession.Close() + defer repl.readerSession.Close() + return repl.reader.Run(ctx) +} + +func (repl *replicator) StopAt(at time.Time) { + repl.reader.StopAt(at) +} + +func (repl *replicator) Stop() { + repl.reader.Stop() +} + +func (repl *replicator) GetReadRowsCount() int64 { + return atomic.LoadInt64(repl.rowsRead) +} + +type replicatorFactory struct { + rowsRead *int64 + projectID string + topic string + logger scyllacdc.Logger +} + +func (rf *replicatorFactory) CreateChangeConsumer( + ctx context.Context, + input scyllacdc.CreateChangeConsumerInput, +) (scyllacdc.ChangeConsumer, error) { + splitTableName := strings.SplitN(input.TableName, ".", 2) + if len(splitTableName) < 2 { + return nil, fmt.Errorf("table name is not fully qualified: %s", input.TableName) + } + return NewPUBReplicator(ctx, rf.projectID, rf.topic, rf.rowsRead, input.StreamID, input.ProgressReporter, rf.logger) +} + +type PUBReplicator struct { + topic *pubsub.Topic + pubTopic string + consistency gocql.Consistency + + pkColumns []string + ckColumns []string + otherColumns []string + columnTypes map[string]TypeInfo + allColumns []string + + insertStr string + rowDeleteQueryStr string + partitionDeleteQueryStr string + + localCount int64 + totalCount *int64 + + streamID scyllacdc.StreamID + reporter *scyllacdc.PeriodicProgressReporter +} + +func NewPUBReplicator( + ctx context.Context, + projectID, topic string, + count *int64, + streamID scyllacdc.StreamID, + reporter *scyllacdc.ProgressReporter, + logger scyllacdc.Logger, +) (*PUBReplicator, error) { + cl, err := pubsub.NewClient(ctx, projectID) + if err != nil { + return nil, err + } + + dr := &PUBReplicator{ + topic: cl.Topic(topic), + pubTopic: topic, + totalCount: count, + streamID: streamID, + reporter: scyllacdc.NewPeriodicProgressReporter(logger, reportPeriod, reporter), + } + + dr.reporter.Start(ctx) + return dr, nil +} + +func (r *PUBReplicator) Consume(ctx context.Context, c scyllacdc.Change) error { + timestamp := c.GetCassandraTimestamp() + if showTimestamps { + log.Printf("[%s] Processing timestamp: %s (%s)\n", c.StreamID, c.Time, c.Time.Time()) + } + wg := &sync.WaitGroup{} + errs := &writeSafeList[error]{} + for _, change := range c.Delta { + wg.Add(1) + if err := r.sendChangeToPUB(ctx, change, timestamp, "Delta", wg, errs); err != nil { + return err + } + } + + for _, change := range c.PreImage { + wg.Add(1) + if err := r.sendChangeToPUB(ctx, change, timestamp, "PreImage", wg, errs); err != nil { + return err + } + } + + for _, change := range c.PostImage { + wg.Add(1) + if err := r.sendChangeToPUB(ctx, change, timestamp, "PostImage", wg, errs); err != nil { + return err + } + } + + wg.Wait() + for _, err := range errs.list { + if err != nil { + return err + } + } + + r.reporter.Update(c.Time) + r.localCount += int64(len(c.Delta)) + + return nil +} + +func (r *PUBReplicator) sendChangeToPUB(ctx context.Context, change *scyllacdc.ChangeRow, timestamp int64, recType string, wg *sync.WaitGroup, errs *writeSafeList[error]) error { + change.GetRawData() + change.GetOperation() + + msg, err := json.Marshal(map[string]interface{}{ + "type": recType, + "operation": change.GetOperation(), + "timestamp": timestamp, + "ttl": change.GetTTL(), + "seq_no": change.GetSeqNo(), + "end_of_batch": change.GetEndOfBatch(), + "data": change.GetRawData(), + }) + if err != nil { + return fmt.Errorf("failed to serialize message: %w", err) + } + + resp := r.topic.Publish(ctx, &pubsub.Message{ + Data: msg, + }) + + go func() { + defer wg.Done() + _, err := resp.Get(ctx) + errs.Add(err) + }() + + return nil +} + +func (r *PUBReplicator) End() error { + log.Printf("Streams [%s]: processed %d changes in total", r.streamID, r.localCount) + atomic.AddInt64(r.totalCount, r.localCount) + _ = r.reporter.SaveAndStop(context.Background()) + r.topic.Flush() + r.topic.Stop() + return nil +} + +func (r *PUBReplicator) Empty(ctx context.Context, ackTime gocql.UUID) error { + log.Printf("Streams [%s]: saw no changes up to %s", r.streamID, ackTime.Time()) + r.reporter.Update(ackTime) + return nil +} + +// Make sure that PUBReplicator supports the ChangeOrEmptyNotificationConsumer interface +var _ scyllacdc.ChangeOrEmptyNotificationConsumer = (*PUBReplicator)(nil) + +type writeSafeList[V any] struct { + list []V + mutex sync.Mutex +} + +func (w *writeSafeList[V]) Add(item V) { + w.mutex.Lock() + w.list = append(w.list, item) + w.mutex.Unlock() +} + +func (w *writeSafeList[V]) Items() []V { + w.mutex.Lock() + defer w.mutex.Unlock() + return w.list +} diff --git a/examples/replicator-gcp-pub/replicator_test.go b/examples/replicator-gcp-pub/replicator_test.go new file mode 100644 index 0000000..d19f322 --- /dev/null +++ b/examples/replicator-gcp-pub/replicator_test.go @@ -0,0 +1,534 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "reflect" + "regexp" + "testing" + "time" + + "github.com/gocql/gocql" + scyllacdc "github.com/scylladb/scylla-cdc-go" + "github.com/scylladb/scylla-cdc-go/testutils" +) + +type schema struct { + tableName string + createQuery string +} + +var udts = []string{ + "CREATE TYPE udt_simple (a int, b int, c text)", +} + +var ( + schemaSimple = schema{ + "tbl_simple", + "CREATE TABLE tbl_simple (pk text, ck int, v1 int, v2 text, PRIMARY KEY (pk, ck))", + } + schemaMultipleClusteringKeys = schema{ + "tbl_multiple_clustering_keys", + "CREATE TABLE tbl_multiple_clustering_keys (pk text, ck1 int, ck2 int, v int, PRIMARY KEY (pk, ck1, ck2))", + } + schemaBlobs = schema{ + "tbl_blobs", + "CREATE TABLE tbl_blobs (pk text, ck int, v blob, PRIMARY KEY (pk, ck))", + } + schemaLists = schema{ + "tbl_lists", + "CREATE TABLE tbl_lists (pk text, ck int, v list, PRIMARY KEY(pk, ck))", + } + schemaSets = schema{ + "tbl_sets", + "CREATE TABLE tbl_sets (pk text, ck int, v set, PRIMARY KEY (pk, ck))", + } + schemaMaps = schema{ + "tbl_maps", + "CREATE TABLE tbl_maps (pk text, ck int, v map, PRIMARY KEY (pk, ck))", + } + schemaTuples = schema{ + "tbl_tuples", + "CREATE TABLE tbl_tuples (pk text, ck int, v tuple, PRIMARY KEY (pk, ck))", + } + schemaTuplesInTuples = schema{ + "tbl_tuples_in_tuples", + "CREATE TABLE tbl_tuples_in_tuples (pk text, ck int, v tuple, int>, PRIMARY KEY (pk, ck))", + } + schemaTuplesInTuplesInTuples = schema{ + "tbl_tuples_in_tuples_in_tuples", + "CREATE TABLE tbl_tuples_in_tuples_in_tuples (pk text, ck int, v tuple, text>, int>, PRIMARY KEY (pk, ck))", + } + schemaUDTs = schema{ + "tbl_udts", + "CREATE TABLE tbl_udts (pk text, ck int, v udt_simple, PRIMARY KEY (pk, ck))", + } +) + +var testCases = []struct { + schema schema + pk string + queries []string +}{ + // Operations test cases + { + schemaSimple, + "simpleInserts", + []string{ + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('simpleInserts', 1, 2, 'abc')", + "INSERT INTO %s (pk, ck, v1) VALUES ('simpleInserts', 2, 3)", + "INSERT INTO %s (pk, ck, v2) VALUES ('simpleInserts', 2, 'def')", + }, + }, + { + schemaSimple, + "simpleUpdates", + []string{ + "UPDATE %s SET v1 = 1 WHERE pk = 'simpleUpdates' AND ck = 1", + "UPDATE %s SET v2 = 'abc' WHERE pk = 'simpleUpdates' AND ck = 2", + "UPDATE %s SET v1 = 5, v2 = 'def' WHERE pk = 'simpleUpdates' AND ck = 3", + }, + }, + { + schemaSimple, + "rowDeletes", + []string{ + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('rowDeletes', 1, 2, 'abc')", + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('rowDeletes', 2, 3, 'def')", + "DELETE FROM %s WHERE pk = 'rowDeletes' AND ck = 1", + }, + }, + { + schemaSimple, + "partitionDeletes", + []string{ + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('partitionDeletes', 1, 2, 'abc')", + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('partitionDeletes', 2, 3, 'def')", + "DELETE FROM %s WHERE pk = 'partitionDeletes'", + // Insert one more row, just to check if replication works at all + "INSERT INTO %s (pk, ck, v1, v2) VALUES ('partitionDeletes', 4, 5, 'def')", + }, + }, + { + schemaMultipleClusteringKeys, + "rangeDeletes", + []string{ + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 1, 4, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 2, 4, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 3, 4, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 1, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 2, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 3, 0)", + "INSERT INTO %s (pk, ck1, ck2, v) VALUES ('rangeDeletes', 4, 4, 0)", + "DELETE FROM %s WHERE pk = 'rangeDeletes' AND ck1 > 3", + "DELETE FROM %s WHERE pk = 'rangeDeletes' AND ck1 <= 1", + "DELETE FROM %s WHERE pk = 'rangeDeletes' AND ck1 = 2 AND ck2 > 1 AND ck2 < 4", + }, + }, + + // Blob test cases + { + schemaBlobs, + "blobs", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 1, 0x1234)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 2, 0x)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 3, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 4, 0x4321)", + "INSERT INTO %s (pk, ck, v) VALUES ('blobs', 5, 0x00)", + "UPDATE %s SET v = null WHERE pk = 'blobs' AND ck = 4", + "UPDATE %s SET v = 0x WHERE pk = 'blobs' AND ck = 5", + }, + }, + + // Lists test cases + { + schemaLists, + "listOverwrites", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 1, [1, 2, 3])", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 1, [4, 5, 6, 7])", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 2, [6, 5, 4, 3, 2, 1])", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 2, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('listOverwrites', 3, [1, 11, 111])", + "UPDATE %s SET v = [2, 22, 222] WHERE pk = 'listOverwrites' AND ck = 3", + }, + }, + { + schemaLists, + "listAppends", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('listAppends', 1, [1, 2, 3])", + "UPDATE %s SET v = v + [4, 5, 6] WHERE pk = 'listAppends' AND ck = 1", + "UPDATE %s SET v = [-2, -1, 0] + v WHERE pk = 'listAppends' AND ck = 1", + }, + }, + { + schemaLists, + "listRemoves", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('listRemoves', 1, [1, 2, 3])", + "UPDATE %s SET v = v + [4, 5, 6] WHERE pk = 'listRemoves' AND ck = 1", + "UPDATE %s SET v = v - [1, 2, 3] WHERE pk = 'listRemoves' AND ck = 1", + }, + }, + + // Set test cases + { + schemaSets, + "setOverwrites", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 1, {1, 2, 3, 4})", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 1, {4, 5, 6, 7})", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 2, {8, 9, 10, 11})", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 2, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('setOverwrites', 3, {12, 13, 14, 15})", + "UPDATE %s SET v = null WHERE pk = 'setOverwrites' AND ck = 3", + }, + }, + { + schemaSets, + "setAppends", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('setAppends', 1, {1, 2, 3, 4})", + "UPDATE %s SET v = v + {5, 6} WHERE pk = 'setAppends' AND ck = 1", + "UPDATE %s SET v = v + {5, 6} WHERE pk = 'setAppends' AND ck = 2", + }, + }, + { + schemaSets, + "setRemovals", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('setRemovals', 1, {1, 2, 3, 4})", + "UPDATE %s SET v = v - {1, 3} WHERE pk = 'setRemovals' AND ck = 1", + "UPDATE %s SET v = v - {1138} WHERE pk = 'setRemovals' AND ck = 2", + }, + }, + + // Map test cases + { + schemaMaps, + "mapOverwrites", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 1, {1: 2, 3: 4})", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 1, {5: 6, 7: 8})", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 2, {9: 10, 11: 12})", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 2, null)", + "INSERT INTO %s (pk, ck, v) VALUES ('mapOverwrites', 3, {13: 14, 15: 16})", + "UPDATE %s SET v = null WHERE pk = 'mapOverwrites' AND ck = 3", + }, + }, + { + schemaMaps, + "mapSets", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapSets', 1, {1: 2, 3: 4, 5: 6})", + "UPDATE %s SET v[1] = 42 WHERE pk = 'mapSets' AND ck = 1", + "UPDATE %s SET v[3] = null WHERE pk = 'mapSets' AND ck = 1", + "UPDATE %s SET v[3] = 123 WHERE pk = 'mapSets' AND ck = 1", + "UPDATE %s SET v[5] = 321 WHERE pk = 'mapSets' AND ck = 2", + }, + }, + { + schemaMaps, + "mapAppends", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapAppends', 1, {1: 2, 3: 4})", + "UPDATE %s SET v = v + {5: 6} WHERE pk = 'mapAppends' AND ck = 1", + "UPDATE %s SET v = v + {5: 6} WHERE pk = 'mapAppends' AND ck = 2", + }, + }, + { + schemaMaps, + "mapRemovals", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('mapRemovals', 1, {1: 2, 3: 4})", + "UPDATE %s SET v = v - {1} WHERE pk = 'mapRemovals' AND ck = 1", + "UPDATE %s SET v = v - {1138} WHERE pk = 'mapRemovals' AND ck = 2", + }, + }, + + // Tuple test cases + { + schemaTuples, + "tupleInserts", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tupleInserts', 1, (7, 'abc'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleInserts', 2, (9, 'def'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleInserts', 2, null)", + }, + }, + { + schemaTuples, + "tupleUpdates", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 1, (7, 'abc'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 2, (9, 'def'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 3, (11, 'ghi'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 4, (13, 'jkl'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 5, (15, 'mno'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 6, (17, 'pqr'))", + "INSERT INTO %s (pk, ck, v) VALUES ('tupleUpdates', 7, (19, 'stu'))", + "UPDATE %s SET v = (111, 'zyx') WHERE pk = 'tupleUpdates' AND ck = 1", + "UPDATE %s SET v = null WHERE pk = 'tupleUpdates' AND ck = 2", + "INSERT INTO %s (pk, ck) VALUES ('tupleUpdates', 3)", + "UPDATE %s SET v = (null, null) WHERE pk = 'tupleUpdates' AND ck = 4", + "UPDATE %s SET v = (null, 'asdf') WHERE pk = 'tupleUpdates' AND ck = 5", + "UPDATE %s SET v = (123, null) WHERE pk = 'tupleUpdates' AND ck = 6", + "UPDATE %s SET v = (null, '') WHERE pk = 'tupleUpdates' AND ck = 7", + }, + }, + { + schemaTuplesInTuples, + "tuplesInTuples", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 1, ((1, 'abc'), 7))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 2, ((3, 'def'), 9))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 3, ((3, 'ghi'), 9))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuples', 4, ((3, 'jkl'), 9))", + "UPDATE %s SET v = ((100, 'zyx'), 111) WHERE pk = 'tuplesInTuples' AND ck = 1", + "UPDATE %s SET v = null WHERE pk = 'tuplesInTuples' AND ck = 2", + "UPDATE %s SET v = ((200, null), 999) WHERE pk = 'tuplesInTuples' AND ck = 3", + "UPDATE %s SET v = ((300, ''), 333) WHERE pk = 'tuplesInTuples' AND ck = 4", + }, + }, + { + schemaTuplesInTuplesInTuples, + "tuplesInTuplesInTuples", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuplesInTuples', 1, (((1, 9), 'abc'), 7))", + "INSERT INTO %s (pk, ck, v) VALUES ('tuplesInTuplesInTuples', 2, (((3, 8), 'def'), 9))", + "UPDATE %s SET v = (((100, 200), 'zyx'), 111) WHERE pk = 'tuplesInTuplesInTuples' AND ck = 1", + "UPDATE %s SET v = null WHERE pk = 'tuplesInTuplesInTuples' AND ck = 2", + "UPDATE %s SET v = (null, 123) WHERE pk = 'tuplesInTuplesInTuples' AND ck = 3", + "UPDATE %s SET v = ((null, 'xyz'), 321) WHERE pk = 'tuplesInTuplesInTuples' AND ck = 4", + }, + }, + + // UDT test cases + { + schemaUDTs, + "udt", + []string{ + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 1, (2, 3, 'abc'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 2, {a: 6, c: 'zxcv'})", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 3, (9, 4, 'def'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 4, (123, 321, 'ghi'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 5, (333, 222, 'jkl'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 6, (432, 678, 'mno'))", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 7, (765, 345, 'pqr'))", + "UPDATE %s SET v.b = 41414 WHERE pk = 'udt' AND ck = 2", + "UPDATE %s SET v = null WHERE pk = 'udt' AND ck = 3", + "UPDATE %s SET v = {b: 123456, c: 'tyu'} WHERE pk = 'udt' AND ck = 4", + "INSERT INTO %s (pk, ck, v) VALUES ('udt', 5, (999, 888, 'zxc'))", + "UPDATE %s SET v.c = null WHERE pk = 'udt' AND ck = 6", + "UPDATE %s SET v = {a: 923, b: 123456, c: ''} WHERE pk = 'udt' AND ck = 7", + }, + }, +} + +func TestReplicator(t *testing.T) { + filter := os.Getenv("REPLICATOR_TEST_FILTER") + if filter == "" { + filter = ".*" + } + re := regexp.MustCompile(filter) + + topic := os.Getenv("TEST_TOPIC") + + if topic == "" { + t.Fatal("TEST_TOPIC can't be empty") + } + + // Collect all schemas + schemas := make(map[string]string) + for _, tc := range testCases { + schemas[tc.schema.tableName] = tc.schema.createQuery + } + + sourceAddress := testutils.GetSourceClusterContactPoint() + destinationAddress := testutils.GetDestinationClusterContactPoint() + keyspaceName := testutils.GetUniqueName("test_keyspace") + + sourceSession := createSessionAndSetupSchema(t, sourceAddress, keyspaceName, true, schemas) + defer sourceSession.Close() + + destinationSession := createSessionAndSetupSchema(t, destinationAddress, keyspaceName, false, schemas) + defer destinationSession.Close() + + // Execute all of the queries + for _, tc := range testCases { + if !re.MatchString(tc.pk) { + continue + } + for _, qStr := range tc.queries { + execQuery(t, sourceSession, fmt.Sprintf(qStr, tc.schema.tableName)) + } + } + + t.Log("running replicators") + + adv := scyllacdc.AdvancedReaderConfig{ + ChangeAgeLimit: time.Minute, + PostNonEmptyQueryDelay: 3 * time.Second, + PostEmptyQueryDelay: 3 * time.Second, + PostFailedQueryDelay: 3 * time.Second, + QueryTimeWindowSize: 5 * time.Minute, + ConfidenceWindowSize: time.Millisecond, + } + + schemaNames := make([]string, 0) + for tbl := range schemas { + schemaNames = append(schemaNames, fmt.Sprintf("%s.%s", keyspaceName, tbl)) + } + + logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile) + replicator, err := newReplicator( + context.Background(), + sourceAddress, + destinationAddress, + schemaNames, + "", + &adv, gocql.Quorum, + gocql.Quorum, + "", + logger, + ) + + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + errC := make(chan error) + go func() { errC <- replicator.Run(ctx) }() + + time.Sleep(time.Second) + + replicator.StopAt(time.Now().Add(time.Second)) + if err := <-errC; err != nil { + t.Fatal(err) + } + + t.Log("validating results") + + // Compare + sourceSet := fetchFullSet(t, sourceSession, schemas) + destinationSet := fetchFullSet(t, destinationSession, schemas) + + failedCount := 0 + + for _, tc := range testCases { + sourceData := sourceSet[tc.pk] + destinationData := destinationSet[tc.pk] + + if len(sourceData) != len(destinationData) { + t.Logf( + "%s: source len %d, destination len %d\n", + tc.pk, + len(sourceData), + len(destinationData), + ) + t.Log(" source:") + for _, row := range sourceData { + t.Logf(" %v", row) + } + t.Log(" dest:") + for _, row := range destinationData { + t.Logf(" %v", row) + } + t.Fail() + failedCount++ + continue + } + + failed := false + for i := 0; i < len(sourceData); i++ { + if !reflect.DeepEqual(sourceData[i], destinationData[i]) { + t.Logf("%s: mismatch", tc.pk) + t.Logf(" source: %v", sourceData[i]) + t.Logf(" dest: %v", destinationData[i]) + failed = true + } + } + + if failed { + t.Fail() + failedCount++ + } else { + t.Logf("%s: OK", tc.pk) + } + } + + if failedCount > 0 { + t.Logf("failed %d/%d test cases", failedCount, len(testCases)) + } +} + +func createSessionAndSetupSchema(t *testing.T, addr string, keyspaceName string, withCdc bool, schemas map[string]string) *gocql.Session { + testutils.CreateKeyspace(t, addr, keyspaceName) + + cfg := gocql.NewCluster(addr) + cfg.Keyspace = keyspaceName + session, err := cfg.CreateSession() + if err != nil { + t.Fatal(err) + } + + for _, udt := range udts { + execQuery(t, session, udt) + } + + for _, tbl := range schemas { + tblQuery := tbl + if withCdc { + tblQuery += " WITH cdc = {'enabled': true, 'preimage': true, 'postimage': true}" + } + execQuery(t, session, tblQuery) + } + + err = session.AwaitSchemaAgreement(context.Background()) + if err != nil { + t.Fatal(err) + } + + return session +} + +func execQuery(t *testing.T, session *gocql.Session, query string) { + t.Logf("executing query %s", query) + err := session.Query(query).Exec() + if err != nil { + t.Fatal(err) + } +} + +func fetchFullSet(t *testing.T, session *gocql.Session, schemas map[string]string) map[string][]map[string]interface{} { + groups := make(map[string][]map[string]interface{}) + + for tbl := range schemas { + data, err := session.Query("SELECT * FROM " + tbl).Iter().SliceMap() + if err != nil { + t.Fatal(err) + } + + for _, row := range data { + pk := row["pk"].(string) + groups[pk] = append(groups[pk], row) + } + } + + return groups +} diff --git a/examples/replicator-gcp-pub/utils.go b/examples/replicator-gcp-pub/utils.go new file mode 100644 index 0000000..dcf5ec0 --- /dev/null +++ b/examples/replicator-gcp-pub/utils.go @@ -0,0 +1,273 @@ +package main + +import "strings" + +// Re-implementation of the type parsing logic from the driver. +// Unlike the driver, this implementation differentiates frozen types +// from non-frozen ones. + +type Type int + +const ( + TypeCustom Type = 0x0000 + TypeAscii Type = 0x0001 + TypeBigInt Type = 0x0002 + TypeBlob Type = 0x0003 + TypeBoolean Type = 0x0004 + TypeCounter Type = 0x0005 + TypeDecimal Type = 0x0006 + TypeDouble Type = 0x0007 + TypeFloat Type = 0x0008 + TypeInt Type = 0x0009 + TypeText Type = 0x000A + TypeTimestamp Type = 0x000B + TypeUUID Type = 0x000C + TypeVarchar Type = 0x000D + TypeVarint Type = 0x000E + TypeTimeUUID Type = 0x000F + TypeInet Type = 0x0010 + TypeDate Type = 0x0011 + TypeTime Type = 0x0012 + TypeSmallInt Type = 0x0013 + TypeTinyInt Type = 0x0014 + TypeDuration Type = 0x0015 + TypeList Type = 0x0020 + TypeMap Type = 0x0021 + TypeSet Type = 0x0022 + TypeUDT Type = 0x0030 + TypeTuple Type = 0x0031 +) + +func (t Type) IsCollection() bool { + switch t { + case TypeList, TypeMap, TypeSet, TypeUDT: + return true + default: + return false + } +} + +type TypeInfo interface { + Type() Type + IsFrozen() bool + Unfrozen() TypeInfo +} + +type FrozenType struct { + Inner TypeInfo +} + +func (ft *FrozenType) Type() Type { + return ft.Inner.Type() +} + +func (ft *FrozenType) IsFrozen() bool { + return true +} + +func (ft *FrozenType) Unfrozen() TypeInfo { + return ft.Inner +} + +type MapType struct { + Key TypeInfo + Value TypeInfo +} + +func (mt *MapType) Type() Type { + return TypeMap +} + +func (mt *MapType) IsFrozen() bool { + return false +} + +func (mt *MapType) Unfrozen() TypeInfo { + return mt +} + +type ListType struct { + Element TypeInfo +} + +func (lt *ListType) Type() Type { + return TypeList +} + +func (lt *ListType) IsFrozen() bool { + return false +} + +func (lt *ListType) Unfrozen() TypeInfo { + return lt +} + +type SetType struct { + Element TypeInfo +} + +func (st *SetType) Type() Type { + return TypeSet +} + +func (st *SetType) IsFrozen() bool { + return false +} + +func (st *SetType) Unfrozen() TypeInfo { + return st +} + +type TupleType struct { + Elements []TypeInfo +} + +func (tt *TupleType) Type() Type { + return TypeTuple +} + +func (tt *TupleType) IsFrozen() bool { + return false +} + +func (tt *TupleType) Unfrozen() TypeInfo { + return tt +} + +type NativeType struct { + RealType Type +} + +func (nt *NativeType) Type() Type { + return nt.RealType +} + +func (nt *NativeType) IsFrozen() bool { + return false +} + +func (nt *NativeType) Unfrozen() TypeInfo { + return nt +} + +type UDTType struct { + Name string +} + +func (ut *UDTType) Type() Type { + return TypeUDT +} + +func (ut *UDTType) IsFrozen() bool { + return false +} + +func (ut *UDTType) Unfrozen() TypeInfo { + return ut +} + +func parseType(str string) TypeInfo { + if strings.HasPrefix(str, "frozen<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "frozen<"), ">") + return &FrozenType{parseType(innerStr)} + } + if strings.HasPrefix(str, "list<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "list<"), ">") + return &ListType{parseType(innerStr)} + } + if strings.HasPrefix(str, "set<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "set<"), ">") + return &SetType{parseType(innerStr)} + } + if strings.HasPrefix(str, "map<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "map<"), ">") + list := parseTypeList(innerStr) + return &MapType{Key: list[0], Value: list[1]} + } + if strings.HasPrefix(str, "tuple<") { + innerStr := strings.TrimSuffix(strings.TrimPrefix(str, "tuple<"), ">") + list := parseTypeList(innerStr) + return &TupleType{Elements: list} + } + typ := parseNativeType(str) + if typ == TypeUDT { + return &UDTType{Name: str} + } + return &NativeType{RealType: typ} +} + +func parseTypeList(str string) []TypeInfo { + var ret []TypeInfo + var level int + var builder strings.Builder + for _, r := range str { + if r == ',' && level == 0 { + s := strings.TrimSpace(builder.String()) + ret = append(ret, parseType(s)) + builder.Reset() + continue + } + + if r == '<' { + level++ + } else if r == '>' { + level-- + } + builder.WriteRune(r) + } + if builder.Len() != 0 { + s := strings.TrimSpace(builder.String()) + ret = append(ret, parseType(s)) + } + return ret +} + +func parseNativeType(str string) Type { + switch str { + case "ascii": + return TypeAscii + case "bigint": + return TypeBigInt + case "blob": + return TypeBlob + case "boolean": + return TypeBoolean + case "counter": + return TypeCounter + case "date": + return TypeDate + case "decimal": + return TypeDecimal + case "double": + return TypeDouble + case "duration": + return TypeDuration + case "float": + return TypeFloat + case "int": + return TypeInt + case "smallint": + return TypeSmallInt + case "tinyint": + return TypeTinyInt + case "time": + return TypeTime + case "timestamp": + return TypeTimestamp + case "uuid": + return TypeUUID + case "varchar": + return TypeVarchar + case "text": + return TypeText + case "varint": + return TypeVarint + case "timeuuid": + return TypeTimeUUID + case "inet": + return TypeInet + default: + // Assume it's a UDT + return TypeUDT + } +}