Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,14 @@ public static <K, V> AvroGenericStoreClient<K, V> getAndStartGenericStoreClient(
? new DispatchingVsonStoreClient<>(storeMetadata, clientConfig)
: new DispatchingAvroGenericStoreClient<>(storeMetadata, clientConfig);

InternalAvroStoreClient<K, V> retryClient = dispatchingStoreClient;
if (clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
|| clientConfig.isLongTailRetryEnabledForCompute()) {
retryClient = new RetriableAvroGenericStoreClient<>(
dispatchingStoreClient,
clientConfig,
/**
* Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} to
* reduce the thread usage.
*/
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());
}
InternalAvroStoreClient<K, V> retryClient = new RetriableAvroGenericStoreClient<>(
dispatchingStoreClient,
clientConfig,
/**
* Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} to
* reduce the thread usage.
*/
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());

InternalAvroStoreClient<K, V> loadControlClient = retryClient;
if (clientConfig.isStoreLoadControllerEnabled()) {
Expand All @@ -103,14 +99,10 @@ public static <K, V extends SpecificRecord> AvroSpecificStoreClient<K, V> getAnd
final DispatchingAvroSpecificStoreClient<K, V> dispatchingStoreClient =
new DispatchingAvroSpecificStoreClient<>(storeMetadata, clientConfig);

InternalAvroStoreClient<K, V> retryClient = dispatchingStoreClient;
if (clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
|| clientConfig.isLongTailRetryEnabledForCompute()) {
retryClient = new RetriableAvroSpecificStoreClient<>(
dispatchingStoreClient,
clientConfig,
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());
}
InternalAvroStoreClient<K, V> retryClient = new RetriableAvroSpecificStoreClient<>(
dispatchingStoreClient,
clientConfig,
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());

InternalAvroStoreClient<K, V> loadControlClient = retryClient;
if (clientConfig.isStoreLoadControllerEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.linkedin.venice.fastclient.factory;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
import com.linkedin.venice.fastclient.ClientConfig;
import com.linkedin.venice.fastclient.DelegatingAvroStoreClient;
import com.linkedin.venice.fastclient.RetriableAvroGenericStoreClient;
import com.linkedin.venice.fastclient.RetriableAvroSpecificStoreClient;
import com.linkedin.venice.fastclient.meta.InstanceHealthMonitor;
import com.linkedin.venice.fastclient.meta.StoreMetadata;
import com.linkedin.venice.read.protocol.response.streaming.StreamingFooterRecordV1;
import io.tehuti.metrics.MetricsRepository;
import java.lang.reflect.Field;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class ClientFactoryTest {
private static final String STORE_NAME = "test_store";

private StoreMetadata mockStoreMetadata;
private InstanceHealthMonitor mockHealthMonitor;
private TimeoutProcessor mockTimeoutProcessor;
private ClientConfig.ClientConfigBuilder baseConfigBuilder;

@BeforeMethod
public void setUp() {
// Mock dependencies
mockStoreMetadata = mock(StoreMetadata.class);
mockHealthMonitor = mock(InstanceHealthMonitor.class);
mockTimeoutProcessor = mock(TimeoutProcessor.class);

when(mockStoreMetadata.getInstanceHealthMonitor()).thenReturn(mockHealthMonitor);
when(mockHealthMonitor.getTimeoutProcessor()).thenReturn(mockTimeoutProcessor);

// Base config builder with minimum required fields
baseConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME)
.setR2Client(mock(Client.class))
.setD2Client(mock(D2Client.class))
.setClusterDiscoveryD2Service("test_service")
.setMetricsRepository(new MetricsRepository());
}

@Test
public void testRetryClientAlwaysInGenericChain() throws Exception {
ClientConfig clientConfig = baseConfigBuilder.build();

AvroGenericStoreClient<String, Object> client =
ClientFactory.getAndStartGenericStoreClient(mockStoreMetadata, clientConfig);

assertNotNull(client);

// Verify RetriableAvroGenericStoreClient is always in the delegation chain
assertTrue(
containsRetryClientInChain(client),
"RetriableAvroGenericStoreClient should always be present in the client delegation chain");
}

@Test
public void testRetryClientAlwaysInSpecificChain() throws Exception {
ClientConfig clientConfig = baseConfigBuilder.setSpecificValueClass(StreamingFooterRecordV1.class).build();

AvroSpecificStoreClient<String, StreamingFooterRecordV1> client =
ClientFactory.getAndStartSpecificStoreClient(mockStoreMetadata, clientConfig);

assertNotNull(client);

// Verify RetriableAvroSpecificStoreClient is always in the delegation chain
assertTrue(
containsRetryClientInChainSpecific(client),
"RetriableAvroSpecificStoreClient should always be present in the client delegation chain");
}

private boolean containsRetryClientInChain(AvroGenericStoreClient<?, ?> client) throws Exception {
return findClientInDelegationChain(client, RetriableAvroGenericStoreClient.class) != null;
}

private boolean containsRetryClientInChainSpecific(AvroSpecificStoreClient<?, ?> client) throws Exception {
return findClientInDelegationChain(client, RetriableAvroSpecificStoreClient.class) != null;
}

private Object findClientInDelegationChain(Object client, Class<?> targetClass) throws Exception {
if (client == null) {
return null;
}

if (targetClass.isInstance(client)) {
return client;
}

if (client instanceof DelegatingAvroStoreClient) {
try {
Field delegateField = DelegatingAvroStoreClient.class.getDeclaredField("delegate");
delegateField.setAccessible(true);
Object delegate = delegateField.get(client);
return findClientInDelegationChain(delegate, targetClass);
} catch (NoSuchFieldException | IllegalAccessException e) {
// If we can't access the delegate field, stop traversing
return null;
}
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void testStreamingBatchGetLimit() throws Exception {
future.get();
fail(VeniceKeyCountLimitException.class.getSimpleName() + " should be thrown");
} catch (Exception e) {
assertEquals(VeniceKeyCountLimitException.class, e.getCause().getClass());
assertEquals(VeniceKeyCountLimitException.class, e.getCause().getCause().getClass());
}
// Update the store config to increase batch-get key limit
veniceCluster.useControllerClient(
Expand Down
Loading