-
Notifications
You must be signed in to change notification settings - Fork 522
feat(metadata): extract stream range index by lazy load StreamSetObject #2710
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a12b947
48c2323
c6abb79
50e74ec
2f36aa0
351c991
8e166a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -16,7 +16,6 @@ | |||||||
| * See the License for the specific language governing permissions and | ||||||||
| * limitations under the License. | ||||||||
| */ | ||||||||
|
|
||||||||
| package kafka.log.stream.s3.metadata; | ||||||||
|
|
||||||||
| import kafka.server.BrokerServer; | ||||||||
|
|
@@ -35,8 +34,10 @@ | |||||||
| import org.apache.kafka.metadata.stream.S3StreamSetObject; | ||||||||
|
|
||||||||
| import com.automq.stream.s3.ObjectReader; | ||||||||
| import com.automq.stream.s3.cache.LRUCache; | ||||||||
| import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory; | ||||||||
| import com.automq.stream.s3.index.LocalStreamRangeIndexCache; | ||||||||
| import com.automq.stream.s3.index.lazy.StreamSetObjectRangeIndex; | ||||||||
| import com.automq.stream.s3.metadata.ObjectUtils; | ||||||||
| import com.automq.stream.s3.metadata.S3ObjectMetadata; | ||||||||
| import com.automq.stream.s3.metadata.S3StreamConstant; | ||||||||
|
|
@@ -48,11 +49,14 @@ | |||||||
| import com.automq.stream.s3.streams.StreamMetadataListener; | ||||||||
| import com.automq.stream.utils.FutureUtil; | ||||||||
| import com.automq.stream.utils.Threads; | ||||||||
| import com.google.common.collect.Sets; | ||||||||
|
|
||||||||
| import org.apache.orc.util.BloomFilter; | ||||||||
| import org.slf4j.Logger; | ||||||||
| import org.slf4j.LoggerFactory; | ||||||||
|
|
||||||||
| import java.util.ArrayList; | ||||||||
| import java.util.Collections; | ||||||||
| import java.util.LinkedList; | ||||||||
| import java.util.List; | ||||||||
| import java.util.Map; | ||||||||
|
|
@@ -67,6 +71,7 @@ | |||||||
| import io.netty.util.concurrent.DefaultThreadFactory; | ||||||||
|
|
||||||||
| import static com.automq.stream.utils.FutureUtil.exec; | ||||||||
| import static kafka.log.stream.s3.metadata.StreamMetadataManager.DefaultRangeGetter.STREAM_ID_BLOOM_FILTER; | ||||||||
|
|
||||||||
| public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPublisher { | ||||||||
| private static final Logger LOGGER = LoggerFactory.getLogger(StreamMetadataManager.class); | ||||||||
|
|
@@ -78,6 +83,8 @@ public class StreamMetadataManager implements InRangeObjectsFetcher, MetadataPub | |||||||
| private final LocalStreamRangeIndexCache indexCache; | ||||||||
| private final Map<Long, StreamMetadataListener> streamMetadataListeners = new ConcurrentHashMap<>(); | ||||||||
|
|
||||||||
| private Set<Long> streamSetObjectIds = Collections.emptySet(); | ||||||||
|
|
||||||||
| public StreamMetadataManager(BrokerServer broker, int nodeId, ObjectReaderFactory objectReaderFactory, | ||||||||
| LocalStreamRangeIndexCache indexCache) { | ||||||||
| this.nodeId = nodeId; | ||||||||
|
|
@@ -98,16 +105,23 @@ public String name() { | |||||||
| @Override | ||||||||
| public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, LoaderManifest manifest) { | ||||||||
| Set<Long> changedStreams; | ||||||||
| Set<Long> streamSetObjectIds = this.streamSetObjectIds; | ||||||||
| synchronized (this) { | ||||||||
| if (newImage.highestOffsetAndEpoch().equals(this.metadataImage.highestOffsetAndEpoch())) { | ||||||||
| return; | ||||||||
| } | ||||||||
| this.metadataImage = newImage; | ||||||||
| changedStreams = delta.getOrCreateStreamsMetadataDelta().changedStreams(); | ||||||||
| } | ||||||||
| this.streamSetObjectIds = Collections.unmodifiableSet(getStreamSetObjectIds()); | ||||||||
|
|
||||||||
| // update streamBloomFilter | ||||||||
| Set<Long> sets = Sets.difference(this.streamSetObjectIds, streamSetObjectIds); | ||||||||
| sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject); | ||||||||
|
||||||||
| sets.forEach(STREAM_ID_BLOOM_FILTER::removeObject); | |
| Set<Long> removedStreamSetObjectIds = Sets.difference(this.streamSetObjectIds, streamSetObjectIds); | |
| removedStreamSetObjectIds.forEach(STREAM_ID_BLOOM_FILTER::removeObject); |
Copilot
AI
Aug 8, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lambda captures the local variable 'streamSetObjectIds', which refers to the old set of objects. It should capture 'this.streamSetObjectIds' instead, so that it uses the updated set of stream set object IDs.
| this.indexCache.asyncPrune(() -> streamSetObjectIds); | |
| this.indexCache.asyncPrune(() -> this.streamSetObjectIds); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class could be extracted into a separate file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`#getStreamSetObjectIds` only returns the current node's SSOs. I think that `STREAM_ID_BLOOM_FILTER#removeObject` is expected to remove all deleted SSOs.