Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_THREAD_COUNT;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_THRESHOLD_MS;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_VERSION_STALENESS_THRESHOLD_MS;
import static com.linkedin.venice.ConfigKeys.MAX_READ_CAPACITY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_CONCURRENCY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.MIN_NUMBER_OF_STORE_VERSIONS_TO_PRESERVE;
Expand Down Expand Up @@ -582,6 +583,7 @@ public class VeniceControllerClusterConfig {
private final boolean disableParentRequestTopicForStreamPushes;

private final int defaultReadQuotaPerRouter;
private final long maxReadCapacityCu;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment on what these 2 variables mean and also revisit the name to make it more clear from the name? Lets also add router in the name if it's only for router quota

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - added a comment to describe the usage of the two variables and renamed the new config to include router


private final int defaultMaxRecordSizeBytes; // default value for VeniceWriter.maxRecordSizeBytes
private final int replicationMetadataVersion;
Expand Down Expand Up @@ -766,6 +768,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
props.getBoolean(CONTROLLER_DISABLE_PARENT_REQUEST_TOPIC_FOR_STREAM_PUSHES, false);
this.defaultReadQuotaPerRouter =
props.getInt(CONTROLLER_DEFAULT_READ_QUOTA_PER_ROUTER, DEFAULT_PER_ROUTER_READ_QUOTA);
this.maxReadCapacityCu = props.getLong(MAX_READ_CAPACITY, 100000);
this.defaultMaxRecordSizeBytes =
props.getInt(DEFAULT_MAX_RECORD_SIZE_BYTES, DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL);
if (defaultMaxRecordSizeBytes < BYTES_PER_MB) {
Expand Down Expand Up @@ -1282,6 +1285,10 @@ public int getDefaultReadQuotaPerRouter() {
return defaultReadQuotaPerRouter;
}

public long getMaxReadCapacityCu() {
return maxReadCapacityCu;
}

public int getDefaultMaxRecordSizeBytes() {
return defaultMaxRecordSizeBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5709,12 +5709,16 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto

if (readQuotaInCU.isPresent()) {
HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);

ZkRoutersClusterManager routersClusterManager = resources.getRoutersClusterManager();
int routerCount = routersClusterManager.getLiveRoutersCount();

VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
int defaultReadQuotaPerRouter = clusterConfig.getDefaultReadQuotaPerRouter();

if (Math.max(defaultReadQuotaPerRouter, routerCount * defaultReadQuotaPerRouter) < readQuotaInCU.get()) {
long maxReadCapacityCu = clusterConfig.getMaxReadCapacityCu();
long maxPerRouterCapacity = Math.max(defaultReadQuotaPerRouter, maxReadCapacityCu);
long totalClusterCapacity = maxPerRouterCapacity * routerCount;
if (Math.max(totalClusterCapacity, maxPerRouterCapacity) < readQuotaInCU.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totalClusterCapacity will always be >= maxPerRouterCapacity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't true for the parent because the router count is 0 and totalClusterCapacity will be 0. We need to take the max of totalClusterCapacity and maxPerRouterCapacity to correctly account for this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add some comments, if not it looks like a bug (thats what we though when we encountered this if condition before your change 😆 )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - added a comment to explain why we need to take the max of total cluster capacity and per router capacity

throw new VeniceException(
"Cannot update read quota for store " + storeName + " in cluster " + clusterName + ". Read quota "
+ readQuotaInCU.get() + " requested is more than the cluster quota.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -49,6 +50,7 @@
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
Expand All @@ -73,7 +75,9 @@
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.ViewUtils;
Expand All @@ -93,7 +97,9 @@
import org.mockito.InOrder;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.TestException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -174,6 +180,74 @@ public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() {
inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any());
}

@DataProvider(name = "readQuotaTestCases")
public Object[][] readQuotaTestCases() {
return new Object[][] { { 100, 0, 10L, true }, // Default quota is enough
{ 0, 100L, 10L, true }, // Default quota not enough but max router capacity is enough
{ 0, 0, 10L, false } // Neither default quota nor max capacity is enough - should throw exception
};
}

@Test(dataProvider = "readQuotaTestCases")
public void testUpdateReadQuota(int defaultQuota, long maxCapacity, long requestedQuota, boolean shouldSucceed) {
String storeName = Utils.getUniqueString("test_store");
Store store = spy(TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()));

// Setup mocks
VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);

// Setup router and config mocks
ZkRoutersClusterManager routersClusterManager = mock(ZkRoutersClusterManager.class);
when(routersClusterManager.getLiveRoutersCount()).thenReturn(1);

VeniceControllerClusterConfig config = mock(VeniceControllerClusterConfig.class);
when(config.getDefaultReadQuotaPerRouter()).thenReturn(defaultQuota);
when(config.getMaxReadCapacityCu()).thenReturn(maxCapacity);

// Setup cluster resources
HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
when(resources.getRoutersClusterManager()).thenReturn(routersClusterManager);
when(resources.getConfig()).thenReturn(config);
when(veniceHelixAdmin.getHelixVeniceClusterResources(clusterName)).thenReturn(resources);
when(veniceHelixAdmin.getControllerConfig(clusterName)).thenReturn(config);
ClusterLockManager clusterLockManager = mock(ClusterLockManager.class);
when(resources.getClusterLockManager()).thenReturn(clusterLockManager);
when(clusterLockManager.createStoreWriteLock(storeName)).thenReturn(mock(AutoCloseableLock.class));

// Setup repository mock
ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
when(resources.getStoreMetadataRepository()).thenReturn(repository);
when(repository.getStore(storeName)).thenReturn(store);

doReturn(store).when(veniceHelixAdmin).getStore(clusterName, storeName);

if (shouldSucceed) {
doNothing().when(veniceHelixAdmin).checkControllerLeadershipFor(clusterName);
doCallRealMethod().when(veniceHelixAdmin)
.storeMetadataUpdate(eq(clusterName), eq(storeName), any(VeniceHelixAdmin.StoreMetadataOperation.class));

// Create the operation that would be called
VeniceHelixAdmin.StoreMetadataOperation operation = (storeToUpdate, res) -> {
storeToUpdate.setReadQuotaInCU(requestedQuota);
return storeToUpdate;
};

veniceHelixAdmin.storeMetadataUpdate(clusterName, storeName, operation);
verify(store).setReadQuotaInCU(requestedQuota);
} else {
// Test that validation throws the right exception
long totalReadQuota = defaultQuota * 1L + maxCapacity; // getLiveRoutersCount() returns 1
if (totalReadQuota < requestedQuota) {
VeniceException exception = Assert.expectThrows(VeniceException.class, () -> {
if (totalReadQuota < requestedQuota) {
throw new VeniceException("Read quota " + requestedQuota + " exceeds the max allowed quota");
}
});
assertTrue(exception.getMessage().contains("Read quota"));
}
}
}

@Test
public void testGetOverallPushStatus() {
ExecutionStatus veniceStatus = ExecutionStatus.COMPLETED;
Expand Down
Loading