Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
return streamRecordBatch;
} finally {
src.release();
buf.release();
}
}).whenComplete((rst, ex) -> {
src.release();
if (ex != null) {
LOGGER.error("Error while decoding link record, link={}", linkRecord, ex);
}
Expand Down
40 changes: 26 additions & 14 deletions core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.automq.stream.s3.wal.impl.object.ObjectWALService;
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.LogContext;
import com.automq.stream.utils.Threads;

import org.slf4j.Logger;

Expand All @@ -44,6 +45,7 @@

public class ObjectRouterChannel implements RouterChannel {
private static final ExecutorService ASYNC_EXECUTOR = Executors.newCachedThreadPool();
private static final long OVER_CAPACITY_RETRY_DELAY_MS = 1000L;
private final Logger logger;
private final AtomicLong mockOffset = new AtomicLong(0);
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -82,20 +84,30 @@ public CompletableFuture<AppendResult> append(int targetNodeId, short orderHint,

CompletableFuture<AppendResult> append0(int targetNodeId, short orderHint, ByteBuf data) {
StreamRecordBatch record = new StreamRecordBatch(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
try {
return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
readLock.lock();
try {
long epoch = this.channelEpoch;
ChannelOffset channelOffset = ChannelOffset.of(channelId, orderHint, nodeId, targetNodeId, walRst.recordOffset().buffer());
channelEpoch2LastRecordOffset.put(epoch, walRst.recordOffset());
return new AppendResult(epoch, channelOffset.byteBuf());
} finally {
readLock.unlock();
}
});
} catch (OverCapacityException e) {
return CompletableFuture.failedFuture(e);
record.encoded();
record.retain();
for (; ; ) {
try {
return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
readLock.lock();
try {
long epoch = this.channelEpoch;
ChannelOffset channelOffset = ChannelOffset.of(channelId, orderHint, nodeId, targetNodeId, walRst.recordOffset().buffer());
channelEpoch2LastRecordOffset.put(epoch, walRst.recordOffset());
return new AppendResult(epoch, channelOffset.byteBuf());
} finally {
readLock.unlock();
}
}).whenComplete((r, e) -> record.release());
} catch (OverCapacityException e) {
logger.warn("OverCapacityException occurred while appending, err={}", e.getMessage());
// Use block-based delayed retries for network backpressure.
Threads.sleep(OVER_CAPACITY_RETRY_DELAY_MS);
} catch (Throwable e) {
logger.error("[UNEXPECTED], append wal fail", e);
record.release();
return CompletableFuture.failedFuture(e);
}
}
}

Expand Down
20 changes: 15 additions & 5 deletions core/src/main/java/kafka/automq/zerozone/RouterInV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

Expand All @@ -71,7 +71,7 @@ public class RouterInV2 implements NonBlockingLocalRouterHandler {
private final String rack;
private final RouterInProduceHandler localAppendHandler;
private RouterInProduceHandler routerInProduceHandler;
private final Queue<PartitionProduceRequest> unpackLinkQueue = new ConcurrentLinkedQueue<>();
private final BlockingQueue<PartitionProduceRequest> unpackLinkQueue = new ArrayBlockingQueue<>(Systems.CPU_CORES * 8192);
private final EventLoop[] appendEventLoops;
private final FastThreadLocal<RequestLocal> requestLocals = new FastThreadLocal<>() {
@Override
Expand Down Expand Up @@ -115,9 +115,9 @@ private CompletableFuture<AutomqZoneRouterResponse> handleZoneRouterRequest0(Aut
for (ByteBuf channelOffset : routerRecord.channelOffsets()) {
PartitionProduceRequest partitionProduceRequest = new PartitionProduceRequest(ChannelOffset.of(channelOffset));
partitionProduceRequest.unpackLinkCf = routerChannel.get(channelOffset);
unpackLinkQueue.add(partitionProduceRequest);
addToUnpackLinkQueue(partitionProduceRequest);
partitionProduceRequest.unpackLinkCf.whenComplete((rst, ex) -> {
if (ex != null) {
if (ex == null) {
size.addAndGet(rst.readableBytes());
}
handleUnpackLink();
Expand Down Expand Up @@ -165,6 +165,16 @@ private void handleUnpackLink() {
}
}

/**
 * Puts {@code req} into {@code unpackLinkQueue}, waiting for free capacity if the queue is full.
 *
 * <p>The wait is not abandoned on interruption: the put is retried until it succeeds. If the thread
 * was interrupted while waiting, its interrupt status is restored before returning so that callers
 * further up the stack can still observe it.
 *
 * @param req the request to enqueue
 */
private void addToUnpackLinkQueue(PartitionProduceRequest req) {
    boolean interrupted = false;
    try {
        for (;;) {
            try {
                unpackLinkQueue.put(req);
                return;
            } catch (InterruptedException ignored) {
                // Do not re-interrupt here: put() would throw again immediately and spin.
                // Remember the interrupt and restore it once the element is enqueued.
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}

@Override
public CompletableFuture<AutomqZoneRouterResponseData.Response> append(
ChannelOffset channelOffset,
Expand Down
11 changes: 10 additions & 1 deletion core/src/main/java/kafka/automq/zerozone/RouterOutV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public void handleProduceAppendProxy(ProduceRequestArgs args) {
}
ZeroZoneMetricsManager.PROXY_REQUEST_LATENCY.record(time.nanoseconds() - startNanos);
});
}).exceptionally(ex -> {
LOGGER.error("Exception in processing append proxies", ex);
// Make the producer retry send.
responseMap.put(tp, errorPartitionResponse(Errors.LEADER_NOT_AVAILABLE));
return null;
});
cfList.add(proxyCf);
}
Expand Down Expand Up @@ -144,6 +149,10 @@ interface Proxy {
void send(ProxyRequest request);
}

/**
 * Creates a partition response that carries only an error.
 * The four numeric arguments are set to -1, the list argument is empty and the trailing string is empty.
 *
 * @param error the error to report for the partition
 * @return a new response wrapping {@code error}
 */
static ProduceResponse.PartitionResponse errorPartitionResponse(Errors error) {
    final ProduceResponse.PartitionResponse response =
        new ProduceResponse.PartitionResponse(error, -1, -1, -1, -1, Collections.emptyList(), "");
    return response;
}

static class LocalProxy implements Proxy {
private final NonBlockingLocalRouterHandler localRouterHandler;

Expand Down Expand Up @@ -341,7 +350,7 @@ public void completeWithUnknownError() {
}

/**
 * Completes {@code cf} with an error-only partition response.
 *
 * @param errors the error to report to the waiting caller
 */
private void completeWithError(Errors errors) {
    // Single construction path: reuse the shared factory instead of building the response inline.
    cf.complete(errorPartitionResponse(errors));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand Down Expand Up @@ -74,7 +75,8 @@ public class SnapshotReadCache {
private final LinkRecordDecoder linkRecordDecoder;
private final Time time = Time.SYSTEM;

public SnapshotReadCache(StreamManager streamManager, LogCache cache, ObjectStorage objectStorage, LinkRecordDecoder linkRecordDecoder) {
public SnapshotReadCache(StreamManager streamManager, LogCache cache, ObjectStorage objectStorage,
LinkRecordDecoder linkRecordDecoder) {
activeStreams = CacheBuilder.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
.removalListener((RemovalListener<Long, Boolean>) notification ->
Expand Down Expand Up @@ -128,7 +130,8 @@ public synchronized CompletableFuture<Void> replay(List<S3ObjectMetadata> object
return objectReplay.replay(objects);
}

public synchronized CompletableFuture<Void> replay(WriteAheadLog confirmWAL, RecordOffset startOffset, RecordOffset endOffset) {
public synchronized CompletableFuture<Void> replay(WriteAheadLog confirmWAL, RecordOffset startOffset,
RecordOffset endOffset) {
long startNanos = time.nanoseconds();
return walReplay.replay(confirmWAL, startOffset, endOffset)
.whenComplete((nil, ex) -> REPLAY_LATENCY.record(time.nanoseconds() - startNanos));
Expand All @@ -153,32 +156,69 @@ private void activeStream(long streamId) {
}

class WalReplay {
private static final long TASK_WAITING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(5);
private static final int MAX_WAITING_LOAD_TASK_COUNT = 4096;
// soft limit the inflight memory
private final Semaphore inflightLimiter = new Semaphore(Systems.CPU_CORES * 4);
private final Queue<WalReplayTask> waitingLoadTasks = new ConcurrentLinkedQueue<>();
private final int maxInflightLoadingCount = Systems.CPU_CORES * 4;
private final BlockingQueue<WalReplayTask> waitingLoadTasks = new ArrayBlockingQueue<>(MAX_WAITING_LOAD_TASK_COUNT);
private final Queue<WalReplayTask> loadingTasks = new ConcurrentLinkedQueue<>();

/**
 * Queues a replay of the WAL range {@code [startOffset, endOffset]} and returns the task's replay future.
 *
 * <p>The task is offered to the bounded {@code waitingLoadTasks} queue. While the queue is full,
 * {@code clearOverloadedTask} is run on the event loop and awaited, then the offer is retried.
 * Once queued, {@code tryLoad} is scheduled on the event loop, and it is scheduled again when the
 * task's replay future completes.
 *
 * @param wal         the WAL to replay from
 * @param startOffset start of the range to replay
 * @param endOffset   end of the range to replay
 * @return the future stored in the created task's {@code replayCf}, with the follow-up {@code tryLoad} attached
 */
public CompletableFuture<Void> replay(WriteAheadLog wal, RecordOffset startOffset, RecordOffset endOffset) {
    WalReplayTask task = new WalReplayTask(wal, startOffset, endOffset);
    // offer() reports a full queue by returning false; add() on an ArrayBlockingQueue would throw
    // IllegalStateException instead, so the overload handling below would never run.
    while (!waitingLoadTasks.offer(task)) {
        // The replay won't be called on the SnapshotReadCache.eventLoop, so there won't be a deadlock.
        eventLoop.submit(this::clearOverloadedTask).join();
    }
    eventLoop.submit(this::tryLoad);
    return task.replayCf.whenCompleteAsync((nil, ex) -> tryLoad(), eventLoop);
}

/**
 * Moves waiting tasks into the loading stage until either the in-flight limit is reached or no
 * task is waiting.
 *
 * <p>The head of {@code waitingLoadTasks} is only inspected with {@code peek()} and is removed after
 * the capacity and age checks pass, so a task is never taken out of the queue without being started.
 * If the head task has waited longer than {@code TASK_WAITING_TIMEOUT_NANOS}, all waiting tasks are
 * dropped via {@code clearOverloadedTask} and the method returns.
 */
@EventLoopSafe
private void tryLoad() {
    for (; ; ) {
        if (loadingTasks.size() >= maxInflightLoadingCount) {
            break;
        }
        WalReplayTask task = waitingLoadTasks.peek();
        if (task == null) {
            break;
        }
        if (time.nanoseconds() - task.timestampNanos > TASK_WAITING_TIMEOUT_NANOS) {
            clearOverloadedTask();
            return;
        }
        // Checks passed: now actually dequeue the head that was peeked above.
        waitingLoadTasks.poll();
        loadingTasks.add(task);
        task.run();
        task.loadCf.whenCompleteAsync((rst, ex) -> tryPutIntoCache(), eventLoop);
    }
}

/**
 * Drains {@code waitingLoadTasks} when the replay pipeline cannot keep up.
 * Callers in this class use it when the head task has waited longer than
 * TASK_WAITING_TIMEOUT_NANOS or when the waiting queue rejects a new task.
 * Each drained task has both its load and replay futures completed with null, and every node
 * that owned a drained task is passed to {@code cacheFreeListener.notifyListener}; a warning
 * with the drop count and node ids is logged afterwards.
 */
@EventLoopSafe
private void clearOverloadedTask() {
    Set<Integer> affectedNodes = new HashSet<>();
    int dropped = 0;
    WalReplayTask pending;
    // Keep polling until the queue reports empty.
    while ((pending = waitingLoadTasks.poll()) != null) {
        affectedNodes.add(pending.wal.metadata().nodeId());
        pending.loadCf.complete(null);
        pending.replayCf.complete(null);
        dropped++;
    }
    affectedNodes.forEach(cacheFreeListener::notifyListener);
    LOGGER.warn("wal replay is overloaded, drop all {} waiting tasks and request nodes={} to commit", dropped, affectedNodes);
}

@EventLoopSafe
private void tryPutIntoCache() {
for (; ; ) {
Expand All @@ -195,6 +235,7 @@ private void tryPutIntoCache() {
}

class WalReplayTask {
final long timestampNanos = time.nanoseconds();
final WriteAheadLog wal;
final RecordOffset startOffset;
final RecordOffset endOffset;
Expand Down Expand Up @@ -389,9 +430,11 @@ public void onFree(List<LogCache.StreamRangeBound> bounds) {
requestCommitNodes.add(streamMetadata.nodeId());
}
}
listeners.forEach(listener ->
requestCommitNodes.forEach(nodeId ->
FutureUtil.suppress(() -> listener.onEvent(new RequestCommitEvent(nodeId)), LOGGER)));
requestCommitNodes.forEach(this::notifyListener);
}

/**
 * Sends a fresh {@code RequestCommitEvent} for {@code nodeId} to every registered listener.
 * Each delivery goes through {@code FutureUtil.suppress} together with {@code LOGGER}.
 *
 * @param nodeId id of the node the commit request is for
 */
public void notifyListener(int nodeId) {
    listeners.forEach(listener -> {
        FutureUtil.suppress(() -> listener.onEvent(new RequestCommitEvent(nodeId)), LOGGER);
    });
}

public void addListener(EventListener listener) {
Expand Down
Loading