Skip to content

Commit bca70a2

Browse files
committed
[fc] enabling retriable client by default
1 parent 5c6fbaa commit bca70a2

File tree

2 files changed

+125
-20
lines changed

2 files changed

+125
-20
lines changed

clients/venice-client/src/main/java/com/linkedin/venice/fastclient/factory/ClientFactory.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,14 @@ public static <K, V> AvroGenericStoreClient<K, V> getAndStartGenericStoreClient(
6767
? new DispatchingVsonStoreClient<>(storeMetadata, clientConfig)
6868
: new DispatchingAvroGenericStoreClient<>(storeMetadata, clientConfig);
6969

70-
InternalAvroStoreClient<K, V> retryClient = dispatchingStoreClient;
71-
if (clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
72-
|| clientConfig.isLongTailRetryEnabledForCompute()) {
73-
retryClient = new RetriableAvroGenericStoreClient<>(
74-
dispatchingStoreClient,
75-
clientConfig,
76-
/**
77-
* Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} to
78-
* reduce the thread usage.
79-
*/
80-
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());
81-
}
70+
InternalAvroStoreClient<K, V> retryClient = new RetriableAvroGenericStoreClient<>(
71+
dispatchingStoreClient,
72+
clientConfig,
73+
/**
74+
* Reuse the {@link TimeoutProcessor} from {@link InstanceHealthMonitor} to
75+
* reduce the thread usage.
76+
*/
77+
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());
8278

8379
InternalAvroStoreClient<K, V> loadControlClient = retryClient;
8480
if (clientConfig.isStoreLoadControllerEnabled()) {
@@ -103,14 +99,10 @@ public static <K, V extends SpecificRecord> AvroSpecificStoreClient<K, V> getAnd
10399
final DispatchingAvroSpecificStoreClient<K, V> dispatchingStoreClient =
104100
new DispatchingAvroSpecificStoreClient<>(storeMetadata, clientConfig);
105101

106-
InternalAvroStoreClient<K, V> retryClient = dispatchingStoreClient;
107-
if (clientConfig.isLongTailRetryEnabledForSingleGet() || clientConfig.isLongTailRetryEnabledForBatchGet()
108-
|| clientConfig.isLongTailRetryEnabledForCompute()) {
109-
retryClient = new RetriableAvroSpecificStoreClient<>(
110-
dispatchingStoreClient,
111-
clientConfig,
112-
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());
113-
}
102+
InternalAvroStoreClient<K, V> retryClient = new RetriableAvroSpecificStoreClient<>(
103+
dispatchingStoreClient,
104+
clientConfig,
105+
storeMetadata.getInstanceHealthMonitor().getTimeoutProcessor());
114106

115107
InternalAvroStoreClient<K, V> loadControlClient = retryClient;
116108
if (clientConfig.isStoreLoadControllerEnabled()) {
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package com.linkedin.venice.fastclient.factory;
2+
3+
import static org.mockito.Mockito.mock;
4+
import static org.mockito.Mockito.when;
5+
import static org.testng.Assert.assertNotNull;
6+
import static org.testng.Assert.assertTrue;
7+
8+
import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
9+
import com.linkedin.d2.balancer.D2Client;
10+
import com.linkedin.r2.transport.common.Client;
11+
import com.linkedin.venice.client.store.AvroGenericStoreClient;
12+
import com.linkedin.venice.client.store.AvroSpecificStoreClient;
13+
import com.linkedin.venice.fastclient.ClientConfig;
14+
import com.linkedin.venice.fastclient.DelegatingAvroStoreClient;
15+
import com.linkedin.venice.fastclient.RetriableAvroGenericStoreClient;
16+
import com.linkedin.venice.fastclient.RetriableAvroSpecificStoreClient;
17+
import com.linkedin.venice.fastclient.meta.InstanceHealthMonitor;
18+
import com.linkedin.venice.fastclient.meta.StoreMetadata;
19+
import com.linkedin.venice.read.protocol.response.streaming.StreamingFooterRecordV1;
20+
import io.tehuti.metrics.MetricsRepository;
21+
import java.lang.reflect.Field;
22+
import org.testng.annotations.BeforeMethod;
23+
import org.testng.annotations.Test;
24+
25+
26+
public class ClientFactoryTest {
27+
private static final String STORE_NAME = "test_store";
28+
29+
private StoreMetadata mockStoreMetadata;
30+
private InstanceHealthMonitor mockHealthMonitor;
31+
private TimeoutProcessor mockTimeoutProcessor;
32+
private ClientConfig.ClientConfigBuilder baseConfigBuilder;
33+
34+
@BeforeMethod
35+
public void setUp() {
36+
// Mock dependencies
37+
mockStoreMetadata = mock(StoreMetadata.class);
38+
mockHealthMonitor = mock(InstanceHealthMonitor.class);
39+
mockTimeoutProcessor = mock(TimeoutProcessor.class);
40+
41+
when(mockStoreMetadata.getInstanceHealthMonitor()).thenReturn(mockHealthMonitor);
42+
when(mockHealthMonitor.getTimeoutProcessor()).thenReturn(mockTimeoutProcessor);
43+
44+
// Base config builder with minimum required fields
45+
baseConfigBuilder = new ClientConfig.ClientConfigBuilder<>().setStoreName(STORE_NAME)
46+
.setR2Client(mock(Client.class))
47+
.setD2Client(mock(D2Client.class))
48+
.setClusterDiscoveryD2Service("test_service")
49+
.setMetricsRepository(new MetricsRepository());
50+
}
51+
52+
@Test
53+
public void testRetryClientAlwaysInGenericChain() throws Exception {
54+
ClientConfig clientConfig = baseConfigBuilder.build();
55+
56+
AvroGenericStoreClient<String, Object> client =
57+
ClientFactory.getAndStartGenericStoreClient(mockStoreMetadata, clientConfig);
58+
59+
assertNotNull(client);
60+
61+
// Verify RetriableAvroGenericStoreClient is always in the delegation chain
62+
assertTrue(
63+
containsRetryClientInChain(client),
64+
"RetriableAvroGenericStoreClient should always be present in the client delegation chain");
65+
}
66+
67+
@Test
68+
public void testRetryClientAlwaysInSpecificChain() throws Exception {
69+
ClientConfig clientConfig = baseConfigBuilder.setSpecificValueClass(StreamingFooterRecordV1.class).build();
70+
71+
AvroSpecificStoreClient<String, StreamingFooterRecordV1> client =
72+
ClientFactory.getAndStartSpecificStoreClient(mockStoreMetadata, clientConfig);
73+
74+
assertNotNull(client);
75+
76+
// Verify RetriableAvroSpecificStoreClient is always in the delegation chain
77+
assertTrue(
78+
containsRetryClientInChainSpecific(client),
79+
"RetriableAvroSpecificStoreClient should always be present in the client delegation chain");
80+
}
81+
82+
private boolean containsRetryClientInChain(AvroGenericStoreClient<?, ?> client) throws Exception {
83+
return findClientInDelegationChain(client, RetriableAvroGenericStoreClient.class) != null;
84+
}
85+
86+
private boolean containsRetryClientInChainSpecific(AvroSpecificStoreClient<?, ?> client) throws Exception {
87+
return findClientInDelegationChain(client, RetriableAvroSpecificStoreClient.class) != null;
88+
}
89+
90+
private Object findClientInDelegationChain(Object client, Class<?> targetClass) throws Exception {
91+
if (client == null) {
92+
return null;
93+
}
94+
95+
if (targetClass.isInstance(client)) {
96+
return client;
97+
}
98+
99+
if (client instanceof DelegatingAvroStoreClient) {
100+
try {
101+
Field delegateField = DelegatingAvroStoreClient.class.getDeclaredField("delegate");
102+
delegateField.setAccessible(true);
103+
Object delegate = delegateField.get(client);
104+
return findClientInDelegationChain(delegate, targetClass);
105+
} catch (NoSuchFieldException | IllegalAccessException e) {
106+
// If we can't access the delegate field, stop traversing
107+
return null;
108+
}
109+
}
110+
return null;
111+
}
112+
113+
}

0 commit comments

Comments
 (0)