3434import java .util .concurrent .ExecutorService ;
3535import java .util .concurrent .ScheduledExecutorService ;
3636import java .util .concurrent .TimeUnit ;
37+ import java .util .concurrent .atomic .AtomicLong ;
3738import java .util .concurrent .locks .Condition ;
3839import java .util .concurrent .locks .Lock ;
3940import java .util .concurrent .locks .ReentrantLock ;
@@ -51,7 +52,7 @@ public class AsyncNetworkBandwidthLimiter implements NetworkBandwidthLimiter {
5152 private final Queue <BucketItem > queuedCallbacks ;
5253 private final Type type ;
5354 private final long tokenSize ;
54- private long availableTokens ;
55+ private final AtomicLong availableTokens ;
5556
5657 public AsyncNetworkBandwidthLimiter (Type type , long tokenSize , int refillIntervalMs ) {
5758 this (type , tokenSize , refillIntervalMs , tokenSize );
@@ -61,12 +62,13 @@ public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillInterva
6162 public AsyncNetworkBandwidthLimiter (Type type , long tokenSize , int refillIntervalMs , long maxTokens ) {
6263 this .type = type ;
6364 this .tokenSize = tokenSize ;
64- this .availableTokens = this .tokenSize ;
65+ this .availableTokens = new AtomicLong ( this .tokenSize ) ;
6566 this .maxTokens = maxTokens ;
6667 this .queuedCallbacks = new PriorityQueue <>();
6768 this .refillThreadPool =
6869 Threads .newSingleThreadScheduledExecutor (new DefaultThreadFactory ("refill-bucket-thread" ), LOGGER );
69- this .callbackThreadPool = Threads .newFixedFastThreadLocalThreadPoolWithMonitor (1 , "callback-thread" , true , LOGGER );
70+ // The threads number must be larger than 1 because the #run will occupy one thread.
71+ this .callbackThreadPool = Threads .newFixedFastThreadLocalThreadPoolWithMonitor (2 , "callback-thread" , true , LOGGER );
7072 this .callbackThreadPool .execute (this ::run );
7173 this .refillThreadPool .scheduleAtFixedRate (this ::refillToken , refillIntervalMs , refillIntervalMs , TimeUnit .MILLISECONDS );
7274 S3StreamMetricsManager .registerNetworkLimiterQueueSizeSupplier (type , this ::getQueueSize );
@@ -88,7 +90,7 @@ private void run() {
8890 }
8991 long size = Math .min (head .size , MAX_TOKEN_PART_SIZE );
9092 reduceToken (size );
91- if (head .complete (size )) {
93+ if (head .complete (size , callbackThreadPool )) {
9294 queuedCallbacks .poll ();
9395 }
9496 }
@@ -103,7 +105,7 @@ private void run() {
103105 private void refillToken () {
104106 lock .lock ();
105107 try {
106- availableTokens = Math .min (availableTokens + this .tokenSize , this .maxTokens );
108+ this . availableTokens . getAndUpdate ( old -> Math .min (old + this .tokenSize , this .maxTokens ) );
107109 condition .signalAll ();
108110 } finally {
109111 lock .unlock ();
@@ -114,7 +116,7 @@ private boolean ableToConsume() {
114116 if (queuedCallbacks .isEmpty ()) {
115117 return false ;
116118 }
117- return availableTokens > 0 ;
119+ return availableTokens . get () > 0 ;
118120 }
119121
120122 public void shutdown () {
@@ -127,12 +129,7 @@ public long getMaxTokens() {
127129 }
128130
129131 public long getAvailableTokens () {
130- lock .lock ();
131- try {
132- return availableTokens ;
133- } finally {
134- lock .unlock ();
135- }
132+ return availableTokens .get ();
136133 }
137134
138135 public int getQueueSize () {
@@ -145,12 +142,7 @@ public int getQueueSize() {
145142 }
146143
147144 private void forceConsume (long size ) {
148- lock .lock ();
149- try {
150- reduceToken (size );
151- } finally {
152- lock .unlock ();
153- }
145+ reduceToken (size );
154146 }
155147
156148 public CompletableFuture <Void > consume (ThrottleStrategy throttleStrategy , long size ) {
@@ -162,7 +154,7 @@ public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long s
162154 } else {
163155 lock .lock ();
164156 try {
165- if (availableTokens <= 0 || !queuedCallbacks .isEmpty ()) {
157+ if (availableTokens . get () <= 0 || !queuedCallbacks .isEmpty ()) {
166158 queuedCallbacks .offer (new BucketItem (throttleStrategy , size , cf ));
167159 condition .signalAll ();
168160 } else {
@@ -177,7 +169,7 @@ public CompletableFuture<Void> consume(ThrottleStrategy throttleStrategy, long s
177169 }
178170
179171 private void reduceToken (long size ) {
180- this .availableTokens = Math .max (-maxTokens , availableTokens - size );
172+ this .availableTokens . getAndUpdate ( old -> Math .max (-maxTokens , old - size ) );
181173 }
182174
183175 public enum Type {
@@ -216,10 +208,10 @@ public int compareTo(BucketItem o) {
216208 return Long .compare (strategy .priority (), o .strategy .priority ());
217209 }
218210
219- public boolean complete (long completeSize ) {
211+ public boolean complete (long completeSize , ExecutorService executor ) {
220212 size -= completeSize ;
221213 if (size <= 0 ) {
222- cf .complete (null );
214+ executor . submit (() -> cf .complete (null ) );
223215 return true ;
224216 }
225217 return false ;
0 commit comments