Skip to content

Commit 23f7509

Browse files
authored
[repush][log compaction] Add write enabled filtering to compaction manager (#2273)
1 parent 28b36c1 commit 23f7509

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

services/venice-controller/src/main/java/com/linkedin/venice/controller/logcompaction/StoreRepushCandidateFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public boolean apply(String clusterName, StoreInfo storeInfo) {
4949
}
5050

5151
// Store criteria
52-
if (!storeInfo.isCompactionEnabled() || storeInfo.isMigrating()
52+
if (!storeInfo.isCompactionEnabled() || storeInfo.isMigrating() || !storeInfo.isEnableStoreWrites()
5353
|| !isLatestVersionStale(getVersionStalenessThresholdMs(storeInfo, clusterName), storeInfo)) {
5454
return false;
5555
}

services/venice-controller/src/test/java/com/linkedin/venice/controller/logcompaction/CompactionManagerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void testFilterStoresForCompaction() {
9595
Version version6 = mock(Version.class);
9696
Version version7 = mock(Version.class);
9797
Version version8 = mock(Version.class);
98+
Version version9 = mock(Version.class);
9899

99100
// set version number for Version mocks
100101
when(version1.getNumber()).thenReturn(currentVersionNumber);
@@ -106,6 +107,7 @@ public void testFilterStoresForCompaction() {
106107
when(version6.getNumber()).thenReturn(currentVersionNumber);
107108
when(version7.getNumber()).thenReturn(currentVersionNumber);
108109
when(version8.getNumber()).thenReturn(currentVersionNumber);
110+
when(version9.getNumber()).thenReturn(currentVersionNumber);
109111

110112
// set createTime for Version mocks
111113

@@ -145,6 +147,10 @@ public void testFilterStoresForCompaction() {
145147
when(version8.getCreatedTime())
146148
.thenReturn(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(COMPACTION_THRESHOLD + 1));
147149

150+
// 25 hours
151+
when(version9.getCreatedTime())
152+
.thenReturn(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(COMPACTION_THRESHOLD + 1));
153+
148154
// Mock StoreInfo instances
149155
StoreInfo store1 = new StoreInfo();
150156
StoreInfo store2 = new StoreInfo();
@@ -154,6 +160,7 @@ public void testFilterStoresForCompaction() {
154160
StoreInfo store6 = new StoreInfo();
155161
StoreInfo store7 = new StoreInfo();
156162
StoreInfo store8 = new StoreInfo();
163+
StoreInfo store9 = new StoreInfo();
157164

158165
// Set store names
159166
store1.setName(TEST_STORE_NAME_PREFIX + "1");
@@ -164,6 +171,7 @@ public void testFilterStoresForCompaction() {
164171
store6.setName(TEST_STORE_NAME_PREFIX + "6");
165172
store7.setName(TEST_STORE_NAME_PREFIX + "7");
166173
store8.setName(TEST_STORE_NAME_PREFIX + "8");
174+
store9.setName(TEST_STORE_NAME_PREFIX + "9");
167175

168176
// Return Version mocks when getVersion() is called
169177
store1.setVersions(Collections.singletonList(version1));
@@ -174,6 +182,7 @@ public void testFilterStoresForCompaction() {
174182
store6.setVersions(Collections.singletonList(version6));
175183
store7.setVersions(Collections.singletonList(version7));
176184
store8.setVersions(Collections.singletonList(version8));
185+
store9.setVersions(Collections.singletonList(version9));
177186

178187
// Mock HybridStoreConfig for the first two StoreInfo instances
179188
store1.setHybridStoreConfig(mock(HybridStoreConfig.class));
@@ -183,6 +192,7 @@ public void testFilterStoresForCompaction() {
183192
store6.setHybridStoreConfig(mock(HybridStoreConfig.class));
184193
store7.setHybridStoreConfig(mock(HybridStoreConfig.class));
185194
store8.setHybridStoreConfig(mock(HybridStoreConfig.class));
195+
store9.setHybridStoreConfig(mock(HybridStoreConfig.class));
186196

187197
// Set isActiveActiveReplicationEnabled for the first two StoreInfo instances
188198
store1.setActiveActiveReplicationEnabled(true);
@@ -193,6 +203,7 @@ public void testFilterStoresForCompaction() {
193203
store6.setActiveActiveReplicationEnabled(true);
194204
store7.setActiveActiveReplicationEnabled(true);
195205
store8.setActiveActiveReplicationEnabled(true);
206+
store9.setActiveActiveReplicationEnabled(true);
196207

197208
// Set compaction enabled for all but store6
198209
store1.setCompactionEnabled(true);
@@ -203,6 +214,7 @@ public void testFilterStoresForCompaction() {
203214
store6.setCompactionEnabled(false);
204215
store7.setCompactionEnabled(true);
205216
store8.setCompactionEnabled(true);
217+
store9.setCompactionEnabled(true);
206218

207219
// Set store-level compaction threshold
208220
store1.setCompactionThreshold(-1);
@@ -213,6 +225,18 @@ public void testFilterStoresForCompaction() {
213225
store6.setCompactionThreshold(-1);
214226
store7.setCompactionThreshold(TimeUnit.HOURS.toMillis(COMPACTION_THRESHOLD) * 2);
215227
store8.setCompactionThreshold(TimeUnit.HOURS.toMillis(COMPACTION_THRESHOLD) / 2);
228+
store9.setCompactionThreshold(-1);
229+
230+
// Set store-level Writes Enabled
231+
store1.setEnableStoreWrites(true);
232+
store2.setEnableStoreWrites(true);
233+
store3.setEnableStoreWrites(true);
234+
store4.setEnableStoreWrites(true);
235+
store5.setEnableStoreWrites(true);
236+
store6.setEnableStoreWrites(true);
237+
store7.setEnableStoreWrites(true);
238+
store8.setEnableStoreWrites(true);
239+
store9.setEnableStoreWrites(false);
216240

217241
// Add StoreInfo instances to the list
218242
storeInfoList.add(store1);
@@ -223,6 +247,7 @@ public void testFilterStoresForCompaction() {
223247
storeInfoList.add(store6);
224248
storeInfoList.add(store7);
225249
storeInfoList.add(store8);
250+
storeInfoList.add(store9);
226251

227252
// Verify stores compaction-ready status
228253
assertTrue(testCompactionManager.filterStore(store1, TEST_CLUSTER_NAME_1)); // compacted more than threshold
@@ -243,6 +268,7 @@ public void testFilterStoresForCompaction() {
243268
assertFalse(testCompactionManager.filterStore(store7, TEST_CLUSTER_NAME_1)); // Store level threshold not
244269
// reached
245270
assertTrue(testCompactionManager.filterStore(store8, TEST_CLUSTER_NAME_1)); // Store level threshold reached
271+
assertFalse(testCompactionManager.filterStore(store9, TEST_CLUSTER_NAME_1)); // Store level threshold reached
246272

247273
// Test
248274
List<StoreInfo> compactionReadyStores =
@@ -257,6 +283,7 @@ public void testFilterStoresForCompaction() {
257283
verify(mockLogCompactionStats, Mockito.times(0)).recordStoreNominatedForCompactionCount(store6.getName());
258284
verify(mockLogCompactionStats, Mockito.times(0)).recordStoreNominatedForCompactionCount(store7.getName());
259285
verify(mockLogCompactionStats, Mockito.times(1)).recordStoreNominatedForCompactionCount(store8.getName());
286+
verify(mockLogCompactionStats, Mockito.times(0)).recordStoreNominatedForCompactionCount(store9.getName());
260287

261288
// Validate setCompactionEligible metric emission
262289
verify(mockLogCompactionStats, Mockito.times(1)).setCompactionEligible(store1.getName());
@@ -267,6 +294,7 @@ public void testFilterStoresForCompaction() {
267294
verify(mockLogCompactionStats, Mockito.times(0)).setCompactionEligible(store6.getName());
268295
verify(mockLogCompactionStats, Mockito.times(0)).setCompactionEligible(store7.getName());
269296
verify(mockLogCompactionStats, Mockito.times(1)).setCompactionEligible(store8.getName());
297+
verify(mockLogCompactionStats, Mockito.times(0)).setCompactionEligible(store9.getName());
270298

271299
// Test validation
272300
assertEquals(compactionReadyStores.size(), 3);
@@ -278,6 +306,7 @@ public void testFilterStoresForCompaction() {
278306
assertFalse(compactionReadyStores.contains(store6));
279307
assertFalse(compactionReadyStores.contains(store7));
280308
assertTrue(compactionReadyStores.contains(store8));
309+
assertFalse(compactionReadyStores.contains(store9));
281310
}
282311

283312
@Test(expectedExceptions = VeniceException.class)
@@ -371,6 +400,7 @@ private StoreInfo createMockStoreInfo(String storeName, Version version) {
371400
when(storeInfo.getVersions()).thenReturn(Collections.singletonList(version));
372401
when(storeInfo.isActiveActiveReplicationEnabled()).thenReturn(true);
373402
when(storeInfo.isCompactionEnabled()).thenReturn(true);
403+
when(storeInfo.isEnableStoreWrites()).thenReturn(true);
374404
return storeInfo;
375405
}
376406

0 commit comments

Comments
 (0)