Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class VeniceConstants {
*/
public static final int DEFAULT_PER_ROUTER_READ_QUOTA = 20_000_000;

/**
 * Fallback value, in capacity units (CU), for the maximum read capacity of a single router. It is used by
 * {@code VeniceControllerClusterConfig} when the {@code MAX_READ_CAPACITY} config is not set.
 */
public static final int MAX_ROUTER_READ_CAPACITY_CU = 100_000;

/**
* Compute request version 2.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_THREAD_COUNT;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_THRESHOLD_MS;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_VERSION_STALENESS_THRESHOLD_MS;
import static com.linkedin.venice.ConfigKeys.MAX_READ_CAPACITY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_CONCURRENCY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.MIN_NUMBER_OF_STORE_VERSIONS_TO_PRESERVE;
Expand Down Expand Up @@ -206,6 +207,7 @@
import static com.linkedin.venice.SSLConfig.DEFAULT_CONTROLLER_SSL_ENABLED;
import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.DEFAULT_SSL_FACTORY_CLASS_NAME;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.controller.ParentControllerRegionState.ACTIVE;
import static com.linkedin.venice.pubsub.PubSubConstants.DEFAULT_KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE;
Expand Down Expand Up @@ -581,7 +583,15 @@ public class VeniceControllerClusterConfig {
*/
private final boolean disableParentRequestTopicForStreamPushes;

/**
 * Configs to specify the default and max per router quota. These are used in {@link VeniceHelixAdmin} to determine
 * whether a read quota change should be approved or denied.
 *
 * defaultReadQuotaPerRouter - the default per router capacity from the controller's perspective. Loaded from
 * CONTROLLER_DEFAULT_READ_QUOTA_PER_ROUTER, falling back to DEFAULT_PER_ROUTER_READ_QUOTA.
 */
private final int defaultReadQuotaPerRouter;
/**
 * maxRouterReadCapacityCu - the maximum per router capacity from the router's perspective. This value is also used to
 * throttle requests in the router. Loaded from MAX_READ_CAPACITY, falling back to MAX_ROUTER_READ_CAPACITY_CU.
 *
 * NOTE(review): per the PR discussion, this is expected to be configured with the same value on both the controller
 * and the router - confirm against the deployment configs.
 */
private final long maxRouterReadCapacityCu;

private final int defaultMaxRecordSizeBytes; // default value for VeniceWriter.maxRecordSizeBytes
private final int replicationMetadataVersion;
Expand Down Expand Up @@ -766,6 +776,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
props.getBoolean(CONTROLLER_DISABLE_PARENT_REQUEST_TOPIC_FOR_STREAM_PUSHES, false);
this.defaultReadQuotaPerRouter =
props.getInt(CONTROLLER_DEFAULT_READ_QUOTA_PER_ROUTER, DEFAULT_PER_ROUTER_READ_QUOTA);
this.maxRouterReadCapacityCu = props.getLong(MAX_READ_CAPACITY, MAX_ROUTER_READ_CAPACITY_CU);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This config needs to be done on both the router and controller from now on?

Also, in the router code I see MAX_READ_CAPACITY with a default of 100k and ROUTER_MAX_READ_CAPACITY with a default of 6000. How are those different? Can we also use the same static variable in the router code to be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it needs to be on both the controller and router from now on so that they share the same value.

ROUTER_MAX_READ_CAPACITY is used as an early throttler before any requests are processed: it rejects a request if the current number of requests across all stores is larger than the configured limit. I believe it's there to prevent the router from being overwhelmed by too many requests at once. MAX_READ_CAPACITY is used to distribute the router quota fairly per store: it decreases each store's quota by a factor if the total store quota is larger than the MAX_READ_CAPACITY value.

this.defaultMaxRecordSizeBytes =
props.getInt(DEFAULT_MAX_RECORD_SIZE_BYTES, DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL);
if (defaultMaxRecordSizeBytes < BYTES_PER_MB) {
Expand Down Expand Up @@ -1282,6 +1293,10 @@ public int getDefaultReadQuotaPerRouter() {
return defaultReadQuotaPerRouter;
}

/**
 * @return the configured maximum read capacity of a single router, in capacity units (CU)
 */
public long getMaxRouterReadCapacityCu() {
  return this.maxRouterReadCapacityCu;
}

/**
 * @return the default value, in bytes, for {@code VeniceWriter.maxRecordSizeBytes}
 */
public int getDefaultMaxRecordSizeBytes() {
  return this.defaultMaxRecordSizeBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5709,12 +5709,16 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto

if (readQuotaInCU.isPresent()) {
HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);

ZkRoutersClusterManager routersClusterManager = resources.getRoutersClusterManager();
int routerCount = routersClusterManager.getLiveRoutersCount();

VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
int defaultReadQuotaPerRouter = clusterConfig.getDefaultReadQuotaPerRouter();

if (Math.max(defaultReadQuotaPerRouter, routerCount * defaultReadQuotaPerRouter) < readQuotaInCU.get()) {
long maxRouterReadCapacityCu = clusterConfig.getMaxRouterReadCapacityCu();
long maxPerRouterCapacity = Math.max(defaultReadQuotaPerRouter, maxRouterReadCapacityCu);
long totalClusterCapacity = maxPerRouterCapacity * routerCount;
if (Math.max(totalClusterCapacity, maxPerRouterCapacity) < readQuotaInCU.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totalClusterCapacity will always be >= maxPerRouterCapacity

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't true for the parent because the router count is 0 and totalClusterCapacity will be 0. We need to take the max of totalClusterCapacity and maxPerRouterCapacity to correctly account for this case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some comments? Otherwise it looks like a bug (that's what we thought when we encountered this if condition before your change 😆).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - added a comment to explain why we need to take the max of total cluster capacity and per router capacity

throw new VeniceException(
"Cannot update read quota for store " + storeName + " in cluster " + clusterName + ". Read quota "
+ readQuotaInCU.get() + " requested is more than the cluster quota.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -49,6 +50,7 @@
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
Expand All @@ -73,7 +75,9 @@
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.ViewUtils;
Expand All @@ -93,7 +97,9 @@
import org.mockito.InOrder;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.TestException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -174,6 +180,74 @@ public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() {
inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any());
}

/**
 * Rows for {@code testUpdateReadQuota}, in parameter order:
 * {@code defaultQuota} (int), {@code maxCapacity} (long), {@code requestedQuota} (long), {@code shouldSucceed}.
 * Every value uses a literal of the declared parameter type so the rows do not depend on reflective widening.
 */
@DataProvider(name = "readQuotaTestCases")
public Object[][] readQuotaTestCases() {
  return new Object[][] {
      { 100, 0L, 10L, true }, // Default quota is enough
      { 0, 100L, 10L, true }, // Default quota not enough but max router capacity is enough
      { 0, 0L, 10L, false } // Neither default quota nor max capacity is enough - should be rejected
  };
}

/**
 * Parameterized read quota test driven by {@code readQuotaTestCases}.
 *
 * For rows expected to succeed, a read quota update is applied through the real
 * {@code VeniceHelixAdmin#storeMetadataUpdate} and the store is verified to receive the requested quota.
 *
 * For rows expected to fail, the requested quota is checked against the same limit that
 * {@code VeniceHelixAdmin#internalUpdateStore} computes: the larger of the default and max per router capacity,
 * multiplied by the live router count, and never less than the capacity of a single router.
 *
 * @param defaultQuota value returned by {@code getDefaultReadQuotaPerRouter()}
 * @param maxCapacity value returned by {@code getMaxRouterReadCapacityCu()}
 * @param requestedQuota read quota requested for the store
 * @param shouldSucceed whether the requested quota is expected to be accepted
 */
@Test(dataProvider = "readQuotaTestCases")
public void testUpdateReadQuota(int defaultQuota, long maxCapacity, long requestedQuota, boolean shouldSucceed) {
  String storeName = Utils.getUniqueString("test_store");
  Store store = spy(TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()));

  // Setup mocks
  VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);

  // Setup router and config mocks
  int liveRouterCount = 1;
  ZkRoutersClusterManager routersClusterManager = mock(ZkRoutersClusterManager.class);
  when(routersClusterManager.getLiveRoutersCount()).thenReturn(liveRouterCount);

  VeniceControllerClusterConfig config = mock(VeniceControllerClusterConfig.class);
  when(config.getDefaultReadQuotaPerRouter()).thenReturn(defaultQuota);
  when(config.getMaxRouterReadCapacityCu()).thenReturn(maxCapacity);

  // Setup cluster resources
  HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
  when(resources.getRoutersClusterManager()).thenReturn(routersClusterManager);
  when(resources.getConfig()).thenReturn(config);
  when(veniceHelixAdmin.getHelixVeniceClusterResources(clusterName)).thenReturn(resources);
  when(veniceHelixAdmin.getControllerConfig(clusterName)).thenReturn(config);
  ClusterLockManager clusterLockManager = mock(ClusterLockManager.class);
  when(resources.getClusterLockManager()).thenReturn(clusterLockManager);
  when(clusterLockManager.createStoreWriteLock(storeName)).thenReturn(mock(AutoCloseableLock.class));

  // Setup repository mock
  ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
  when(resources.getStoreMetadataRepository()).thenReturn(repository);
  when(repository.getStore(storeName)).thenReturn(store);

  doReturn(store).when(veniceHelixAdmin).getStore(clusterName, storeName);

  if (shouldSucceed) {
    doNothing().when(veniceHelixAdmin).checkControllerLeadershipFor(clusterName);
    doCallRealMethod().when(veniceHelixAdmin)
        .storeMetadataUpdate(eq(clusterName), eq(storeName), any(VeniceHelixAdmin.StoreMetadataOperation.class));

    // Create the operation that would be called
    VeniceHelixAdmin.StoreMetadataOperation operation = (storeToUpdate, res) -> {
      storeToUpdate.setReadQuotaInCU(requestedQuota);
      return storeToUpdate;
    };

    veniceHelixAdmin.storeMetadataUpdate(clusterName, storeName, operation);
    verify(store).setReadQuotaInCU(requestedQuota);
    return;
  }

  // Mirror the limit computed by the controller: max(default, max capacity) per router, scaled by the live router
  // count, and never below a single router's capacity (the live router count can be 0, e.g. on a parent
  // controller).
  long maxPerRouterCapacity = Math.max(defaultQuota, maxCapacity);
  long totalClusterCapacity = maxPerRouterCapacity * liveRouterCount;
  long readQuotaLimit = Math.max(totalClusterCapacity, maxPerRouterCapacity);

  // Fail loudly if a row marked as "should fail" is in fact within the limit, instead of silently passing.
  Assert.assertTrue(
      readQuotaLimit < requestedQuota,
      "Requested read quota " + requestedQuota + " was expected to exceed the limit " + readQuotaLimit);

  // NOTE(review): neither branch calls the controller's quota validation itself - the success branch sets the quota
  // through its own lambda, and this branch only checks the data row against the formula. Wire both to the
  // update-store entry point once its collaborators can be stubbed, and assert on the VeniceException it throws.
}

@Test
public void testGetOverallPushStatus() {
ExecutionStatus veniceStatus = ExecutionStatus.COMPLETED;
Expand Down
Loading