Skip to content

Commit 415783d

Browse files
committed
feat(zerozone2): add overload protection (#3002)
Signed-off-by: Robin Han <[email protected]>
1 parent 1ef8405 commit 415783d

File tree

5 files changed

+107
-33
lines changed

5 files changed

+107
-33
lines changed

core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
6767
streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
6868
return streamRecordBatch;
6969
} finally {
70-
src.release();
7170
buf.release();
7271
}
7372
}).whenComplete((rst, ex) -> {
73+
src.release();
7474
if (ex != null) {
7575
LOGGER.error("Error while decoding link record, link={}", linkRecord, ex);
7676
}

core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.automq.stream.s3.wal.impl.object.ObjectWALService;
2828
import com.automq.stream.utils.FutureUtil;
2929
import com.automq.stream.utils.LogContext;
30+
import com.automq.stream.utils.Threads;
3031

3132
import org.slf4j.Logger;
3233

@@ -44,6 +45,7 @@
4445

4546
public class ObjectRouterChannel implements RouterChannel {
4647
private static final ExecutorService ASYNC_EXECUTOR = Executors.newCachedThreadPool();
48+
private static final long OVER_CAPACITY_RETRY_DELAY_MS = 1000L;
4749
private final Logger logger;
4850
private final AtomicLong mockOffset = new AtomicLong(0);
4951
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -82,20 +84,30 @@ public CompletableFuture<AppendResult> append(int targetNodeId, short orderHint,
8284

8385
// Diff view of append0. A code line preceded by an "NN-" gutter line belongs to the removed
// implementation, one preceded by "NN+" belongs to the added implementation, and one preceded
// by a plain number pair is shared by both.
//
// Purpose: wraps `data` in a StreamRecordBatch (stream id = targetNodeId, offset taken from the
// locally incremented mockOffset) and appends it to the WAL. On success the returned future
// yields an AppendResult holding the current channelEpoch and the encoded ChannelOffset of the
// appended record.
CompletableFuture<AppendResult> append0(int targetNodeId, short orderHint, ByteBuf data) {
8486
StreamRecordBatch record = new StreamRecordBatch(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
// --- removed implementation: a single attempt; OverCapacityException was handed back to the
// caller as a failed future.
85-
try {
86-
return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
87-
readLock.lock();
88-
try {
89-
long epoch = this.channelEpoch;
90-
ChannelOffset channelOffset = ChannelOffset.of(channelId, orderHint, nodeId, targetNodeId, walRst.recordOffset().buffer());
91-
channelEpoch2LastRecordOffset.put(epoch, walRst.recordOffset());
92-
return new AppendResult(epoch, channelOffset.byteBuf());
93-
} finally {
94-
readLock.unlock();
95-
}
96-
});
97-
} catch (OverCapacityException e) {
98-
return CompletableFuture.failedFuture(e);
// --- added implementation: retry loop that only exits by returning.
87+
record.encoded();
// Extra reference taken here; it is dropped either in whenComplete (append accepted) or in the
// catch (Throwable) branch below (append rejected with an unexpected error).
88+
record.retain();
89+
for (; ; ) {
90+
try {
91+
return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
// The read lock covers reading channelEpoch and recording the last WAL offset for that epoch.
92+
readLock.lock();
93+
try {
94+
long epoch = this.channelEpoch;
95+
ChannelOffset channelOffset = ChannelOffset.of(channelId, orderHint, nodeId, targetNodeId, walRst.recordOffset().buffer());
96+
channelEpoch2LastRecordOffset.put(epoch, walRst.recordOffset());
97+
return new AppendResult(epoch, channelOffset.byteBuf());
98+
} finally {
99+
readLock.unlock();
100+
}
101+
}).whenComplete((r, e) -> record.release());
102+
} catch (OverCapacityException e) {
// The WAL refused the record for capacity reasons: log, sleep on the calling thread for
// OVER_CAPACITY_RETRY_DELAY_MS, then fall through to the next loop iteration and retry.
103+
logger.warn("OverCapacityException occurred while appending, err={}", e.getMessage());
104+
// Use block-based delayed retries for network backpressure.
105+
Threads.sleep(OVER_CAPACITY_RETRY_DELAY_MS);
106+
} catch (Throwable e) {
// Any other synchronous failure is not retried: release the extra reference and fail the future.
107+
logger.error("[UNEXPECTED], append wal fail", e);
108+
record.release();
109+
return CompletableFuture.failedFuture(e);
110+
}
99111
}
100112
}
101113

core/src/main/java/kafka/automq/zerozone/RouterInV2.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@
4747
import java.util.Collections;
4848
import java.util.List;
4949
import java.util.Map;
50-
import java.util.Queue;
50+
import java.util.concurrent.ArrayBlockingQueue;
51+
import java.util.concurrent.BlockingQueue;
5152
import java.util.concurrent.CompletableFuture;
52-
import java.util.concurrent.ConcurrentLinkedQueue;
5353
import java.util.concurrent.atomic.AtomicInteger;
5454
import java.util.stream.Collectors;
5555

@@ -71,7 +71,7 @@ public class RouterInV2 implements NonBlockingLocalRouterHandler {
7171
private final String rack;
7272
private final RouterInProduceHandler localAppendHandler;
7373
private RouterInProduceHandler routerInProduceHandler;
74-
private final Queue<PartitionProduceRequest> unpackLinkQueue = new ConcurrentLinkedQueue<>();
74+
private final BlockingQueue<PartitionProduceRequest> unpackLinkQueue = new ArrayBlockingQueue<>(Systems.CPU_CORES * 8192);
7575
private final EventLoop[] appendEventLoops;
7676
private final FastThreadLocal<RequestLocal> requestLocals = new FastThreadLocal<>() {
7777
@Override
@@ -115,9 +115,9 @@ private CompletableFuture<AutomqZoneRouterResponse> handleZoneRouterRequest0(Aut
115115
for (ByteBuf channelOffset : routerRecord.channelOffsets()) {
116116
PartitionProduceRequest partitionProduceRequest = new PartitionProduceRequest(ChannelOffset.of(channelOffset));
117117
partitionProduceRequest.unpackLinkCf = routerChannel.get(channelOffset);
118-
unpackLinkQueue.add(partitionProduceRequest);
118+
addToUnpackLinkQueue(partitionProduceRequest);
119119
partitionProduceRequest.unpackLinkCf.whenComplete((rst, ex) -> {
120-
if (ex != null) {
120+
if (ex == null) {
121121
size.addAndGet(rst.readableBytes());
122122
}
123123
handleUnpackLink();
@@ -165,6 +165,16 @@ private void handleUnpackLink() {
165165
}
166166
}
167167

168+
private void addToUnpackLinkQueue(PartitionProduceRequest req) {
169+
for (;;) {
170+
try {
171+
unpackLinkQueue.put(req);
172+
return;
173+
} catch (InterruptedException ignored) {
174+
}
175+
}
176+
}
177+
168178
@Override
169179
public CompletableFuture<AutomqZoneRouterResponseData.Response> append(
170180
ChannelOffset channelOffset,

core/src/main/java/kafka/automq/zerozone/RouterOutV2.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ public void handleProduceAppendProxy(ProduceRequestArgs args) {
109109
}
110110
ZeroZoneMetricsManager.PROXY_REQUEST_LATENCY.record(time.nanoseconds() - startNanos);
111111
});
112+
}).exceptionally(ex -> {
113+
LOGGER.error("Exception in processing append proxies", ex);
114+
// Make the producer retry send.
115+
responseMap.put(tp, errorPartitionResponse(Errors.LEADER_NOT_AVAILABLE));
116+
return null;
112117
});
113118
cfList.add(proxyCf);
114119
}
@@ -144,6 +149,10 @@ interface Proxy {
144149
void send(ProxyRequest request);
145150
}
146151

152+
static ProduceResponse.PartitionResponse errorPartitionResponse(Errors error) {
153+
return new ProduceResponse.PartitionResponse(error, -1, -1, -1, -1, Collections.emptyList(), "");
154+
}
155+
147156
static class LocalProxy implements Proxy {
148157
private final NonBlockingLocalRouterHandler localRouterHandler;
149158

@@ -341,7 +350,7 @@ public void completeWithUnknownError() {
341350
}
342351

343352
// Completes `cf` with an error-only partition response for `errors`.
// Diff view: the line after the "344-" gutter is the removed inline construction, the line after
// "353+" is its replacement, which delegates to errorPartitionResponse with the same error.
private void completeWithError(Errors errors) {
344-
ProduceResponse.PartitionResponse rst = new ProduceResponse.PartitionResponse(errors, -1, -1, -1, -1, Collections.emptyList(), "");
353+
ProduceResponse.PartitionResponse rst = errorPartitionResponse(errors);
345354
// complete() rather than completeExceptionally(): the error travels inside a normal response.
cf.complete(rst);
346355
}
347356
}

s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@
3535
import java.util.Map;
3636
import java.util.Queue;
3737
import java.util.Set;
38+
import java.util.concurrent.ArrayBlockingQueue;
39+
import java.util.concurrent.BlockingQueue;
3840
import java.util.concurrent.CompletableFuture;
3941
import java.util.concurrent.ConcurrentLinkedQueue;
4042
import java.util.concurrent.CopyOnWriteArrayList;
4143
import java.util.concurrent.ExecutionException;
42-
import java.util.concurrent.Semaphore;
4344
import java.util.concurrent.TimeUnit;
4445
import java.util.concurrent.atomic.AtomicLong;
4546
import java.util.function.Function;
@@ -74,7 +75,8 @@ public class SnapshotReadCache {
7475
private final LinkRecordDecoder linkRecordDecoder;
7576
private final Time time = Time.SYSTEM;
7677

77-
public SnapshotReadCache(StreamManager streamManager, LogCache cache, ObjectStorage objectStorage, LinkRecordDecoder linkRecordDecoder) {
78+
public SnapshotReadCache(StreamManager streamManager, LogCache cache, ObjectStorage objectStorage,
79+
LinkRecordDecoder linkRecordDecoder) {
7880
activeStreams = CacheBuilder.newBuilder()
7981
.expireAfterAccess(10, TimeUnit.MINUTES)
8082
.removalListener((RemovalListener<Long, Boolean>) notification ->
@@ -128,7 +130,8 @@ public synchronized CompletableFuture<Void> replay(List<S3ObjectMetadata> object
128130
return objectReplay.replay(objects);
129131
}
130132

131-
public synchronized CompletableFuture<Void> replay(WriteAheadLog confirmWAL, RecordOffset startOffset, RecordOffset endOffset) {
133+
public synchronized CompletableFuture<Void> replay(WriteAheadLog confirmWAL, RecordOffset startOffset,
134+
RecordOffset endOffset) {
132135
long startNanos = time.nanoseconds();
133136
return walReplay.replay(confirmWAL, startOffset, endOffset)
134137
.whenComplete((nil, ex) -> REPLAY_LATENCY.record(time.nanoseconds() - startNanos));
@@ -153,32 +156,69 @@ private void activeStream(long streamId) {
153156
}
154157

155158
class WalReplay {
159+
private static final long TASK_WAITING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(5);
160+
private static final int MAX_WAITING_LOAD_TASK_COUNT = 4096;
156161
// soft limit the inflight memory
157-
private final Semaphore inflightLimiter = new Semaphore(Systems.CPU_CORES * 4);
158-
private final Queue<WalReplayTask> waitingLoadTasks = new ConcurrentLinkedQueue<>();
162+
private final int maxInflightLoadingCount = Systems.CPU_CORES * 4;
163+
private final BlockingQueue<WalReplayTask> waitingLoadTasks = new ArrayBlockingQueue<>(MAX_WAITING_LOAD_TASK_COUNT);
159164
private final Queue<WalReplayTask> loadingTasks = new ConcurrentLinkedQueue<>();
160165

161166
public CompletableFuture<Void> replay(WriteAheadLog wal, RecordOffset startOffset, RecordOffset endOffset) {
162-
inflightLimiter.acquireUninterruptibly();
163167
WalReplayTask task = new WalReplayTask(wal, startOffset, endOffset);
164-
waitingLoadTasks.add(task);
168+
while (!waitingLoadTasks.add(task)) {
169+
// The replay won't be called on the SnapshotReadCache.eventLoop, so there won't be a deadlock.
170+
eventLoop.submit(this::clearOverloadedTask).join();
171+
}
165172
eventLoop.submit(this::tryLoad);
166-
return task.replayCf.whenComplete((nil, ex) -> inflightLimiter.release());
173+
return task.replayCf.whenCompleteAsync((nil, ex) -> tryLoad(), eventLoop);
167174
}
168175

169176
// Diff view of tryLoad. The line after the "172-" gutter is the removed unconditional poll;
// lines after "NN+" gutters are additions; lines after plain number pairs are unchanged context.
//
// Purpose: moves tasks from waitingLoadTasks to loadingTasks and starts them, until the waiting
// queue is empty, the in-flight limit is reached, or the head task has waited too long.
@EventLoopSafe
170177
private void tryLoad() {
171178
for (; ; ) {
172-
WalReplayTask task = waitingLoadTasks.poll();
// Stop starting new loads once maxInflightLoadingCount tasks are in loadingTasks.
179+
if (loadingTasks.size() >= maxInflightLoadingCount) {
180+
break;
181+
}
// peek() first: the head is only removed after it passes the timeout check below.
182+
WalReplayTask task = waitingLoadTasks.peek();
173183
if (task == null) {
174184
break;
175185
}
// The head task has waited longer than TASK_WAITING_TIMEOUT_NANOS: treat the replay as
// overloaded, drop everything that is waiting, and stop.
186+
if (time.nanoseconds() - task.timestampNanos > TASK_WAITING_TIMEOUT_NANOS) {
187+
clearOverloadedTask();
188+
return;
189+
}
190+
waitingLoadTasks.poll();
176191
loadingTasks.add(task);
177192
task.run();
178193
// When this task's load finishes, tryPutIntoCache runs on the event loop.
task.loadCf.whenCompleteAsync((rst, ex) -> tryPutIntoCache(), eventLoop);
179194
}
180195
}
181196

197+
/**
198+
* Clears all waiting tasks when the replay system is overloaded.
199+
* This is triggered when tasks wait longer than TASK_WAITING_TIMEOUT_NANOS or waitingLoadTasks is full.
200+
* All dropped tasks have their futures completed with null, and affected
201+
* nodes are notified to commit their WAL to free up resources.
202+
*/
203+
@EventLoopSafe
204+
private void clearOverloadedTask() {
205+
// The WalReplay is overloaded, so we need to drain all tasks promptly.
206+
Set<Integer> nodeIds = new HashSet<>();
207+
int dropCount = 0;
208+
for (; ; ) {
209+
WalReplayTask task = waitingLoadTasks.poll();
210+
if (task == null) {
211+
break;
212+
}
213+
nodeIds.add(task.wal.metadata().nodeId());
214+
task.loadCf.complete(null);
215+
task.replayCf.complete(null);
216+
dropCount++;
217+
}
218+
nodeIds.forEach(cacheFreeListener::notifyListener);
219+
LOGGER.warn("wal replay is overloaded, drop all {} waiting tasks and request nodes={} to commit", dropCount, nodeIds);
220+
}
221+
182222
@EventLoopSafe
183223
private void tryPutIntoCache() {
184224
for (; ; ) {
@@ -195,6 +235,7 @@ private void tryPutIntoCache() {
195235
}
196236

197237
class WalReplayTask {
238+
final long timestampNanos = time.nanoseconds();
198239
final WriteAheadLog wal;
199240
final RecordOffset startOffset;
200241
final RecordOffset endOffset;
@@ -389,9 +430,11 @@ public void onFree(List<LogCache.StreamRangeBound> bounds) {
389430
requestCommitNodes.add(streamMetadata.nodeId());
390431
}
391432
}
392-
listeners.forEach(listener ->
393-
requestCommitNodes.forEach(nodeId ->
394-
FutureUtil.suppress(() -> listener.onEvent(new RequestCommitEvent(nodeId)), LOGGER)));
433+
requestCommitNodes.forEach(this::notifyListener);
434+
}
435+
436+
public void notifyListener(int nodeId) {
437+
listeners.forEach(listener -> FutureUtil.suppress(() -> listener.onEvent(new RequestCommitEvent(nodeId)), LOGGER));
395438
}
396439

397440
public void addListener(EventListener listener) {

0 commit comments

Comments
 (0)