Skip to content

[FLINK-37298] Added Pluggable Components for BatchStrategy & BufferWrapper in AsyncSinkWriter. #26274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 15, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -83,7 +80,16 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable

private final int maxBatchSize;
private final int maxBufferedRequests;
private final long maxBatchSizeInBytes;

/**
* Threshold in bytes to trigger a flush from the buffer.
*
* <p>This is derived from {@code maxBatchSizeInBytes} in the configuration, but is only used
* here to decide when the buffer should be flushed. The actual batch size limit is now enforced
* by the {@link BatchCreator}.
*/
private final long flushThresholdBytes;

private final long maxTimeInBufferMS;
private final long maxRecordSizeInBytes;

Expand Down Expand Up @@ -112,8 +118,14 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
* construct a new (retry) request entry from the response and add that back to the queue for
* later retry.
*/
private final Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries =
new ArrayDeque<>();
private final RequestBuffer<RequestEntryT> bufferedRequestEntries;

/**
* Batch component responsible for forming a batch of request entries from the buffer when the
* sink is ready to flush. This determines the logic of including entries in a batch from the
* buffered requests.
*/
private final BatchCreator<RequestEntryT> batchCreator;

/**
* Tracks all pending async calls that have been executed since the last checkpoint. Calls that
Expand All @@ -126,12 +138,6 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
*/
private int inFlightRequestsCount;

/**
* Tracks the cumulative size of all elements in {@code bufferedRequestEntries} to facilitate
* the criterion for flushing after {@code maxBatchSizeInBytes} is reached.
*/
private double bufferedRequestEntriesTotalSizeInBytes;

private boolean existsActiveTimerCallback = false;

/**
Expand Down Expand Up @@ -213,11 +219,32 @@ protected void submitRequestEntries(
*/
protected abstract long getSizeInBytes(RequestEntryT requestEntry);

/**
* This constructor is deprecated. Users should use {@link #AsyncSinkWriter(ElementConverter,
* WriterInitContext, AsyncSinkWriterConfiguration, Collection, BatchCreator, RequestBuffer)}.
*/
@Deprecated
public AsyncSinkWriter(
ElementConverter<InputT, RequestEntryT> elementConverter,
WriterInitContext context,
AsyncSinkWriterConfiguration configuration,
Collection<BufferedRequestState<RequestEntryT>> states) {
this(
elementConverter,
context,
configuration,
states,
new SimpleBatchCreator<>(configuration.getMaxBatchSizeInBytes()),
new DequeRequestBuffer<>());
}

