Skip to content

Commit ecbe521

Browse files
authored
fix(s3stream): Fix "this escape" issues in constructors(#2442) (#2463)
* Fix "this escape" issues in constructors * DefaultS3Client fix this escape * confilct with the main branch is solved used factory method in LocalStreamRangeIndexCacheTest.java
1 parent ea306ab commit ecbe521

File tree

8 files changed

+49
-21
lines changed

8 files changed

+49
-21
lines changed

core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public void start() {
139139
config.networkBaselineBandwidth() - (long) networkOutboundRate.derive(
140140
TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()), NetworkStats.getInstance().networkOutboundUsageTotal().get()));
141141

142-
this.localIndexCache = new LocalStreamRangeIndexCache();
142+
this.localIndexCache = LocalStreamRangeIndexCache.create();
143143
this.objectReaderFactory = new DefaultObjectReaderFactory(() -> this.mainObjectStorage);
144144
this.metadataManager = new StreamMetadataManager(brokerServer, config.nodeId(), objectReaderFactory, localIndexCache);
145145
this.requestSender = new ControllerRequestSender(brokerServer, new ControllerRequestSender.RetryPolicyContext(config.controllerRequestRetryMaxCount(),

s3stream/src/main/java/com/automq/stream/s3/S3Stream.java

+17-6
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,7 @@ public class S3Stream implements Stream, StreamMetadataListener {
100100
private CompletableFuture<Void> closeCf;
101101
private StreamMetadataListener.Handle listenerHandle;
102102

103-
public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
104-
StreamManager streamManager) {
105-
this(streamId, epoch, startOffset, nextOffset, storage, streamManager, null, null, OpenStreamOptions.DEFAULT);
106-
}
107-
108-
public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
103+
private S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
109104
StreamManager streamManager, NetworkBandwidthLimiter networkInboundLimiter,
110105
NetworkBandwidthLimiter networkOutboundLimiter, OpenStreamOptions options) {
111106
this.streamId = streamId;
@@ -121,6 +116,22 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
121116
this.networkInboundLimiter = networkInboundLimiter;
122117
this.networkOutboundLimiter = networkOutboundLimiter;
123118
this.options = options;
119+
}
120+
121+
public static S3Stream create(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
122+
StreamManager streamManager) {
123+
return create(streamId, epoch, startOffset, nextOffset, storage, streamManager, null, null, OpenStreamOptions.DEFAULT);
124+
}
125+
126+
public static S3Stream create(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
127+
StreamManager streamManager, NetworkBandwidthLimiter networkInboundLimiter,
128+
NetworkBandwidthLimiter networkOutboundLimiter, OpenStreamOptions options) {
129+
S3Stream s3Stream = new S3Stream(streamId, epoch, startOffset, nextOffset, storage, streamManager, networkInboundLimiter, networkOutboundLimiter, options);
130+
s3Stream.completeInitialization();
131+
return s3Stream;
132+
}
133+
134+
private void completeInitialization() {
124135
if (snapshotRead()) {
125136
listenerHandle = streamManager.addMetadataListener(streamId, this);
126137
}

s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch, Map<Str
203203
}
204204

205205
S3Stream newStream(StreamMetadata metadata, OpenStreamOptions options) {
206-
return new S3Stream(
206+
return S3Stream.create(
207207
metadata.streamId(), metadata.epoch(),
208208
metadata.startOffset(), metadata.endOffset(),
209209
storage, streamManager, networkInboundBucket, networkOutboundBucket, options);

s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,21 @@ public class AsyncLRUCache<K, V extends AsyncMeasurable> {
4848
final Set<V> completedSet = new HashSet<>();
4949
final Set<V> removedSet = new HashSet<>();
5050

51-
public AsyncLRUCache(String cacheName, long maxSize) {
51+
protected AsyncLRUCache(String cacheName, long maxSize) {
5252
this.cacheName = cacheName;
5353
if (maxSize <= 0) {
5454
throw new IllegalArgumentException("maxSize must be positive");
5555
}
5656
this.maxSize = maxSize;
57+
}
58+
59+
public static <K, V extends AsyncMeasurable> AsyncLRUCache<K, V> create(String cacheName, long maxSize) {
60+
AsyncLRUCache<K, V> asyncLRUCache = new AsyncLRUCache<>(cacheName, maxSize);
61+
asyncLRUCache.completeInitialization();
62+
return asyncLRUCache;
63+
}
5764

65+
private void completeInitialization() {
5866
S3StreamMetricsManager.registerAsyncCacheSizeSupplier(this::totalSize, cacheName);
5967
S3StreamMetricsManager.registerAsyncCacheMaxSizeSupplier(() -> maxSize, cacheName);
6068
S3StreamMetricsManager.registerAsyncCacheItemNumberSupplier(this::size, cacheName);

s3stream/src/main/java/com/automq/stream/s3/exceptions/ObjectNotExistException.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ public ObjectNotExistException(long objectId) {
2929
}
3030

3131
public ObjectNotExistException(Throwable cause) {
32-
super(cause.getMessage());
33-
this.addSuppressed(cause);
32+
super(cause.getMessage(), cause);
3433
}
3534

3635
}

s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,17 @@ public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycl
8080
private CompletableFuture<Void> uploadCf = CompletableFuture.completedFuture(null);
8181
private long lastUploadTime = 0L;
8282

83-
public LocalStreamRangeIndexCache() {
83+
private LocalStreamRangeIndexCache() {
84+
85+
}
86+
87+
public static LocalStreamRangeIndexCache create() {
88+
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
89+
cache.completeInitialization();
90+
return cache;
91+
}
92+
93+
private void completeInitialization() {
8494
S3StreamMetricsManager.registerLocalStreamRangeIndexCacheSizeSupplier(this::totalSize);
8595
S3StreamMetricsManager.registerLocalStreamRangeIndexCacheStreamNumSupplier(() -> {
8696
readLock.lock();

s3stream/src/test/java/com/automq/stream/s3/S3StreamTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class S3StreamTest {
5252
public void setup() {
5353
storage = mock(Storage.class);
5454
streamManager = mock(StreamManager.class);
55-
stream = new S3Stream(233, 1, 100, 233, storage, streamManager);
55+
stream = S3Stream.create(233L, 1L, 100L, 233L, storage, streamManager);
5656
}
5757

5858
@Test

s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class LocalStreamRangeIndexCacheTest {
4444
public void testInit() {
4545
ObjectStorage objectStorage = new MemoryObjectStorage();
4646
// init with empty index
47-
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
47+
LocalStreamRangeIndexCache cache = LocalStreamRangeIndexCache.create();
4848
cache.start();
4949
cache.init(NODE_0, objectStorage);
5050
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 0).join());
@@ -55,7 +55,7 @@ public void testInit() {
5555
cache.updateIndexFromRequest(request);
5656
cache.upload().join();
5757

58-
cache = new LocalStreamRangeIndexCache();
58+
cache = LocalStreamRangeIndexCache.create();
5959
cache.start();
6060
cache.init(NODE_0, objectStorage);
6161
cache.initCf().join();
@@ -69,7 +69,7 @@ public void testInit() {
6969
@Test
7070
public void testAppend() {
7171
ObjectStorage objectStorage = new MemoryObjectStorage();
72-
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
72+
LocalStreamRangeIndexCache cache = LocalStreamRangeIndexCache.create();
7373
cache.start();
7474
cache.init(NODE_0, objectStorage);
7575
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
@@ -93,7 +93,7 @@ public void testAppend() {
9393
@Test
9494
public void testPrune() {
9595
ObjectStorage objectStorage = new MemoryObjectStorage();
96-
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
96+
LocalStreamRangeIndexCache cache = LocalStreamRangeIndexCache.create();
9797
cache.start();
9898
cache.init(NODE_0, objectStorage);
9999
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
@@ -130,7 +130,7 @@ public void testPrune() {
130130
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 1500).join());
131131

132132
// test load from object storage
133-
cache = new LocalStreamRangeIndexCache();
133+
cache = LocalStreamRangeIndexCache.create();
134134
cache.start();
135135
cache.init(NODE_0, objectStorage);
136136
cache.initCf().join();
@@ -155,7 +155,7 @@ public void testPrune() {
155155
@Test
156156
public void testEvict() {
157157
ObjectStorage objectStorage = new MemoryObjectStorage();
158-
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
158+
LocalStreamRangeIndexCache cache = LocalStreamRangeIndexCache.create();
159159
cache.start();
160160
cache.init(NODE_0, objectStorage);
161161
int streamNum = 500;
@@ -180,7 +180,7 @@ public void testEvict() {
180180
@Test
181181
public void testCompact() {
182182
ObjectStorage objectStorage = new MemoryObjectStorage();
183-
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
183+
LocalStreamRangeIndexCache cache = LocalStreamRangeIndexCache.create();
184184
cache.start();
185185
cache.init(NODE_0, objectStorage);
186186
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
@@ -260,7 +260,7 @@ public void testCompact() {
260260
@Test
261261
public void testCompactWithStreamDeleted() {
262262
ObjectStorage objectStorage = new MemoryObjectStorage();
263-
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
263+
LocalStreamRangeIndexCache cache = LocalStreamRangeIndexCache.create();
264264
cache.start();
265265
cache.init(NODE_0, objectStorage);
266266
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();

0 commit comments

Comments
 (0)