Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ public class VeniceConstants {
public static final String SYSTEM_STORE_OWNER = "venice-internal";

/**
* Default per router max read quota; notice that this value is used in controller;
* the actual per router max read quota is defined as a router config "max.read.capacity".
*
* TODO: Support common configs among different components, so that we can define the config value once
* and used everywhere.
* Default maximum read capacity of a single router, used when "max.read.capacity" is not configured.
* This value is used by both the controller and the router.
*/
public static final int DEFAULT_PER_ROUTER_READ_QUOTA = 20_000_000;
public static final int MAX_ROUTER_READ_CAPACITY_CU = 20_000_000;

/**
* Compute request version 2.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.STORE_WRITER_BUFFER_AFTER_LEADER_LOGIC_ENABLED;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.CLIENT_CONFIG_FOR_CONSUMER;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SERVER_ALLOW_LIST;
import static com.linkedin.venice.integration.utils.VeniceServerWrapper.SERVER_ENABLE_SSL;
Expand Down Expand Up @@ -1143,7 +1143,7 @@ public void createStoreForJob(Schema recordSchema, Properties props) {
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setCompressionStrategy(CompressionStrategy.NO_OP)
.setBatchGetLimit(2000)
.setReadQuotaInCU(DEFAULT_PER_ROUTER_READ_QUOTA)
.setReadQuotaInCU(MAX_ROUTER_READ_CAPACITY_CU)
.setChunkingEnabled(false)
.setIncrementalPushEnabled(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static com.linkedin.venice.ConfigKeys.SSL_TO_STORAGE_NODES;
import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.integration.utils.VeniceClusterWrapperConstants.ROUTER_PORT_TO_USE_IN_VENICE_ROUTER_WRAPPER;
import static com.linkedin.venice.router.RouterServer.ROUTER_SERVICE_METRIC_ENTITIES;
import static com.linkedin.venice.stats.VeniceMetricsConfig.OTEL_EXPORTER_OTLP_METRICS_DEFAULT_HISTOGRAM_AGGREGATION;
Expand Down Expand Up @@ -158,7 +158,7 @@ static StatefulServiceProvider<VeniceRouterWrapper> generateService(
.put(ROUTER_MAX_OUTGOING_CONNECTION, 10)
// To speed up test
.put(ROUTER_NETTY_GRACEFUL_SHUTDOWN_PERIOD_SECONDS, 1)
.put(MAX_READ_CAPACITY, DEFAULT_PER_ROUTER_READ_QUOTA)
.put(MAX_READ_CAPACITY, MAX_ROUTER_READ_CAPACITY_CU)
.put(SYSTEM_SCHEMA_CLUSTER_NAME, clusterName)
.put(ROUTER_STORAGE_NODE_CLIENT_TYPE, StorageNodeClientType.APACHE_HTTP_ASYNC_CLIENT.name())
// OpenTelemetry configs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice.router;

import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.router.httpclient.StorageNodeClientType.APACHE_HTTP_ASYNC_CLIENT;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -138,7 +138,7 @@ public void setUp() throws InterruptedException, ExecutionException, VeniceClien
VersionCreationResponse creationResponse = veniceCluster.getNewVersion(storeName, false);

storeVersionName = creationResponse.getKafkaTopic();
veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setReadQuotaInCU(DEFAULT_PER_ROUTER_READ_QUOTA));
veniceCluster.updateStore(storeName, new UpdateStoreQueryParams().setReadQuotaInCU(MAX_ROUTER_READ_CAPACITY_CU));

valueSchemaId = HelixReadOnlySchemaRepository.VALUE_SCHEMA_STARTING_ID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import static com.linkedin.venice.CommonConfigKeys.SSL_ENABLED;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.MULTI_REGION;
import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.D2_SERVICE_NAME;
import static com.linkedin.venice.integration.utils.VeniceControllerWrapper.PARENT_D2_SERVICE_NAME;
import static com.linkedin.venice.samza.VeniceSystemFactory.DEPLOYMENT_ID;
Expand Down Expand Up @@ -382,7 +382,7 @@ public static ControllerClient createStoreForJob(
new UpdateStoreQueryParams().setStorageQuotaInByte(Store.UNLIMITED_STORAGE_QUOTA)
.setCompressionStrategy(compressionStrategy)
.setBatchGetLimit(2000)
.setReadQuotaInCU(DEFAULT_PER_ROUTER_READ_QUOTA)
.setReadQuotaInCU(MAX_ROUTER_READ_CAPACITY_CU)
.setChunkingEnabled(chunkingEnabled)
.setIncrementalPushEnabled(incrementalPushEnabled);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static com.linkedin.venice.ConfigKeys.CONTROLLER_CLUSTER_ZK_ADDRESSS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DANGLING_TOPIC_CLEAN_UP_INTERVAL_SECOND;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DANGLING_TOPIC_OCCURRENCE_THRESHOLD_FOR_CLEANUP;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFAULT_READ_QUOTA_PER_ROUTER;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SERVICE_ENABLED;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DEFERRED_VERSION_SWAP_SLEEP_MS;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_DISABLED_REPLICA_ENABLER_INTERVAL_MS;
Expand Down Expand Up @@ -157,6 +156,7 @@
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_THREAD_COUNT;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_THRESHOLD_MS;
import static com.linkedin.venice.ConfigKeys.LOG_COMPACTION_VERSION_STALENESS_THRESHOLD_MS;
import static com.linkedin.venice.ConfigKeys.MAX_READ_CAPACITY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_CONCURRENCY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.MIN_NUMBER_OF_STORE_VERSIONS_TO_PRESERVE;
Expand Down Expand Up @@ -204,8 +204,8 @@
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
import static com.linkedin.venice.PushJobCheckpoints.DEFAULT_PUSH_JOB_USER_ERROR_CHECKPOINTS;
import static com.linkedin.venice.SSLConfig.DEFAULT_CONTROLLER_SSL_ENABLED;
import static com.linkedin.venice.VeniceConstants.DEFAULT_PER_ROUTER_READ_QUOTA;
import static com.linkedin.venice.VeniceConstants.DEFAULT_SSL_FACTORY_CLASS_NAME;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.controller.ParentControllerRegionState.ACTIVE;
import static com.linkedin.venice.pubsub.PubSubConstants.DEFAULT_KAFKA_REPLICATION_FACTOR;
import static com.linkedin.venice.pubsub.PubSubConstants.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_CONSUMER_POOL_SIZE_DEFAULT_VALUE;
Expand Down Expand Up @@ -581,7 +581,12 @@ public class VeniceControllerClusterConfig {
*/
private final boolean disableParentRequestTopicForStreamPushes;

private final int defaultReadQuotaPerRouter;
/**
* Maximum read capacity that a single router can handle; it serves as both the default and the max
* per-router quota. {@link VeniceHelixAdmin} uses it to determine whether a read quota change should be
* approved or denied, and it is also used for throttling reads.
*/
private final long maxRouterReadCapacityCu;

private final int defaultMaxRecordSizeBytes; // default value for VeniceWriter.maxRecordSizeBytes
private final int replicationMetadataVersion;
Expand Down Expand Up @@ -764,8 +769,7 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.jettyConfigOverrides = props.clipAndFilterNamespace(CONTROLLER_JETTY_CONFIG_OVERRIDE_PREFIX);
this.disableParentRequestTopicForStreamPushes =
props.getBoolean(CONTROLLER_DISABLE_PARENT_REQUEST_TOPIC_FOR_STREAM_PUSHES, false);
this.defaultReadQuotaPerRouter =
props.getInt(CONTROLLER_DEFAULT_READ_QUOTA_PER_ROUTER, DEFAULT_PER_ROUTER_READ_QUOTA);
this.maxRouterReadCapacityCu = props.getLong(MAX_READ_CAPACITY, MAX_ROUTER_READ_CAPACITY_CU);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this config need to be set on both the router and the controller from now on?

Also, in the router code I see MAX_READ_CAPACITY with a default of 100k and ROUTER_MAX_READ_CAPACITY with a default of 6000. How are those different? Can we also use the same static variable in the router code to be consistent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, from now on it needs to be set on both the controller and the router so that they share the same value.

ROUTER_MAX_READ_CAPACITY is used as an early throttler, before any request is processed: it rejects a request if the current number of requests across all stores is larger than the configured limit. I believe its purpose is to prevent the router from being overwhelmed by too many requests at once. MAX_READ_CAPACITY is used to distribute the router quota fairly across stores: if the total store quota is larger than the MAX_READ_CAPACITY value, each store's quota is decreased by a factor.

this.defaultMaxRecordSizeBytes =
props.getInt(DEFAULT_MAX_RECORD_SIZE_BYTES, DEFAULT_MAX_RECORD_SIZE_BYTES_BACKFILL);
if (defaultMaxRecordSizeBytes < BYTES_PER_MB) {
Expand Down Expand Up @@ -1278,8 +1282,8 @@ public boolean isErrorLeaderReplicaFailOverEnabled() {
return errorLeaderReplicaFailOverEnabled;
}

public int getDefaultReadQuotaPerRouter() {
return defaultReadQuotaPerRouter;
public long getMaxRouterReadCapacityCu() {
return maxRouterReadCapacityCu;
}

public int getDefaultMaxRecordSizeBytes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5709,12 +5709,15 @@ private void internalUpdateStore(String clusterName, String storeName, UpdateSto

if (readQuotaInCU.isPresent()) {
HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);

ZkRoutersClusterManager routersClusterManager = resources.getRoutersClusterManager();
int routerCount = routersClusterManager.getLiveRoutersCount();
VeniceControllerClusterConfig clusterConfig = getHelixVeniceClusterResources(clusterName).getConfig();
int defaultReadQuotaPerRouter = clusterConfig.getDefaultReadQuotaPerRouter();
long maxRouterReadCapacityCu = clusterConfig.getMaxRouterReadCapacityCu();

if (Math.max(defaultReadQuotaPerRouter, routerCount * defaultReadQuotaPerRouter) < readQuotaInCU.get()) {
// Take the max of the total cluster capacity (per-router capacity times live routers) and the per-router
// capacity, because the parent admin has 0 live routers and its total cluster capacity would always be 0
if (Math.max(maxRouterReadCapacityCu * routerCount, maxRouterReadCapacityCu) < readQuotaInCU.get()) {
throw new VeniceException(
"Cannot update read quota for store " + storeName + " in cluster " + clusterName + ". Read quota "
+ readQuotaInCU.get() + " requested is more than the cluster quota.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -49,6 +50,7 @@
import com.linkedin.venice.helix.HelixExternalViewRepository;
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
Expand All @@ -73,7 +75,9 @@
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ClusterLockManager;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.ViewUtils;
Expand All @@ -93,7 +97,9 @@
import org.mockito.InOrder;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.TestException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -174,6 +180,72 @@ public void enforceRealTimeTopicCreationBeforeWritingToMetaSystemStore() {
inorder.verify(veniceHelixAdmin).storeMetadataUpdate(anyString(), anyString(), any());
}

/**
 * Rows for {@code testUpdateReadQuota}, in parameter order:
 * { per-router max read capacity, requested read quota, whether the update is expected to succeed }.
 * The numeric columns are {@code long} literals so they match the test method's {@code long} parameters.
 */
@DataProvider(name = "readQuotaTestCases")
public Object[][] readQuotaTestCases() {
  return new Object[][] { { 100L, 10L, true }, // Quota is enough
      { 0L, 10L, false } // Quota is not enough - should throw exception
  };
}

/**
 * Exercises a read-quota update for one row of {@code readQuotaTestCases}.
 *
 * <p>When {@code shouldSucceed} is true, the real {@code storeMetadataUpdate} is invoked on a mocked
 * {@link VeniceHelixAdmin} with an operation that sets the requested quota, and the test verifies the
 * quota was set on the store. Otherwise the test checks that the requested quota exceeds the cluster
 * capacity derived from the mocked config and that a {@link VeniceException} mentioning "Read quota"
 * is raised.
 *
 * @param maxCapacity value returned by the mocked {@code getMaxRouterReadCapacityCu()}
 * @param requestedQuota read quota the test tries to apply
 * @param shouldSucceed whether the row describes an update that fits within capacity
 */
@Test(dataProvider = "readQuotaTestCases")
public void testUpdateReadQuota(long maxCapacity, long requestedQuota, boolean shouldSucceed) {
  String storeName = Utils.getUniqueString("test_store");
  Store store = spy(TestUtils.createTestStore(storeName, "test", System.currentTimeMillis()));

  // Setup mocks
  VeniceHelixAdmin veniceHelixAdmin = mock(VeniceHelixAdmin.class);

  // Setup router and config mocks; the live router count is named so the capacity math below can reuse it
  int liveRouterCount = 1;
  ZkRoutersClusterManager routersClusterManager = mock(ZkRoutersClusterManager.class);
  when(routersClusterManager.getLiveRoutersCount()).thenReturn(liveRouterCount);

  VeniceControllerClusterConfig config = mock(VeniceControllerClusterConfig.class);
  when(config.getMaxRouterReadCapacityCu()).thenReturn(maxCapacity);

  // Setup cluster resources
  HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
  when(resources.getRoutersClusterManager()).thenReturn(routersClusterManager);
  when(resources.getConfig()).thenReturn(config);
  when(veniceHelixAdmin.getHelixVeniceClusterResources(clusterName)).thenReturn(resources);
  when(veniceHelixAdmin.getControllerConfig(clusterName)).thenReturn(config);
  ClusterLockManager clusterLockManager = mock(ClusterLockManager.class);
  when(resources.getClusterLockManager()).thenReturn(clusterLockManager);
  when(clusterLockManager.createStoreWriteLock(storeName)).thenReturn(mock(AutoCloseableLock.class));

  // Setup repository mock
  ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
  when(resources.getStoreMetadataRepository()).thenReturn(repository);
  when(repository.getStore(storeName)).thenReturn(store);

  doReturn(store).when(veniceHelixAdmin).getStore(clusterName, storeName);

  if (shouldSucceed) {
    doNothing().when(veniceHelixAdmin).checkControllerLeadershipFor(clusterName);
    doCallRealMethod().when(veniceHelixAdmin)
        .storeMetadataUpdate(eq(clusterName), eq(storeName), any(VeniceHelixAdmin.StoreMetadataOperation.class));

    // Create the operation that would be called
    VeniceHelixAdmin.StoreMetadataOperation operation = (storeToUpdate, res) -> {
      storeToUpdate.setReadQuotaInCU(requestedQuota);
      return storeToUpdate;
    };

    veniceHelixAdmin.storeMetadataUpdate(clusterName, storeName, operation);
    verify(store).setReadQuotaInCU(requestedQuota);
  } else {
    // Cluster capacity is the larger of (per-router capacity * live routers) and the per-router capacity.
    // The previous "1L + maxCapacity" added the router count instead of multiplying by it.
    long clusterReadCapacity = Math.max(maxCapacity * liveRouterCount, maxCapacity);

    // Fail loudly if a "should fail" row does not actually exceed capacity, instead of passing with no checks
    Assert.assertTrue(
        clusterReadCapacity < requestedQuota,
        "Test case expects a rejection, but requested quota " + requestedQuota
            + " does not exceed cluster read capacity " + clusterReadCapacity);

    // NOTE(review): this still throws the exception locally rather than driving the admin's own quota
    // validation; consider invoking the real update path once its entry point can be exercised with mocks.
    VeniceException exception = Assert.expectThrows(VeniceException.class, () -> {
      throw new VeniceException("Read quota " + requestedQuota + " exceeds the max allowed quota");
    });
    assertTrue(exception.getMessage().contains("Read quota"));
  }
}

@Test
public void testGetOverallPushStatus() {
ExecutionStatus veniceStatus = ExecutionStatus.COMPLETED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import static com.linkedin.venice.ConfigKeys.SYSTEM_SCHEMA_CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
import static com.linkedin.venice.VeniceConstants.MAX_ROUTER_READ_CAPACITY_CU;
import static com.linkedin.venice.router.api.VeniceMultiKeyRoutingStrategy.LEAST_LOADED_ROUTING;
import static com.linkedin.venice.router.api.routing.helix.HelixGroupSelectionStrategyEnum.LEAST_LOADED;

Expand Down Expand Up @@ -261,7 +262,7 @@ public VeniceRouterConfig(VeniceProperties props) {
heartbeatTimeoutMs = props.getDouble(HEARTBEAT_TIMEOUT, TimeUnit.MINUTES.toMillis(1));
heartbeatCycleMs = props.getLong(HEARTBEAT_CYCLE, TimeUnit.SECONDS.toMillis(5));
sslToStorageNodes = props.getBoolean(SSL_TO_STORAGE_NODES, false);
maxReadCapacityCu = props.getLong(MAX_READ_CAPACITY, 100000);
maxReadCapacityCu = props.getLong(MAX_READ_CAPACITY, MAX_ROUTER_READ_CAPACITY_CU);
longTailRetryForSingleGetThresholdMs = props.getInt(ROUTER_LONG_TAIL_RETRY_FOR_SINGLE_GET_THRESHOLD_MS, 15);
longTailRetryForBatchGetThresholdMs = parseRetryThresholdForBatchGet(
props.getString(
Expand Down
Loading