From af9882301d0cd57848d1d49395f71d57b33c9e6b Mon Sep 17 00:00:00 2001 From: Lei Lu Date: Thu, 13 Nov 2025 22:12:57 -0800 Subject: [PATCH 1/2] [all] Include PubSubMessageHeaders size in ImmutablePubSubMessage heapsize estimation We estimate the size of each PubSubMessage and use it to compute the aggregate size of the drainer buffer queue. A limit is set on the queue to control its occupied heap size. In a recent OOM heap dump analysis, we found that this estimation can be inaccurate, e.g. the buffer size was about 400MB, whereas the limit setting is only 10MB. The reason is that we use the sum of key, value, and pubSubPosition to estimate an ImmutablePubSubMessage size. However, when examining the dump, in this particular case, the major contributor was PubSubMessageHeaders, which was about 16KB, and we don't take it into account today. This PR includes the header size inside getHeapSize estimation for ImmutablePubSubMessage. --- .../com/linkedin/venice/pubsub/ImmutablePubSubMessage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java index 54ffb2e3e24..b48ff41f0f1 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/ImmutablePubSubMessage.java @@ -98,6 +98,7 @@ public String toString() { public int getHeapSize() { /** The {@link #topicPartition} is supposed to be a shared instance, and is therefore ignored. */ return SHALLOW_CLASS_OVERHEAD + InstanceSizeEstimator.getObjectSize(key) - + InstanceSizeEstimator.getObjectSize(value) + InstanceSizeEstimator.getObjectSize(pubSubPosition); + + InstanceSizeEstimator.getObjectSize(value) + InstanceSizeEstimator.getObjectSize(pubSubPosition) + + (pubSubMessageHeaders != null ? 
InstanceSizeEstimator.getObjectSize(pubSubMessageHeaders) : 0); } } From 4f9a823d5af3a883a96d4f2103c04687a51af7bc Mon Sep 17 00:00:00 2001 From: Lei Lu Date: Fri, 14 Nov 2025 00:07:13 -0800 Subject: [PATCH 2/2] increase buffercapacity in test --- .../linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java index 924c27184e1..6a0b666b407 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java @@ -494,7 +494,7 @@ public void suiteSetUp() throws Exception { taskPollingService = Executors.newFixedThreadPool(1); storeBufferService = new StoreBufferService( 3, - 10000, + 100000, 1000, isStoreWriterBufferAfterLeaderLogicEnabled(), null,