Skip to content

Commit f72266e

Browse files
committed
Refactor RoutingManager
1 parent 581daa2 commit f72266e

File tree

8 files changed

+437
-352
lines changed

8 files changed

+437
-352
lines changed

gateway-ha/pom.xml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,6 @@
6262
<artifactId>jackson-dataformat-yaml</artifactId>
6363
</dependency>
6464

65-
<dependency>
66-
<groupId>com.google.errorprone</groupId>
67-
<artifactId>error_prone_annotations</artifactId>
68-
</dependency>
69-
7065
<dependency>
7166
<groupId>com.google.guava</groupId>
7267
<artifactId>guava</artifactId>

gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/HealthCheckObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,6 @@ public HealthCheckObserver(RoutingManager routingManager)
2828
@Override
2929
public void observe(java.util.List<ClusterStats> clustersStats)
3030
{
31-
routingManager.updateBackEndStats(clustersStats);
31+
routingManager.updateClusterStats(clustersStats);
3232
}
3333
}
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.gateway.ha.router;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.common.base.Function;
18+
import com.google.common.base.Strings;
19+
import com.google.common.cache.CacheBuilder;
20+
import com.google.common.cache.CacheLoader;
21+
import com.google.common.cache.LoadingCache;
22+
import io.airlift.log.Logger;
23+
import io.trino.gateway.ha.clustermonitor.ClusterStats;
24+
import io.trino.gateway.ha.clustermonitor.TrinoStatus;
25+
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
26+
import io.trino.gateway.ha.config.RoutingConfiguration;
27+
import jakarta.annotation.Nullable;
28+
import jakarta.annotation.PreDestroy;
29+
import jakarta.ws.rs.HttpMethod;
30+
31+
import java.net.HttpURLConnection;
32+
import java.net.URI;
33+
import java.net.URL;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.ExecutionException;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Executors;
42+
import java.util.concurrent.Future;
43+
import java.util.concurrent.TimeUnit;
44+
45+
/**
46+
* This class performs health check, stats counts for each backend and provides a backend given
47+
* request object. Default implementation comes here.
48+
*/
49+
public abstract class BaseRoutingManager
50+
implements RoutingManager
51+
{
52+
private static final Logger log = Logger.get(BaseRoutingManager.class);
53+
private final ExecutorService executorService = Executors.newFixedThreadPool(5);
54+
private final GatewayBackendManager gatewayBackendManager;
55+
private final ConcurrentHashMap<String, TrinoStatus> backendToStatus;
56+
private final String defaultRoutingGroup;
57+
private final QueryHistoryManager queryHistoryManager;
58+
private final LoadingCache<String, String> queryIdBackendCache;
59+
private final LoadingCache<String, String> queryIdRoutingGroupCache;
60+
private final LoadingCache<String, String> queryIdExternalUrlCache;
61+
62+
public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration)
63+
{
64+
this.gatewayBackendManager = gatewayBackendManager;
65+
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
66+
this.queryHistoryManager = queryHistoryManager;
67+
this.queryIdBackendCache = buildCache(this::findBackendForUnknownQueryId);
68+
this.queryIdRoutingGroupCache = buildCache(this::findRoutingGroupForUnknownQueryId);
69+
this.queryIdExternalUrlCache = buildCache(this::findExternalUrlForUnknownQueryId);
70+
this.backendToStatus = new ConcurrentHashMap<>();
71+
}
72+
73+
/**
74+
* Provide a strategy to select a backend out of all available backends
75+
*/
76+
protected abstract Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user);
77+
78+
@Override
79+
public void setBackendForQueryId(String queryId, String backend)
80+
{
81+
queryIdBackendCache.put(queryId, backend);
82+
}
83+
84+
@Override
85+
public void setRoutingGroupForQueryId(String queryId, String routingGroup)
86+
{
87+
queryIdRoutingGroupCache.put(queryId, routingGroup);
88+
}
89+
90+
/**
91+
* Performs routing to a default backend.
92+
*/
93+
public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user)
94+
{
95+
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveDefaultBackends().stream()
96+
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
97+
.toList();
98+
return selectBackend(backends, user).orElseThrow(() -> new IllegalStateException("Number of active backends found zero"));
99+
}
100+
101+
/**
102+
* Performs routing to a given cluster group. This falls back to a default backend, if no scheduled
103+
* backend is found.
104+
*/
105+
@Override
106+
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
107+
{
108+
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveBackends(routingGroup).stream()
109+
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
110+
.toList();
111+
return selectBackend(backends, user).orElseGet(() -> provideDefaultBackendConfiguration(user));
112+
}
113+
114+
/**
115+
* Performs cache look up, if a backend not found, it checks with all backends and tries to find
116+
* out which backend has info about given query id.
117+
*/
118+
@Nullable
119+
@Override
120+
public String findBackendForQueryId(String queryId)
121+
{
122+
String backendAddress = null;
123+
try {
124+
backendAddress = queryIdBackendCache.get(queryId);
125+
}
126+
catch (ExecutionException e) {
127+
log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
128+
}
129+
return backendAddress;
130+
}
131+
132+
@Nullable
133+
@Override
134+
public String findExternalUrlForQueryId(String queryId)
135+
{
136+
String externalUrl = null;
137+
try {
138+
externalUrl = queryIdExternalUrlCache.get(queryId);
139+
}
140+
catch (ExecutionException e) {
141+
log.warn("Exception while loading queryId from cache %s", e.getLocalizedMessage());
142+
}
143+
return externalUrl;
144+
}
145+
146+
/**
147+
* Looks up the routing group associated with the queryId in the cache.
148+
* If it's not in the cache, look up in query history
149+
*/
150+
@Nullable
151+
@Override
152+
public String findRoutingGroupForQueryId(String queryId)
153+
{
154+
String routingGroup = null;
155+
try {
156+
routingGroup = queryIdRoutingGroupCache.get(queryId);
157+
}
158+
catch (ExecutionException e) {
159+
log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
160+
}
161+
return routingGroup;
162+
}
163+
164+
@Override
165+
public void updateBackEndHealth(String backendId, TrinoStatus value)
166+
{
167+
log.info("backend %s isHealthy %s", backendId, value);
168+
backendToStatus.put(backendId, value);
169+
}
170+
171+
@Override
172+
public void updateClusterStats(List<ClusterStats> stats)
173+
{
174+
for (ClusterStats clusterStats : stats) {
175+
updateBackEndHealth(clusterStats.clusterId(), clusterStats.trinoStatus());
176+
}
177+
}
178+
179+
@VisibleForTesting
180+
void setExternalUrlForQueryId(String queryId, String externalUrl)
181+
{
182+
queryIdExternalUrlCache.put(queryId, externalUrl);
183+
}
184+
185+
@VisibleForTesting
186+
String findBackendForUnknownQueryId(String queryId)
187+
{
188+
String backend;
189+
backend = queryHistoryManager.getBackendForQueryId(queryId);
190+
if (Strings.isNullOrEmpty(backend)) {
191+
log.debug("Unable to find backend mapping for [%s]. Searching for suitable backend", queryId);
192+
backend = searchAllBackendForQuery(queryId);
193+
}
194+
return backend;
195+
}
196+
197+
/**
198+
* This tries to find out which backend may have info about given query id. If not found returns
199+
* the first healthy backend.
200+
*/
201+
private String searchAllBackendForQuery(String queryId)
202+
{
203+
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getAllBackends();
204+
205+
Map<String, Future<Integer>> responseCodes = new HashMap<>();
206+
try {
207+
for (ProxyBackendConfiguration backend : backends) {
208+
String target = backend.getProxyTo() + "/v1/query/" + queryId;
209+
210+
Future<Integer> call =
211+
executorService.submit(
212+
() -> {
213+
URL url = URI.create(target).toURL();
214+
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
215+
conn.setConnectTimeout((int) TimeUnit.SECONDS.toMillis(5));
216+
conn.setReadTimeout((int) TimeUnit.SECONDS.toMillis(5));
217+
conn.setRequestMethod(HttpMethod.HEAD);
218+
return conn.getResponseCode();
219+
});
220+
responseCodes.put(backend.getProxyTo(), call);
221+
}
222+
for (Map.Entry<String, Future<Integer>> entry : responseCodes.entrySet()) {
223+
if (entry.getValue().isDone()) {
224+
int responseCode = entry.getValue().get();
225+
if (responseCode == 200) {
226+
log.info("Found query [%s] on backend [%s]", queryId, entry.getKey());
227+
setBackendForQueryId(queryId, entry.getKey());
228+
return entry.getKey();
229+
}
230+
}
231+
}
232+
}
233+
catch (Exception e) {
234+
log.warn("Query id [%s] not found", queryId);
235+
}
236+
// Fallback on first active backend if queryId mapping not found.
237+
return gatewayBackendManager.getActiveBackends(defaultRoutingGroup).stream()
238+
.findFirst()
239+
.map(ProxyBackendConfiguration::getProxyTo)
240+
.orElseThrow(() -> new IllegalStateException("No active backends available for default routing group: " + defaultRoutingGroup));
241+
}
242+
243+
/**
244+
* Attempts to look up the routing group associated with the query id from query history table
245+
*/
246+
private String findRoutingGroupForUnknownQueryId(String queryId)
247+
{
248+
String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
249+
setRoutingGroupForQueryId(queryId, routingGroup);
250+
return routingGroup;
251+
}
252+
253+
/**
254+
* Attempts to look up the external url associated with the query id from query history table
255+
*/
256+
private String findExternalUrlForUnknownQueryId(String queryId)
257+
{
258+
String externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId);
259+
setExternalUrlForQueryId(queryId, externalUrl);
260+
return externalUrl;
261+
}
262+
263+
private LoadingCache<String, String> buildCache(Function<String, String> loader)
264+
{
265+
return CacheBuilder.newBuilder()
266+
.maximumSize(10000)
267+
.expireAfterAccess(30, TimeUnit.MINUTES)
268+
.build(
269+
new CacheLoader<>()
270+
{
271+
@Override
272+
public String load(String queryId)
273+
{
274+
return loader.apply(queryId);
275+
}
276+
});
277+
}
278+
279+
private boolean isBackendHealthy(String backendId)
280+
{
281+
TrinoStatus status = backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN);
282+
if (status == TrinoStatus.UNKNOWN) {
283+
log.warn("Backend health for '%s' is UNKNOWN and not tracked.", backendId);
284+
return false;
285+
}
286+
return status == TrinoStatus.HEALTHY;
287+
}
288+
289+
@PreDestroy
290+
public void shutdown()
291+
{
292+
executorService.shutdownNow();
293+
}
294+
}

0 commit comments

Comments
 (0)