Skip to content

Commit e28d4c2

Browse files
committed
wip
Apply local changes from detached HEAD wip wip erroeous ver wip clean build add comments add tests clean up front end code wip update routing setting clean up frontend code
1 parent 5a36940 commit e28d4c2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+395
-244
lines changed

docs/routing-rules.md

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ return a result with the following criteria:
101101

102102
* Response status code of OK (200)
103103
* Message in JSON format
104-
* Only one group can be returned
104+
* Only one group or one cluster can be returned
105105
* If `errors` is not null, the query is routed to the configured default group
106106

107107
#### Request headers modification
@@ -126,7 +126,16 @@ or setting client tags before the request reaches the Trino cluster.
126126
}
127127
}
128128
```
129-
129+
```json
130+
{
131+
"routingCluster": "test-cluster",
132+
"errors": [
133+
"Error1",
134+
"Error2",
135+
"Error3"
136+
]
137+
}
138+
```
130139
### Configure routing rules with a file
131140

132141
Rules consist of a name, description, condition, and list
@@ -158,10 +167,10 @@ In addition to the default objects, rules may optionally utilize
158167
[trinoRequestUser](#trinorequestuser) and
159168
[trinoQueryProperties](#trinoqueryproperties)
160169
, which provide information about the user and query respectively.
161-
You must include an action of the form `result.put(\"routingGroup\", \"foo\")`
162-
to trigger routing of a request that satisfies the condition to the specific
163-
routing group. Without this action, the configured default group is used and the
164-
whole routing rule is redundant.
170+
You must include an action of the form `result.put(\"routingGroup\", \"foo\")` or
171+
`result.put(\"routingCluster\", \"bar\")` to trigger routing of a request that satisfies
172+
the condition to the specific routing group. Without this action, the configured default
173+
group is used and the whole routing rule is redundant.
165174

166175
The condition and actions are written in [MVEL](http://mvel.documentnode.com/),
167176
an expression language with Java-like syntax. Classes from `java.util`, data-type
@@ -373,15 +382,28 @@ priority: 1
373382
condition: 'request.getHeader("X-Trino-Source") == "airflow" && request.getHeader("X-Trino-Client-Tags") contains "label=special"'
374383
actions:
375384
- 'result.put("routingGroup", "etl-special")'
385+
---
386+
name: "airflow cluster"
387+
description: "query can also be pinned to a specific cluster"
388+
priority: 10
389+
condition: 'request.getHeader("X-Trino-Source") == "airflow" && request.getHeader("X-Trino-Client-Tags") contains "label=airflow-cluster"'
390+
actions:
391+
- 'result.put("routingCluster", "airflow-cluster")'
376392
```
377393

378394
Note that both rules still fire. The difference is that you are guaranteed
379395
that the first rule (priority 0) is fired before the second rule (priority 1).
380396
Thus `routingGroup` is set to `etl` and then to `etl-special`, so the
381397
`routingGroup` is always `etl-special` in the end.
382398

399+
When mixing cluster and group actions, the same rule priority semantics apply.
400+
If a higher-priority rule (evaluated later) sets `routingCluster`, it overwrites
401+
any previously set group, and vice versa. In practice, the
402+
last assignment wins and `RoutingTargetHandler` inspects the resulting
403+
`RoutingDecision`, using the cluster when both values are present.
404+
383405
More specific rules must be set to a higher priority so they are evaluated last
384-
to set a `routingGroup`.
406+
to set a `routingGroup` or a `routingCluster`.
385407

386408
##### Passing State
387409

gateway-ha/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ serverConfig:
44

55
routingRules:
66
rulesEngineEnabled: False
7-
# rulesConfigPath: "src/main/resources/rules/routing_rules.yml"
7+
# rulesConfigPath: "gateway-ha/src/main/resources/rules/routing_rules.yml"
88

99
dataStore:
1010
jdbcUrl: jdbc:postgresql://localhost:5432/trino_gateway_db

gateway-ha/src/main/java/io/trino/gateway/ha/handler/RoutingTargetHandler.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
import io.trino.gateway.ha.handler.schema.RoutingDestination;
2222
import io.trino.gateway.ha.handler.schema.RoutingTargetResponse;
2323
import io.trino.gateway.ha.router.GatewayCookie;
24-
import io.trino.gateway.ha.router.RoutingGroupSelector;
2524
import io.trino.gateway.ha.router.RoutingManager;
25+
import io.trino.gateway.ha.router.RoutingSelector;
2626
import io.trino.gateway.ha.router.schema.RoutingSelectorResponse;
2727
import jakarta.servlet.http.HttpServletRequest;
2828
import jakarta.servlet.http.HttpServletRequestWrapper;
@@ -45,7 +45,7 @@ public class RoutingTargetHandler
4545
{
4646
private static final Logger log = Logger.get(RoutingTargetHandler.class);
4747
private final RoutingManager routingManager;
48-
private final RoutingGroupSelector routingGroupSelector;
48+
private final RoutingSelector routingSelector;
4949
private final String defaultRoutingGroup;
5050
private final List<String> statementPaths;
5151
private final boolean requestAnalyserClientsUseV2Format;
@@ -55,11 +55,11 @@ public class RoutingTargetHandler
5555
@Inject
5656
public RoutingTargetHandler(
5757
RoutingManager routingManager,
58-
RoutingGroupSelector routingGroupSelector,
58+
RoutingSelector routingSelector,
5959
HaGatewayConfiguration haGatewayConfiguration)
6060
{
6161
this.routingManager = requireNonNull(routingManager);
62-
this.routingGroupSelector = requireNonNull(routingGroupSelector);
62+
this.routingSelector = requireNonNull(routingSelector);
6363
this.defaultRoutingGroup = haGatewayConfiguration.getRouting().getDefaultRoutingGroup();
6464
statementPaths = requireNonNull(haGatewayConfiguration.getStatementPaths());
6565
requestAnalyserClientsUseV2Format = haGatewayConfiguration.getRequestAnalyzerConfig().isClientsUseV2Format();
@@ -73,12 +73,12 @@ public RoutingTargetResponse resolveRouting(HttpServletRequest request)
7373
Optional<String> previousCluster = getPreviousCluster(queryId, request);
7474

7575
RoutingTargetResponse routingTargetResponse = previousCluster.map(cluster -> {
76-
String routingGroup = queryId.map(routingManager::findRoutingGroupForQueryId)
76+
String routingDecision = queryId.map(routingManager::findRoutingDecisionForQueryId)
7777
.orElse(defaultRoutingGroup);
7878
String externalUrl = queryId.map(routingManager::findExternalUrlForQueryId)
7979
.orElse(cluster);
8080
return new RoutingTargetResponse(
81-
new RoutingDestination(routingGroup, cluster, buildUriWithNewCluster(cluster, request), externalUrl),
81+
new RoutingDestination(routingDecision, cluster, buildUriWithNewCluster(cluster, request), externalUrl),
8282
request);
8383
}).orElse(getRoutingTargetResponse(request));
8484

@@ -88,23 +88,26 @@ public RoutingTargetResponse resolveRouting(HttpServletRequest request)
8888

8989
private RoutingTargetResponse getRoutingTargetResponse(HttpServletRequest request)
9090
{
91-
RoutingSelectorResponse routingDestination = routingGroupSelector.findRoutingDestination(request);
91+
RoutingSelectorResponse routingDestination = routingSelector.findRoutingDestination(request);
9292
String user = request.getHeader(USER_HEADER);
9393

9494
// This falls back on default routing group backend if there is no cluster found for the routing group.
95+
String routingCluster = routingDestination.routingCluster();
9596
String routingGroup = !isNullOrEmpty(routingDestination.routingGroup())
9697
? routingDestination.routingGroup()
9798
: defaultRoutingGroup;
98-
ProxyBackendConfiguration backendConfiguration = routingManager.provideBackendConfiguration(routingGroup, user);
99+
ProxyBackendConfiguration backendConfiguration = routingManager.provideBackendConfiguration(routingGroup, routingCluster, user);
99100
String clusterHost = backendConfiguration.getProxyTo();
100101
String externalUrl = backendConfiguration.getExternalUrl();
101102
// Apply headers from RoutingDestination if there are any
102103
HttpServletRequest modifiedRequest = request;
103104
if (!routingDestination.externalHeaders().isEmpty()) {
104105
modifiedRequest = new HeaderModifyingRequestWrapper(request, routingDestination.externalHeaders());
105106
}
107+
// routingCluster and routingGroup are mutually exclusive. If neither is set, fall back to the default routing group.
108+
String routingDecision = !isNullOrEmpty(routingCluster) ? routingCluster : routingGroup;
106109
return new RoutingTargetResponse(
107-
new RoutingDestination(routingGroup, clusterHost, buildUriWithNewCluster(clusterHost, request), externalUrl),
110+
new RoutingDestination(routingDecision, clusterHost, buildUriWithNewCluster(clusterHost, request), externalUrl),
108111
modifiedRequest);
109112
}
110113

gateway-ha/src/main/java/io/trino/gateway/ha/handler/schema/RoutingDestination.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515

1616
import java.net.URI;
1717

18-
public record RoutingDestination(String routingGroup, String clusterHost, URI clusterUri, String externalUrl) {}
18+
public record RoutingDestination(String routingDecision, String clusterHost, URI clusterUri, String externalUrl) {}

gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050
import io.trino.gateway.ha.router.PathFilter;
5151
import io.trino.gateway.ha.router.QueryHistoryManager;
5252
import io.trino.gateway.ha.router.ResourceGroupsManager;
53-
import io.trino.gateway.ha.router.RoutingGroupSelector;
5453
import io.trino.gateway.ha.router.RoutingManager;
54+
import io.trino.gateway.ha.router.RoutingSelector;
5555
import io.trino.gateway.ha.security.ApiAuthenticator;
5656
import io.trino.gateway.ha.security.AuthorizationManager;
5757
import io.trino.gateway.ha.security.BasicAuthFilter;
@@ -212,27 +212,27 @@ public AuthorizationManager getAuthorizationManager()
212212

213213
@Provides
214214
@Singleton
215-
public RoutingGroupSelector getRoutingGroupSelector(@ForRouter HttpClient httpClient)
215+
public RoutingSelector getRoutingSelector(@ForRouter HttpClient httpClient)
216216
{
217217
RoutingRulesConfiguration routingRulesConfig = configuration.getRoutingRules();
218218
if (routingRulesConfig.isRulesEngineEnabled()) {
219219
try {
220220
return switch (routingRulesConfig.getRulesType()) {
221-
case FILE -> RoutingGroupSelector.byRoutingRulesEngine(
221+
case FILE -> RoutingSelector.byRoutingRulesEngine(
222222
routingRulesConfig.getRulesConfigPath(),
223223
routingRulesConfig.getRulesRefreshPeriod(),
224224
configuration.getRequestAnalyzerConfig());
225225
case EXTERNAL -> {
226226
RulesExternalConfiguration rulesExternalConfiguration = routingRulesConfig.getRulesExternalConfiguration();
227-
yield RoutingGroupSelector.byRoutingExternal(httpClient, rulesExternalConfiguration, configuration.getRequestAnalyzerConfig());
227+
yield RoutingSelector.byRoutingExternal(httpClient, rulesExternalConfiguration, configuration.getRequestAnalyzerConfig());
228228
}
229229
};
230230
}
231231
catch (Exception e) {
232-
return RoutingGroupSelector.byRoutingGroupHeader();
232+
return RoutingSelector.byRoutingGroupHeader();
233233
}
234234
}
235-
return RoutingGroupSelector.byRoutingGroupHeader();
235+
return RoutingSelector.byRoutingGroupHeader();
236236
}
237237

238238
@Provides

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/QueryHistory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public record QueryHistory(
2525
@ColumnName("user_name") @Nullable String userName,
2626
@ColumnName("source") @Nullable String source,
2727
@ColumnName("created") long created,
28-
@ColumnName("routing_group") String routingGroup,
28+
@ColumnName("routing_decision") String routingDecision,
2929
@ColumnName("external_url") String externalUrl)
3030
{
3131
public QueryHistory

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/QueryHistoryDao.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ default List<QueryHistory> findRecentQueriesByUserName(String userName, boolean
7878
String findBackendUrlByQueryId(String queryId);
7979

8080
@SqlQuery("""
81-
SELECT routing_group FROM query_history
81+
SELECT routing_decision FROM query_history
8282
WHERE query_id = :queryId
8383
""")
84-
String findRoutingGroupByQueryId(String queryId);
84+
String findRoutingDecisionByQueryId(String queryId);
8585

8686
@SqlQuery("""
8787
SELECT external_url FROM query_history
@@ -116,10 +116,10 @@ GROUP BY FLOOR(created / 1000 / 60), backend_url
116116
List<Map<String, Object>> findDistribution(long created);
117117

118118
@SqlUpdate("""
119-
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created, routing_group, external_url)
120-
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created, :routingGroup, :externalUrl)
119+
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created, routing_decision, external_url)
120+
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created, :routingDecision, :externalUrl)
121121
""")
122-
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created, String routingGroup, String externalUrl);
122+
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created, String routingDecision, String externalUrl);
123123

124124
@SqlUpdate("""
125125
DELETE FROM query_history

gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import java.util.concurrent.Future;
4343
import java.util.concurrent.TimeUnit;
4444

45+
import static com.google.common.base.Strings.isNullOrEmpty;
46+
4547
/**
4648
* This class performs health check, stats counts for each backend and provides a backend given
4749
* request object. Default implementation comes here.
@@ -56,7 +58,7 @@ public abstract class BaseRoutingManager
5658
private final String defaultRoutingGroup;
5759
private final QueryHistoryManager queryHistoryManager;
5860
private final LoadingCache<String, String> queryIdBackendCache;
59-
private final LoadingCache<String, String> queryIdRoutingGroupCache;
61+
private final LoadingCache<String, String> queryIdRoutingDecisionCache;
6062
private final LoadingCache<String, String> queryIdExternalUrlCache;
6163

6264
public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration)
@@ -65,7 +67,7 @@ public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHist
6567
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
6668
this.queryHistoryManager = queryHistoryManager;
6769
this.queryIdBackendCache = buildCache(this::findBackendForUnknownQueryId);
68-
this.queryIdRoutingGroupCache = buildCache(this::findRoutingGroupForUnknownQueryId);
70+
this.queryIdRoutingDecisionCache = buildCache(this::findRoutingDecisionForUnknownQueryId);
6971
this.queryIdExternalUrlCache = buildCache(this::findExternalUrlForUnknownQueryId);
7072
this.backendToStatus = new ConcurrentHashMap<>();
7173
}
@@ -82,9 +84,9 @@ public void setBackendForQueryId(String queryId, String backend)
8284
}
8385

8486
@Override
85-
public void setRoutingGroupForQueryId(String queryId, String routingGroup)
87+
public void setRoutingDecisionForQueryId(String queryId, String routingDecision)
8688
{
87-
queryIdRoutingGroupCache.put(queryId, routingGroup);
89+
queryIdRoutingDecisionCache.put(queryId, routingDecision);
8890
}
8991

9092
/**
@@ -99,12 +101,24 @@ public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user)
99101
}
100102

101103
/**
102-
* Performs routing to a given cluster group. This falls back to a default backend, if no scheduled
103-
* backend is found.
104+
* Selects a backend configuration for the request.
105+
* At most one of `routingCluster` or `routingGroup` may be provided; they are mutually exclusive
106+
* - If `routingCluster` is provided, returns that backend when it is active and healthy; otherwise
107+
* falls back to the default backend.
108+
* - If `routingCluster` is not provided, considers all active backends in `routingGroup`, filters to
109+
* healthy ones, and delegates to `selectBackend(...)` to choose; if none are eligible, falls back
110+
* to the default backend.
111+
* - If neither `routingCluster` nor `routingGroup` is provided, falls back to the default backend.
104112
*/
105113
@Override
106-
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
114+
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String routingCluster, String user)
107115
{
116+
if (!isNullOrEmpty(routingCluster)) {
117+
return gatewayBackendManager.getBackendByName(routingCluster)
118+
.filter(ProxyBackendConfiguration::isActive)
119+
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
120+
.orElseGet(() -> provideDefaultBackendConfiguration(user));
121+
}
108122
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveBackends(routingGroup).stream()
109123
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
110124
.toList();
@@ -144,21 +158,21 @@ public String findExternalUrlForQueryId(String queryId)
144158
}
145159

146160
/**
147-
* Looks up the routing group associated with the queryId in the cache.
161+
* Looks up the routing decision associated with the queryId in the cache.
148162
* If it's not in the cache, look up in query history
149163
*/
150164
@Nullable
151165
@Override
152-
public String findRoutingGroupForQueryId(String queryId)
166+
public String findRoutingDecisionForQueryId(String queryId)
153167
{
154-
String routingGroup = null;
168+
String routingDecision = null;
155169
try {
156-
routingGroup = queryIdRoutingGroupCache.get(queryId);
170+
routingDecision = queryIdRoutingDecisionCache.get(queryId);
157171
}
158172
catch (ExecutionException e) {
159-
log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
173+
log.warn("Exception while loading queryId from routing decision cache %s", e.getLocalizedMessage());
160174
}
161-
return routingGroup;
175+
return routingDecision;
162176
}
163177

164178
@Override
@@ -241,13 +255,13 @@ private String searchAllBackendForQuery(String queryId)
241255
}
242256

243257
/**
244-
* Attempts to look up the routing group associated with the query id from query history table
258+
* Attempts to look up the routing decision associated with the query id from query history table
245259
*/
246-
private String findRoutingGroupForUnknownQueryId(String queryId)
260+
private String findRoutingDecisionForUnknownQueryId(String queryId)
247261
{
248-
String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
249-
setRoutingGroupForQueryId(queryId, routingGroup);
250-
return routingGroup;
262+
String routingDecision = queryHistoryManager.getRoutingDecisionForQueryId(queryId);
263+
setRoutingDecisionForQueryId(queryId, routingDecision);
264+
return routingDecision;
251265
}
252266

253267
/**

0 commit comments

Comments
 (0)