public AsyncSinkWriter(
ElementConverter<InputT, RequestEntryT> elementConverter,
WriterInitContext context,
AsyncSinkWriterConfiguration configuration,
Collection<BufferedRequestState<RequestEntryT>> states,
BatchCreator<RequestEntryT> batchCreator,
RequestBuffer<RequestEntryT> bufferedRequestEntries) {
this.elementConverter = elementConverter;
this.mailboxExecutor = context.getMailboxExecutor();
this.timeService = context.getProcessingTimeService();
Expand All @@ -237,23 +264,26 @@ public AsyncSinkWriter(
"The maximum allowed size in bytes per flush must be greater than or equal to the"
+ " maximum allowed size in bytes of a single record.");
Preconditions.checkNotNull(configuration.getRateLimitingStrategy());
Preconditions.checkNotNull(
batchCreator, "batchCreator must not be null; required for creating batches.");
Preconditions.checkNotNull(
bufferedRequestEntries,
"bufferedRequestEntries must not be null; holds pending request data.");
this.maxBatchSize = configuration.getMaxBatchSize();
this.maxBufferedRequests = configuration.getMaxBufferedRequests();
this.maxBatchSizeInBytes = configuration.getMaxBatchSizeInBytes();
this.flushThresholdBytes = configuration.getMaxBatchSizeInBytes();
this.maxTimeInBufferMS = configuration.getMaxTimeInBufferMS();
this.maxRecordSizeInBytes = configuration.getMaxRecordSizeInBytes();
this.rateLimitingStrategy = configuration.getRateLimitingStrategy();
this.requestTimeoutMS = configuration.getRequestTimeoutMS();
this.failOnTimeout = configuration.isFailOnTimeout();

this.inFlightRequestsCount = 0;
this.bufferedRequestEntriesTotalSizeInBytes = 0;

this.metrics = context.metricGroup();
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();

this.batchCreator = batchCreator;
this.bufferedRequestEntries = bufferedRequestEntries;
this.fatalExceptionCons =
exception ->
mailboxExecutor.execute(
Expand Down Expand Up @@ -303,7 +333,7 @@ public void write(InputT element, Context context) throws IOException, Interrupt
private void nonBlockingFlush() throws InterruptedException {
while (!rateLimitingStrategy.shouldBlock(createRequestInfo())
&& (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
|| bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
|| bufferedRequestEntries.totalSizeInBytes() >= flushThresholdBytes)) {
flush();
}
}
Expand All @@ -327,7 +357,12 @@ private void flush() throws InterruptedException {
requestInfo = createRequestInfo();
}

List<RequestEntryT> batch = createNextAvailableBatch(requestInfo);
Batch<RequestEntryT> batchCreationResult =
batchCreator.createNextBatch(requestInfo, bufferedRequestEntries);
List<RequestEntryT> batch = batchCreationResult.getBatchEntries();
numBytesOutCounter.inc(batchCreationResult.getSizeInBytes());
numRecordsOutCounter.inc(batchCreationResult.getRecordCount());

if (batch.isEmpty()) {
return;
}
Expand All @@ -344,31 +379,6 @@ private int getNextBatchSize() {
return Math.min(getNextBatchSizeLimit(), bufferedRequestEntries.size());
}

/**
* Creates the next batch of request entries while respecting the {@code maxBatchSize} and
* {@code maxBatchSizeInBytes}. Also adds these to the metrics counters.
*/
private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
List<RequestEntryT> batch = new ArrayList<>(requestInfo.getBatchSize());

long batchSizeBytes = 0;
for (int i = 0; i < requestInfo.getBatchSize(); i++) {
long requestEntrySize = bufferedRequestEntries.peek().getSize();
if (batchSizeBytes + requestEntrySize > maxBatchSizeInBytes) {
break;
}
RequestEntryWrapper<RequestEntryT> elem = bufferedRequestEntries.remove();
batch.add(elem.getRequestEntry());
bufferedRequestEntriesTotalSizeInBytes -= requestEntrySize;
batchSizeBytes += requestEntrySize;
}

numRecordsOutCounter.inc(batch.size());
numBytesOutCounter.inc(batchSizeBytes);

return batch;
}

/**
* Marks an in-flight request as completed and prepends failed requestEntries back to the
* internal requestEntry buffer for later retry.
Expand Down Expand Up @@ -409,13 +419,7 @@ private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean
entry.getSize(), maxRecordSizeInBytes));
}

if (insertAtHead) {
bufferedRequestEntries.addFirst(entry);
} else {
bufferedRequestEntries.add(entry);
}

bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
bufferedRequestEntries.add(entry, insertAtHead);
}

/**
Expand All @@ -428,7 +432,7 @@ private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean
*/
@Override
public void flush(boolean flush) throws InterruptedException {
while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
while (inFlightRequestsCount > 0 || (!bufferedRequestEntries.isEmpty() && flush)) {
yieldIfThereExistsInFlightRequests();
if (flush) {
flush();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;
import java.util.List;

/**
* A container for the result of creating a batch of request entries, including:
*
* <ul>
* <li>The actual list of entries forming the batch
* <li>The total size in bytes of those entries
* <li>The total number of entries in the batch
* </ul>
*
* <p>Instances of this class are typically created by a {@link BatchCreator} to summarize which
* entries have been selected for sending downstream and to provide any relevant metrics for
* tracking, such as the byte size or the record count.
*
* @param <RequestEntryT> the type of request entry in this batch
*/
@PublicEvolving
public class Batch<RequestEntryT extends Serializable> {

/** The list of request entries in this batch. */
private final List<RequestEntryT> batchEntries;

/** The total size in bytes of the entire batch. */
private final long sizeInBytes;

/** The total number of entries in the batch. */
private final int recordCount;

/**
* Creates a new {@code Batch} with the specified entries, total size, and record count.
*
* @param requestEntries the list of request entries that form the batch
* @param sizeInBytes the total size in bytes of the entire batch
*/
public Batch(List<RequestEntryT> requestEntries, long sizeInBytes) {
this.batchEntries = requestEntries;
this.sizeInBytes = sizeInBytes;
this.recordCount = requestEntries.size();
}

/**
* Returns the list of request entries in this batch.
*
* @return a list of request entries for the batch
*/
public List<RequestEntryT> getBatchEntries() {
return batchEntries;
}

/**
* Returns the total size in bytes of the batch.
*
* @return the batch's cumulative byte size
*/
public long getSizeInBytes() {
return sizeInBytes;
}

/**
* Returns the total number of entries in the batch.
*
* @return the record count in the batch
*/
public int getRecordCount() {
return recordCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.base.sink.writer;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.sink.writer.strategy.RequestInfo;

import java.io.Serializable;
import java.util.Deque;

/**
* A pluggable interface for forming batches of request entries from a buffer. Implementations
* control how many entries are grouped together and in what manner before sending them downstream.
*
* <p>The {@code AsyncSinkWriter} (or similar sink component) calls {@link
* #createNextBatch(RequestInfo, RequestBuffer)} (RequestInfo, Deque)} when it decides to flush or
* otherwise gather a new batch of elements. For instance, a batch creator might limit the batch by
* the number of elements, total payload size, or any custom partition-based strategy.
*
* @param <RequestEntryT> the type of the request entries to be batched
*/
@PublicEvolving
public interface BatchCreator<RequestEntryT extends Serializable> {

/**
* Creates the next batch of request entries based on the provided {@link RequestInfo} and the
* currently buffered entries.
*
* <p>This method is expected to:
*
* <ul>
* <li>Mutate the {@code bufferedRequestEntries} by polling/removing elements from it.
* <li>Return a batch containing the selected entries.
* </ul>
*
* <p><strong>Thread-safety note:</strong> This method is called from {@code flush()}, which is
* executed on the Flink main thread. Implementations should assume single-threaded access and
* must not be shared across subtasks.
*
* <p><strong>Contract:</strong> Implementations must ensure that any entry removed from {@code
* bufferedRequestEntries} is either added to the returned batch or properly handled (e.g.,
* retried or logged), and not silently dropped.
*
* @param requestInfo information about the desired request properties or constraints (e.g., an
* allowed batch size or other relevant hints)
* @param bufferedRequestEntries a collection ex: {@link Deque} of all currently buffered
* entries waiting to be grouped into batches
* @return a {@link Batch} containing the new batch of entries along with metadata about the
* batch (e.g., total byte size, record count)
*/
Batch<RequestEntryT> createNextBatch(
RequestInfo requestInfo, RequestBuffer<RequestEntryT> bufferedRequestEntries);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BufferedRequestState<RequestEntryT extends Serializable> implements
private final List<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries;
private final long stateSize;

@Deprecated
public BufferedRequestState(Deque<RequestEntryWrapper<RequestEntryT>> bufferedRequestEntries) {
this.bufferedRequestEntries = new ArrayList<>(bufferedRequestEntries);
this.stateSize = calculateStateSize();
Expand All @@ -49,6 +50,11 @@ public BufferedRequestState(List<RequestEntryWrapper<RequestEntryT>> bufferedReq
this.stateSize = calculateStateSize();
}

public BufferedRequestState(RequestBuffer<RequestEntryT> requestBuffer) {
this.bufferedRequestEntries = new ArrayList<>(requestBuffer.getBufferedState());
this.stateSize = calculateStateSize();
}

public List<RequestEntryWrapper<RequestEntryT>> getBufferedRequestEntries() {
return bufferedRequestEntries;
}
Expand Down
Loading