How to create separate consumer for every Table #11

Closed
JonasHiltl opened this issue Jul 17, 2022 · 5 comments
Labels
bug Something isn't working


@JonasHiltl

I have created my own factory struct to dynamically create consumers based on the input table names.
Both FriendRelationConsumer and PartyFavoritesConsumer implement the ChangeConsumer interface, but neither of them receives any changes to its table.

func (f *factory) CreateChangeConsumer(ctx context.Context, input scyllacdc.CreateChangeConsumerInput) (scyllacdc.ChangeConsumer, error) {
	// input.TableName is fully qualified ("keyspace.table"); dispatch on the table part.
	splitTableName := strings.SplitN(input.TableName, ".", 2)
	if len(splitTableName) < 2 {
		return nil, fmt.Errorf("table name is not fully qualified: %s", input.TableName)
	}
	tableName := splitTableName[1]

	// Reject unsupported tables before starting a reporter that would never be stopped.
	if tableName != consumer.FRIEND_RELATIONS_TABLE && tableName != consumer.PARTY_FAVORITES_TABLE {
		return nil, fmt.Errorf("unsupported table: %s", input.TableName)
	}

	id := f.nextID
	f.nextID++

	reporter := scyllacdc.NewPeriodicProgressReporter(f.logger, time.Minute, input.ProgressReporter)
	reporter.Start(ctx)

	if tableName == consumer.FRIEND_RELATIONS_TABLE {
		return &consumer.FriendRelationConsumer{
			Id:        id,
			TableName: tableName,
			Stream:    f.stream,
			Reporter:  reporter,
		}, nil
	}
	return &consumer.PartyFavoritesConsumer{
		Id:        id,
		TableName: tableName,
		Stream:    f.stream,
		Reporter:  reporter,
	}, nil
}

Is this pattern supported, or is there another way to handle a separate consumer for every table?
Or should I just create a global consumer that gets all changes from every table?

@piodul
Collaborator

piodul commented Jul 18, 2022

Yes, this pattern is supported - e.g. the example/replicator application uses it.

Could you show your ReaderConfig? Please also try setting Logger in the config and check whether any errors are printed - see example/simple-printer for how to do it.

@JonasHiltl
Author

This is my reader config.

logger := log.New(os.Stderr, "", log.Ldate|log.Lmicroseconds|log.Lshortfile)
factory := newFactory(logger, stream) // returns *factory
progressManager, err := scyllacdc.NewTableBackedProgressManager(sess.Session, "cdc_progress", "scylla_sync")

...
cfg := &scyllacdc.ReaderConfig{
	Session:               sess.Session,
	TableNames:            []string{c.CQL_KEYSPACE + "." + consumer.FRIEND_RELATIONS_TABLE, c.CQL_KEYSPACE + "." + consumer.PARTY_FAVORITES_TABLE},
	ProgressManager:       progressManager,
	ChangeConsumerFactory: factory,
	Logger:                logger,
	Advanced: scyllacdc.AdvancedReaderConfig{
		PostEmptyQueryDelay:    time.Second * 1,
		PostNonEmptyQueryDelay: time.Second * 1,
		PostFailedQueryDelay:   time.Second * 1,
		ConfidenceWindowSize:   time.Second * 1,
		QueryTimeWindowSize:    time.Second * 10,
		ChangeAgeLimit:         time.Second * 10,
	},
}

type factory struct {
	logger scyllacdc.Logger
	stream stream.Stream // NATS connection
	nextID int
}

func newFactory(logger scyllacdc.Logger, stream stream.Stream) *factory {
	return &factory{
		logger: logger,
		stream: stream,
	}
}

These are my logs

2022/07/18 16:20:31 Connected to keyspace test and hosts [host.docker.internal] 
2022/07/18 16:20:31.745617 reader.go:191: last saved progress was at generation 2022-07-11 17:51:08.092 +0000 UTC
2022/07/18 16:20:31.761408 topology.go:136: starting generation fetcher loop
2022/07/18 16:20:31.776091 topology.go:272: pushing generation 2022-07-11 17:51:08.092 +0000 UTC
2022/07/18 16:20:31.776123 reader.go:242: starting reading generation 2022-07-11 17:51:08.092 +0000 UTC from timestamp 2022-07-11 17:51:08.092 +0000 UTC
2022/07/18 16:20:31.780330 reader.go:251: grouped 3072 streams into 256 batches

I already had a look at the replicator example, but I couldn't find the lines in the CreateChangeConsumer function where different consumers get created based on the input table name.
Could you show me where this is implemented in the examples?

@piodul
Collaborator

piodul commented Jul 18, 2022

> I already had a look at the replicator example, but I couldn't find the lines in the CreateChangeConsumer function where different consumers get created based on the input table name.
> Could you show me where this is implemented in the examples?

The replicator example creates a separate consumer for each stream - see (*replicatorFactory).CreateChangeConsumer() and NewDeltaReplicator().

If by "different consumers" you meant consumers of different types, then that is supported as well - the library doesn't assume anywhere that all consumers must have the same type.

> 2022/07/18 16:20:31.776123 reader.go:242: starting reading generation 2022-07-11 17:51:08.092 +0000 UTC from timestamp 2022-07-11 17:51:08.092 +0000 UTC

Judging by the logs, this is most likely caused by the bug described in #8. If the application is started for the first time but consumes no rows (or too few of them), the progress for individual streams is never saved. On restart, the library then defaults to the time of the most recent generation (a new generation usually appears on a topology change) from before the application was started. In your case, that appears to be 2022-07-11 17:51:08.092 +0000 UTC.

To fix this, drop or truncate the progress table. With no saved progress, the library starts reading from Now() - ChangeAgeLimit. Then write enough data to the CDC log that every consumer processes some of it; once that happens, the progress will be stored properly.

Alternatively, you can disable the progress saving feature.

@JonasHiltl
Author

I have disabled saving progress for now, and it works: I receive changes with both consumer types.
Thanks for linking the bug.

@dkropachev
Collaborator

The issue was resolved, so I'm closing it.

@dkropachev dkropachev self-assigned this Aug 17, 2024
@dkropachev dkropachev added the bug Something isn't working label Aug 17, 2024