Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions docs/routing-rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ return a result with the following criteria:

* Response status code of OK (200)
* Message in JSON format
* Only one group can be returned
* Only one group or one cluster can be returned
* If `errors` is not null, the query is routed to the configured default group

#### Request headers modification
Expand All @@ -126,7 +126,16 @@ or setting client tags before the request reaches the Trino cluster.
}
}
```

```json
{
"routingCluster": "test-cluster",
"errors": [
"Error1",
"Error2",
"Error3"
]
}
```
### Configure routing rules with a file

Rules consist of a name, description, condition, and list
Expand Down Expand Up @@ -158,10 +167,10 @@ In addition to the default objects, rules may optionally utilize
[trinoRequestUser](#trinorequestuser) and
[trinoQueryProperties](#trinoqueryproperties)
, which provide information about the user and query respectively.
You must include an action of the form `result.put(\"routingGroup\", \"foo\")`
to trigger routing of a request that satisfies the condition to the specific
routing group. Without this action, the configured default group is used and the
whole routing rule is redundant.
You must include an action of the form `result.put(\"routingGroup\", \"foo\")` or
`result.put(\"routingCluster\", \"bar\")` to trigger routing of a request that satisfies
the condition to the specific routing group. Without this action, the configured default
group is used and the whole routing rule is redundant.

The condition and actions are written in [MVEL](http://mvel.documentnode.com/),
an expression language with Java-like syntax. Classes from `java.util`, data-type
Expand Down Expand Up @@ -373,15 +382,28 @@ priority: 1
condition: 'request.getHeader("X-Trino-Source") == "airflow" && request.getHeader("X-Trino-Client-Tags") contains "label=special"'
actions:
- 'result.put("routingGroup", "etl-special")'
---
name: "airflow cluster"
description: "query can also be pinned to a specific cluster"
priority: 10
condition: 'request.getHeader("X-Trino-Source") == "airflow" && request.getHeader("X-Trino-Client-Tags") contains "label=airflow-cluster"'
actions:
- 'result.put("routingCluster", "airflow-cluster")'
```

Note that both rules still fire. The difference is that you are guaranteed
that the first rule (priority 0) is fired before the second rule (priority 1).
Thus `routingGroup` is set to `etl` and then to `etl-special`, so the
`routingGroup` is always `etl-special` in the end.

When mixing cluster and group actions, the same rule priority semantics apply.
If a higher-priority rule (evaluated later) sets `routingCluster`, it overwrites
any previously set group, and vice versa. In practice, the
last assignment wins and `RoutingTargetHandler` inspects the resulting
`RoutingDecision`, using the cluster when both values are present.

More specific rules must be set to a higher priority so they are evaluated last
to set a `routingGroup`.
to set a `routingGroup` or a `routingCluster`.

##### Passing State

Expand Down
2 changes: 1 addition & 1 deletion gateway-ha/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ serverConfig:

routingRules:
rulesEngineEnabled: False
# rulesConfigPath: "src/main/resources/rules/routing_rules.yml"
# rulesConfigPath: "gateway-ha/src/main/resources/rules/routing_rules.yml"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have this change?
also why -ha?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I'm testing with TrinoGatewayRunner, it sets the working directory to the repository root. For instance, to read the config.yaml file, we need to put gateway-ha directory in the path

HaGatewayLauncher.main(new String[] {"gateway-ha/config.yaml"});

Wondering what's the reason we'd like to set it to rulesConfigPath: "src/main/resources/rules/routing_rules.yml"?


dataStore:
jdbcUrl: jdbc:postgresql://localhost:5432/trino_gateway_db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import io.trino.gateway.ha.handler.schema.RoutingDestination;
import io.trino.gateway.ha.handler.schema.RoutingTargetResponse;
import io.trino.gateway.ha.router.GatewayCookie;
import io.trino.gateway.ha.router.RoutingGroupSelector;
import io.trino.gateway.ha.router.RoutingManager;
import io.trino.gateway.ha.router.RoutingSelector;
import io.trino.gateway.ha.router.schema.RoutingSelectorResponse;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletRequestWrapper;
Expand All @@ -45,7 +45,7 @@ public class RoutingTargetHandler
{
private static final Logger log = Logger.get(RoutingTargetHandler.class);
private final RoutingManager routingManager;
private final RoutingGroupSelector routingGroupSelector;
private final RoutingSelector routingSelector;
private final String defaultRoutingGroup;
private final List<String> statementPaths;
private final boolean requestAnalyserClientsUseV2Format;
Expand All @@ -55,11 +55,11 @@ public class RoutingTargetHandler
@Inject
public RoutingTargetHandler(
RoutingManager routingManager,
RoutingGroupSelector routingGroupSelector,
RoutingSelector routingSelector,
HaGatewayConfiguration haGatewayConfiguration)
{
this.routingManager = requireNonNull(routingManager);
this.routingGroupSelector = requireNonNull(routingGroupSelector);
this.routingSelector = requireNonNull(routingSelector);
this.defaultRoutingGroup = haGatewayConfiguration.getRouting().getDefaultRoutingGroup();
statementPaths = requireNonNull(haGatewayConfiguration.getStatementPaths());
requestAnalyserClientsUseV2Format = haGatewayConfiguration.getRequestAnalyzerConfig().isClientsUseV2Format();
Expand All @@ -73,12 +73,12 @@ public RoutingTargetResponse resolveRouting(HttpServletRequest request)
Optional<String> previousCluster = getPreviousCluster(queryId, request);

RoutingTargetResponse routingTargetResponse = previousCluster.map(cluster -> {
String routingGroup = queryId.map(routingManager::findRoutingGroupForQueryId)
String routingDecision = queryId.map(routingManager::findRoutingDecisionForQueryId)
.orElse(defaultRoutingGroup);
String externalUrl = queryId.map(routingManager::findExternalUrlForQueryId)
.orElse(cluster);
return new RoutingTargetResponse(
new RoutingDestination(routingGroup, cluster, buildUriWithNewCluster(cluster, request), externalUrl),
new RoutingDestination(routingDecision, cluster, buildUriWithNewCluster(cluster, request), externalUrl),
request);
}).orElse(getRoutingTargetResponse(request));

Expand All @@ -88,23 +88,26 @@ public RoutingTargetResponse resolveRouting(HttpServletRequest request)

private RoutingTargetResponse getRoutingTargetResponse(HttpServletRequest request)
{
RoutingSelectorResponse routingDestination = routingGroupSelector.findRoutingDestination(request);
RoutingSelectorResponse routingDestination = routingSelector.findRoutingDestination(request);
String user = request.getHeader(USER_HEADER);

// This falls back on default routing group backend if there is no cluster found for the routing group.
String routingCluster = routingDestination.routingCluster();
String routingGroup = !isNullOrEmpty(routingDestination.routingGroup())
? routingDestination.routingGroup()
: defaultRoutingGroup;
ProxyBackendConfiguration backendConfiguration = routingManager.provideBackendConfiguration(routingGroup, user);
ProxyBackendConfiguration backendConfiguration = routingManager.provideBackendConfiguration(routingGroup, routingCluster, user);
String clusterHost = backendConfiguration.getProxyTo();
String externalUrl = backendConfiguration.getExternalUrl();
// Apply headers from RoutingDestination if there are any
HttpServletRequest modifiedRequest = request;
if (!routingDestination.externalHeaders().isEmpty()) {
modifiedRequest = new HeaderModifyingRequestWrapper(request, routingDestination.externalHeaders());
}
// routingCluster and routingGroup are mutually exclusive. If neither is set, fall back to the default routing group.
String routingDecision = !isNullOrEmpty(routingCluster) ? routingCluster : routingGroup;
return new RoutingTargetResponse(
new RoutingDestination(routingGroup, clusterHost, buildUriWithNewCluster(clusterHost, request), externalUrl),
new RoutingDestination(routingDecision, clusterHost, buildUriWithNewCluster(clusterHost, request), externalUrl),
modifiedRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

import java.net.URI;

public record RoutingDestination(String routingGroup, String clusterHost, URI clusterUri, String externalUrl) {}
public record RoutingDestination(String routingDecision, String clusterHost, URI clusterUri, String externalUrl) {}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import io.trino.gateway.ha.router.PathFilter;
import io.trino.gateway.ha.router.QueryHistoryManager;
import io.trino.gateway.ha.router.ResourceGroupsManager;
import io.trino.gateway.ha.router.RoutingGroupSelector;
import io.trino.gateway.ha.router.RoutingManager;
import io.trino.gateway.ha.router.RoutingSelector;
import io.trino.gateway.ha.security.ApiAuthenticator;
import io.trino.gateway.ha.security.AuthorizationManager;
import io.trino.gateway.ha.security.BasicAuthFilter;
Expand Down Expand Up @@ -212,27 +212,27 @@ public AuthorizationManager getAuthorizationManager()

@Provides
@Singleton
public RoutingGroupSelector getRoutingGroupSelector(@ForRouter HttpClient httpClient)
public RoutingSelector getRoutingSelector(@ForRouter HttpClient httpClient)
{
RoutingRulesConfiguration routingRulesConfig = configuration.getRoutingRules();
if (routingRulesConfig.isRulesEngineEnabled()) {
try {
return switch (routingRulesConfig.getRulesType()) {
case FILE -> RoutingGroupSelector.byRoutingRulesEngine(
case FILE -> RoutingSelector.byRoutingRulesEngine(
routingRulesConfig.getRulesConfigPath(),
routingRulesConfig.getRulesRefreshPeriod(),
configuration.getRequestAnalyzerConfig());
case EXTERNAL -> {
RulesExternalConfiguration rulesExternalConfiguration = routingRulesConfig.getRulesExternalConfiguration();
yield RoutingGroupSelector.byRoutingExternal(httpClient, rulesExternalConfiguration, configuration.getRequestAnalyzerConfig());
yield RoutingSelector.byRoutingExternal(httpClient, rulesExternalConfiguration, configuration.getRequestAnalyzerConfig());
}
};
}
catch (Exception e) {
return RoutingGroupSelector.byRoutingGroupHeader();
return RoutingSelector.byRoutingGroupHeader();
}
}
return RoutingGroupSelector.byRoutingGroupHeader();
return RoutingSelector.byRoutingGroupHeader();
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public record QueryHistory(
@ColumnName("user_name") @Nullable String userName,
@ColumnName("source") @Nullable String source,
@ColumnName("created") long created,
@ColumnName("routing_group") String routingGroup,
@ColumnName("routing_decision") String routingDecision,
@ColumnName("external_url") String externalUrl)
{
public QueryHistory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ default List<QueryHistory> findRecentQueriesByUserName(String userName, boolean
String findBackendUrlByQueryId(String queryId);

@SqlQuery("""
SELECT routing_group FROM query_history
SELECT routing_decision FROM query_history
WHERE query_id = :queryId
""")
String findRoutingGroupByQueryId(String queryId);
String findRoutingDecisionByQueryId(String queryId);

@SqlQuery("""
SELECT external_url FROM query_history
Expand Down Expand Up @@ -116,10 +116,10 @@ GROUP BY FLOOR(created / 1000 / 60), backend_url
List<Map<String, Object>> findDistribution(long created);

@SqlUpdate("""
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created, routing_group, external_url)
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created, :routingGroup, :externalUrl)
INSERT INTO query_history (query_id, query_text, backend_url, user_name, source, created, routing_decision, external_url)
VALUES (:queryId, :queryText, :backendUrl, :userName, :source, :created, :routingDecision, :externalUrl)
""")
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created, String routingGroup, String externalUrl);
void insertHistory(String queryId, String queryText, String backendUrl, String userName, String source, long created, String routingDecision, String externalUrl);

@SqlUpdate("""
DELETE FROM query_history
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Strings.isNullOrEmpty;

/**
* This class performs health check, stats counts for each backend and provides a backend given
* request object. Default implementation comes here.
Expand All @@ -56,7 +58,7 @@ public abstract class BaseRoutingManager
private final String defaultRoutingGroup;
private final QueryHistoryManager queryHistoryManager;
private final LoadingCache<String, String> queryIdBackendCache;
private final LoadingCache<String, String> queryIdRoutingGroupCache;
private final LoadingCache<String, String> queryIdRoutingDecisionCache;
private final LoadingCache<String, String> queryIdExternalUrlCache;

public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration)
Expand All @@ -65,7 +67,7 @@ public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHist
this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup();
this.queryHistoryManager = queryHistoryManager;
this.queryIdBackendCache = buildCache(this::findBackendForUnknownQueryId);
this.queryIdRoutingGroupCache = buildCache(this::findRoutingGroupForUnknownQueryId);
this.queryIdRoutingDecisionCache = buildCache(this::findRoutingDecisionForUnknownQueryId);
this.queryIdExternalUrlCache = buildCache(this::findExternalUrlForUnknownQueryId);
this.backendToStatus = new ConcurrentHashMap<>();
}
Expand All @@ -82,9 +84,9 @@ public void setBackendForQueryId(String queryId, String backend)
}

@Override
public void setRoutingGroupForQueryId(String queryId, String routingGroup)
public void setRoutingDecisionForQueryId(String queryId, String routingDecision)
{
queryIdRoutingGroupCache.put(queryId, routingGroup);
queryIdRoutingDecisionCache.put(queryId, routingDecision);
}

/**
Expand All @@ -99,12 +101,24 @@ public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user)
}

/**
* Performs routing to a given cluster group. This falls back to a default backend, if no scheduled
* backend is found.
* Selects a backend configuration for the request.
* At most one of `routingCluster` or `routingGroup` may be provided; they are mutually exclusive
* - If `routingCluster` is provided, returns that backend when it is active and healthy; otherwise
* falls back to the default backend.
* - If `routingCluster` is not provided, considers all active backends in `routingGroup`, filters to
* healthy ones, and delegates to `selectBackend(...)` to choose; if none are eligible, falls back
* to the default backend.
* - If neither `routingCluster` nor `routingGroup` is provided, falls back to the default backend.
*/
@Override
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String routingCluster, String user)
{
if (!isNullOrEmpty(routingCluster)) {
return gatewayBackendManager.getBackendByName(routingCluster)
.filter(ProxyBackendConfiguration::isActive)
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
.orElseGet(() -> provideDefaultBackendConfiguration(user));
}
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveBackends(routingGroup).stream()
.filter(backEnd -> isBackendHealthy(backEnd.getName()))
.toList();
Expand Down Expand Up @@ -144,21 +158,21 @@ public String findExternalUrlForQueryId(String queryId)
}

/**
* Looks up the routing group associated with the queryId in the cache.
* Looks up the routing decision associated with the queryId in the cache.
* If it's not in the cache, look up in query history
*/
@Nullable
@Override
public String findRoutingGroupForQueryId(String queryId)
public String findRoutingDecisionForQueryId(String queryId)
{
String routingGroup = null;
String routingDecision = null;
try {
routingGroup = queryIdRoutingGroupCache.get(queryId);
routingDecision = queryIdRoutingDecisionCache.get(queryId);
}
catch (ExecutionException e) {
log.warn("Exception while loading queryId from routing group cache %s", e.getLocalizedMessage());
log.warn("Exception while loading queryId from routing decision cache %s", e.getLocalizedMessage());
}
return routingGroup;
return routingDecision;
}

@Override
Expand Down Expand Up @@ -241,13 +255,13 @@ private String searchAllBackendForQuery(String queryId)
}

/**
* Attempts to look up the routing group associated with the query id from query history table
* Attempts to look up the routing decision associated with the query id from query history table
*/
private String findRoutingGroupForUnknownQueryId(String queryId)
private String findRoutingDecisionForUnknownQueryId(String queryId)
{
String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId);
setRoutingGroupForQueryId(queryId, routingGroup);
return routingGroup;
String routingDecision = queryHistoryManager.getRoutingDecisionForQueryId(queryId);
setRoutingDecisionForQueryId(queryId, routingDecision);
return routingDecision;
}

/**
Expand Down
Loading