3535import java .util .Map ;
3636import java .util .Queue ;
3737import java .util .Set ;
38+ import java .util .concurrent .ArrayBlockingQueue ;
39+ import java .util .concurrent .BlockingQueue ;
3840import java .util .concurrent .CompletableFuture ;
3941import java .util .concurrent .ConcurrentLinkedQueue ;
4042import java .util .concurrent .CopyOnWriteArrayList ;
4143import java .util .concurrent .ExecutionException ;
42- import java .util .concurrent .Semaphore ;
4344import java .util .concurrent .TimeUnit ;
4445import java .util .concurrent .atomic .AtomicLong ;
4546import java .util .function .Function ;
@@ -74,7 +75,8 @@ public class SnapshotReadCache {
7475 private final LinkRecordDecoder linkRecordDecoder ;
7576 private final Time time = Time .SYSTEM ;
7677
77- public SnapshotReadCache (StreamManager streamManager , LogCache cache , ObjectStorage objectStorage , LinkRecordDecoder linkRecordDecoder ) {
78+ public SnapshotReadCache (StreamManager streamManager , LogCache cache , ObjectStorage objectStorage ,
79+ LinkRecordDecoder linkRecordDecoder ) {
7880 activeStreams = CacheBuilder .newBuilder ()
7981 .expireAfterAccess (10 , TimeUnit .MINUTES )
8082 .removalListener ((RemovalListener <Long , Boolean >) notification ->
@@ -128,7 +130,8 @@ public synchronized CompletableFuture<Void> replay(List<S3ObjectMetadata> object
128130 return objectReplay .replay (objects );
129131 }
130132
131- public synchronized CompletableFuture <Void > replay (WriteAheadLog confirmWAL , RecordOffset startOffset , RecordOffset endOffset ) {
133+ public synchronized CompletableFuture <Void > replay (WriteAheadLog confirmWAL , RecordOffset startOffset ,
134+ RecordOffset endOffset ) {
132135 long startNanos = time .nanoseconds ();
133136 return walReplay .replay (confirmWAL , startOffset , endOffset )
134137 .whenComplete ((nil , ex ) -> REPLAY_LATENCY .record (time .nanoseconds () - startNanos ));
@@ -153,32 +156,62 @@ private void activeStream(long streamId) {
153156 }
154157
155158 class WalReplay {
159+ private static final long TASK_WAITING_TIMEOUT_NANOS = TimeUnit .SECONDS .toNanos (5 );
160+ private static final int MAX_WAITING_LOAD_TASK_COUNT = 4096 ;
156161 // soft limit the inflight memory
157- private final Semaphore inflightLimiter = new Semaphore ( Systems .CPU_CORES * 4 ) ;
158- private final Queue <WalReplayTask > waitingLoadTasks = new ConcurrentLinkedQueue <>();
162+ private final int maxInflightLoadingCount = Systems .CPU_CORES * 4 ;
163+ private final BlockingQueue <WalReplayTask > waitingLoadTasks = new ArrayBlockingQueue <>(MAX_WAITING_LOAD_TASK_COUNT );
159164 private final Queue <WalReplayTask > loadingTasks = new ConcurrentLinkedQueue <>();
160165
161166 public CompletableFuture <Void > replay (WriteAheadLog wal , RecordOffset startOffset , RecordOffset endOffset ) {
162- inflightLimiter .acquireUninterruptibly ();
163167 WalReplayTask task = new WalReplayTask (wal , startOffset , endOffset );
164- waitingLoadTasks .add (task );
168+ while (!waitingLoadTasks .add (task )) {
169+ eventLoop .submit (this ::clearOverloadedTask ).join ();
170+ }
165171 eventLoop .submit (this ::tryLoad );
166- return task .replayCf .whenComplete ((nil , ex ) -> inflightLimiter . release () );
172+ return task .replayCf .whenCompleteAsync ((nil , ex ) -> tryLoad (), eventLoop );
167173 }
168174
169175 @ EventLoopSafe
170176 private void tryLoad () {
171177 for (; ; ) {
172- WalReplayTask task = waitingLoadTasks .poll ();
178+ if (loadingTasks .size () >= maxInflightLoadingCount ) {
179+ break ;
180+ }
181+ WalReplayTask task = waitingLoadTasks .peek ();
173182 if (task == null ) {
174183 break ;
175184 }
185+ if (time .nanoseconds () - task .timestampNanos > TASK_WAITING_TIMEOUT_NANOS ) {
186+ clearOverloadedTask ();
187+ return ;
188+ }
189+ waitingLoadTasks .poll ();
176190 loadingTasks .add (task );
177191 task .run ();
178192 task .loadCf .whenCompleteAsync ((rst , ex ) -> tryPutIntoCache (), eventLoop );
179193 }
180194 }
181195
196+ @ EventLoopSafe
197+ private void clearOverloadedTask () {
198+ // The WalReplay is overloaded, so we need to drain all tasks promptly.
199+ Set <Integer > nodeIds = new HashSet <>();
200+ int dropCount = 0 ;
201+ for (; ; ) {
202+ WalReplayTask task = waitingLoadTasks .poll ();
203+ if (task == null ) {
204+ break ;
205+ }
206+ nodeIds .add (task .wal .metadata ().nodeId ());
207+ task .loadCf .complete (null );
208+ task .replayCf .complete (null );
209+ dropCount ++;
210+ }
211+ nodeIds .forEach (cacheFreeListener ::notifyListener );
212+ LOGGER .warn ("wal replay is overloaded, drop all {} waiting tasks and request nodes={} to commit" , dropCount , nodeIds );
213+ }
214+
182215 @ EventLoopSafe
183216 private void tryPutIntoCache () {
184217 for (; ; ) {
@@ -195,6 +228,7 @@ private void tryPutIntoCache() {
195228 }
196229
197230 class WalReplayTask {
231+ final long timestampNanos = time .nanoseconds ();
198232 final WriteAheadLog wal ;
199233 final RecordOffset startOffset ;
200234 final RecordOffset endOffset ;
@@ -389,9 +423,11 @@ public void onFree(List<LogCache.StreamRangeBound> bounds) {
389423 requestCommitNodes .add (streamMetadata .nodeId ());
390424 }
391425 }
392- listeners .forEach (listener ->
393- requestCommitNodes .forEach (nodeId ->
394- FutureUtil .suppress (() -> listener .onEvent (new RequestCommitEvent (nodeId )), LOGGER )));
426+ requestCommitNodes .forEach (this ::notifyListener );
427+ }
428+
429+ public void notifyListener (int nodeId ) {
430+ listeners .forEach (listener -> FutureUtil .suppress (() -> listener .onEvent (new RequestCommitEvent (nodeId )), LOGGER ));
395431 }
396432
397433 public void addListener (EventListener listener ) {
0 commit comments