@@ -19,15 +19,15 @@ type OperationType int8
19
19
20
20
const (
21
21
PreImage OperationType = 0
22
- Update = 1
23
- Insert = 2
24
- RowDelete = 3
25
- PartitionDelete = 4
26
- RangeDeleteStartInclusive = 5
27
- RangeDeleteStartExclusive = 6
28
- RangeDeleteEndInclusive = 7
29
- RangeDeleteEndExclusive = 8
30
- PostImage = 9
22
+ Update OperationType = 1
23
+ Insert OperationType = 2
24
+ RowDelete OperationType = 3
25
+ PartitionDelete OperationType = 4
26
+ RangeDeleteStartInclusive OperationType = 5
27
+ RangeDeleteStartExclusive OperationType = 6
28
+ RangeDeleteEndInclusive OperationType = 7
29
+ RangeDeleteEndExclusive OperationType = 8
30
+ PostImage OperationType = 9
31
31
)
32
32
33
33
// String is needed to implement the fmt.Stringer interface.
@@ -512,6 +512,7 @@ type changeConsumerFuncInstance struct {
512
512
func (ccfi * changeConsumerFuncInstance ) End () error {
513
513
return nil
514
514
}
515
+
515
516
func (ccfi * changeConsumerFuncInstance ) Consume (ctx context.Context , change Change ) error {
516
517
return ccfi .f (ctx , ccfi .tableName , change )
517
518
}
@@ -609,7 +610,7 @@ func (crq *changeRowQuerier) queryRange(start, end gocql.UUID) (*changeRowIterat
609
610
}
610
611
611
612
// For a given range, returns the cdc$time of the earliest rows for each stream.
612
- func (crq * changeRowQuerier ) findFirstRowsInRange (start , end gocql.UUID ) (map [string ]gocql.UUID , error ) {
613
+ func (crq * changeRowQuerier ) findFirstRowsInRange (start , end gocql.UUID ) (map [string ]gocql.UUID , error ) { // nolint:unused
613
614
queryStr := fmt .Sprintf (
614
615
"SELECT \" cdc$stream_id\" , \" cdc$time\" FROM %s.%s%s WHERE %s AND \" cdc$time\" > ? AND \" cdc$time\" <= ? PER PARTITION LIMIT 1 BYPASS CACHE" ,
615
616
crq .keyspaceName ,
@@ -644,46 +645,46 @@ func (crq *changeRowQuerier) findFirstRowsInRange(start, end gocql.UUID) (map[st
644
645
//
645
646
// Gocql has two main methods of retrieving row data:
646
647
//
647
- // - If you know what columns will be returned by the query and which types
648
- // to use to represent them, you use (*Iter).Scan(...) function and pass
649
- // a list of pointers to values of types you chose for the representation.
650
- // For example, if `x` is int, `Scan(&x)` will put the value of the column
651
- // directly to the `x` variable, setting it to 0 if the column was null.
652
- // - If you don't know which columns will be returned and what are their
653
- // types, you can use (*Iter).MapScan, which returns a map from column
654
- // name to the column value. Gocql automatically chooses a type which
655
- // will be used to represent the column value.
648
+ // - If you know what columns will be returned by the query and which types
649
+ // to use to represent them, you use (*Iter).Scan(...) function and pass
650
+ // a list of pointers to values of types you chose for the representation.
651
+ // For example, if `x` is int, `Scan(&x)` will put the value of the column
652
+ // directly to the `x` variable, setting it to 0 if the column was null.
653
+ // - If you don't know which columns will be returned and what are their
654
+ // types, you can use (*Iter).MapScan, which returns a map from column
655
+ // name to the column value. Gocql automatically chooses a type which
656
+ // will be used to represent the column value.
656
657
//
657
658
// In our interface, we would like to use an API like MapScan, but there
658
659
// are some problems which are addressed by changeRowIterator:
659
660
//
660
- // - Gocql's choice of the type used to represent column values is not the best
661
- // for CDC use case. First and foremost, it's very important to differentiate
662
- // Go's default value for a type from a null. For example, for int columns,
663
- // MapScan chooses Go's int type, and sets it to 0 in both cases if it was 0
664
- // or null in the table. For CDC, this means completely different things -
665
- // 0 would mean that the 0 value was written to that column, while null would
666
- // mean that this column value was not changed.
667
- // Fortunately, we can solve this issue by using a pointer-to-type (e.g. *int).
668
- // Gocql will set it to null if it was null in the database, and set it
669
- // to a pointer to a proper value if it was not null.
661
+ // - Gocql's choice of the type used to represent column values is not the best
662
+ // for CDC use case. First and foremost, it's very important to differentiate
663
+ // Go's default value for a type from a null. For example, for int columns,
664
+ // MapScan chooses Go's int type, and sets it to 0 in both cases if it was 0
665
+ // or null in the table. For CDC, this means completely different things -
666
+ // 0 would mean that the 0 value was written to that column, while null would
667
+ // mean that this column value was not changed.
668
+ // Fortunately, we can solve this issue by using a pointer-to-type (e.g. *int).
669
+ // Gocql will set it to null if it was null in the database, and set it
670
+ // to a pointer to a proper value if it was not null.
670
671
//
671
- // - Similarly to above, UDTs suffer from a similar problem - they are,
672
- // by default, represented by a map[string]interface{} which holds non-pointer
673
- // values of UDT's elements. Fortunately, we can provide a custom type
674
- // which uses pointers to UDT's elements - see udtWithNulls.
672
+ // - Similarly to above, UDTs suffer from a similar problem - they are,
673
+ // by default, represented by a map[string]interface{} which holds non-pointer
674
+ // values of UDT's elements. Fortunately, we can provide a custom type
675
+ // which uses pointers to UDT's elements - see udtWithNulls.
675
676
//
676
- // - Tuples are handled in a peculiar way - instead of returning, for example,
677
- // an []interface{} which holds tuple values, Scan expects that a pointer
678
- // for each tuple element will be provided, and MapScan puts each tuple
679
- // element under a separate key in the map. This creates a problem - it's
680
- // impossible to differentiate a tuple with all fields set to null, and
681
- // a tuple that is just a null. In CDC, the first means an overwrite of the
682
- // column, and the second means that the column should not be changed.
683
- // This is worked around by using the writetime(X) function on the tuple
684
- // column - this function returns null iff column X was null.
685
- // Moreover, tuples are represented as an []interface{} slice containing
686
- // pointers to tuple elements.
677
+ // - Tuples are handled in a peculiar way - instead of returning, for example,
678
+ // an []interface{} which holds tuple values, Scan expects that a pointer
679
+ // for each tuple element will be provided, and MapScan puts each tuple
680
+ // element under a separate key in the map. This creates a problem - it's
681
+ // impossible to differentiate a tuple with all fields set to null, and
682
+ // a tuple that is just a null. In CDC, the first means an overwrite of the
683
+ // column, and the second means that the column should not be changed.
684
+ // This is worked around by using the writetime(X) function on the tuple
685
+ // column - this function returns null iff column X was null.
686
+ // Moreover, tuples are represented as an []interface{} slice containing
687
+ // pointers to tuple elements.
687
688
type changeRowIterator struct {
688
689
iter * gocql.Iter
689
690
columnValues []interface {}
0 commit comments