Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37298] Added Pluggable Components for BatchStrategy & BufferWrapper in AsyncSinkWriter. #26274

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

Poorvankbhatia
Copy link

What is the purpose of the change

FLIP-509

  • The current AsyncSinkWriter handles batching using fixed size constraints (maxBatchSize & maxBatchSizeInBytes). However, this approach lacks flexibility for more complex batching strategies like grouping records by partition key (important for sinks like Cassandra).
  • This change introduces a pluggable batching mechanism where sink implementers can define custom batching logic instead of relying on fixed queue-based buffering.
  • It also decouples the buffer implementation by replacing the direct use of Deque<RequestEntryWrapper> with a pluggable buffer wrapper, allowing for alternative data structures that optimize lookup, prioritization, or performance.

Brief change log

  • Added BufferWrapper interface to allow pluggable buffering strategies instead of a fixed Deque.
  • Implemented DequeBufferWrapper as the default buffer wrapper using ArrayDeque.
  • Introduced BatchCreator interface to enable custom batching logic.
  • Added SimpleBatchCreator as the default batch creator, replicating existing batch formation logic.
  • Created BatchCreationResult class to encapsulate batch metadata (entries, size, record count).
  • Modified AsyncSinkWriter to incorporate both the pluggable components.

Verifying this change

  • Added tests for BufferWrapper and its default implementation (DequeBufferWrapper).
  • Added tests for BatchCreator to validate the default batching behavior.
  • Ensured batched requests are correctly formed and processed using the new interfaces.
  • Verified that the default implementations (SimpleBatchCreator and DequeBufferWrapper) maintain existing behavior without requiring changes from users.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 8, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@vahmed-hamdy vahmed-hamdy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good, thanks for delivering this. Just a couple of questions and nits.

*
* @param <RequestEntryT> the type of request entry in this batch
*/
@PublicEvolving
Copy link
Contributor

@vahmed-hamdy vahmed-hamdy Mar 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this fits better as @Internal or PublicEvolving? It is not intended to be exposed to public users

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @vahmed-hamdy , So Batch is created by BatchCreator interface, which can be inherited and implemented by the Sink Implementor. So i was under the impression that anything that external developers are expected to use should NOT be @Internal as they might change across releases. Flink sink developers might create custom implementations of BatchCreator or BufferWrapper for better performance or sink-specific logic. But please correct me if i am wrong. Will change it if required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I don't have a strong opinion here, I also see classes like AsyncSinkWriterConfiguration and RateLimitingStrategy marked PublicEvolving for the same reason. let's keep it then

*
* @param <RequestEntryT> the type of the request entries to be batched
*/
@PublicEvolving
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as Batch

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied above.

*
* @param <RequestEntryT> The type of request entries being buffered.
*/
@PublicEvolving
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replied above.

AsyncSinkWriterImpl initialSinkWriter =
new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();

assertThat(initialSinkWriter.getBufferedRequestEntries())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't test using default when passing nulls?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added null BatchCreator & BatchWrapper to AsyncSinkWriterImpl in the test case.

* @param maxBatchSizeInBytes the maximum cumulative size in bytes allowed for any single batch
*/
private SimpleBatchCreator(long maxBatchSizeInBytes) {
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
this.maxBatchSizeInBytes = Preconditions.checkArgument(maxBatchSizeInBytes > 0);

Also can test that

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this and a corresponding test case as well.

Copy link
Contributor

@vahmed-hamdy vahmed-hamdy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks Good, thanks for the huge efforts

* construct a new (retry) request entry from the response and add that back to the queue for
* later retry.
* <p>The buffer stores {@link RequestEntryWrapper} objects rather than raw {@link
* RequestEntryT} instances, as buffering wrapped entries allows for better tracking of size and
Copy link
Contributor

@davidradl davidradl Mar 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not seeing the main code issuing retry anywhere ie. add(entry, true) .

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry logic is handled inside the completeRequest() method.

Specifically, when an entry fails, it is re-added at the head of the buffer using:

while (iterator.hasPrevious()) {
    addEntryToBuffer(iterator.previous(), true);
}

This ensures failed entries are retried on the next flush.
Let me know if you’d like any further clarification! This part of the code has not been changed and it remains the same. Reference

@davidradl
Copy link
Contributor

could we have the docs change to publicise this change and advice on supplying pluggable strategies and BufferWrapper.

@Poorvankbhatia
Copy link
Author

could we have the docs change to publicise this change and advice on supplying pluggable strategies and BufferWrapper.

Thanks for the suggestion! The JavaDocs for BatchCreator and BufferWrapper already document their purpose and usage.

Regarding additional documentation changes since there's an ongoing vote on this FLIP, would it make sense to wait until finalized and then follow up with documentation updates in a separate PR?

Let me know what you think—happy to adjust based on the preferred approach! :)

@Poorvankbhatia Poorvankbhatia force-pushed the FLINK-37298-async_custom_batch branch from 8b97772 to f3c3386 Compare March 13, 2025 09:17
@Poorvankbhatia
Copy link
Author

Hey @dannycranmer can you help with this review?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants