diff --git a/docs/config.yaml b/docs/config.yaml index 61fe44dc6..17f15e8b5 100644 --- a/docs/config.yaml +++ b/docs/config.yaml @@ -11,3 +11,11 @@ dataStore: clusterStatsConfiguration: monitorType: INFO_API + +# Valkey distributed cache (optional - for multi-instance deployments) +valkeyConfiguration: + enabled: false + host: localhost + port: 6379 + # password: ${VALKEY_PASSWORD} # Uncomment if Valkey requires AUTH + # cacheTtlSeconds: 1800 # Cache TTL in seconds (default: 1800 = 30 minutes) diff --git a/docs/installation.md b/docs/installation.md index 276edef84..06f7fa940 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -161,6 +161,32 @@ For additional configurations, use the `log.*` properties from the [Trino logging properties documentation](https://trino.io/docs/current/admin/properties-logging.html) and specify the properties in `serverConfig`. +### Configure distributed cache (optional) + +For multi-instance deployments, Trino Gateway supports distributed caching +using Valkey (or Redis) to share query metadata across gateway instances. +This improves query routing and enables horizontal scaling. + +For single gateway deployments, distributed caching is not needed - the +local cache is sufficient. + +```yaml +valkeyConfiguration: + enabled: true + host: valkey.internal.prod + port: 6379 + password: ${ENV:VALKEY_PASSWORD} + cacheTtlSeconds: 1800 # Cache TTL (default: 1800 = 30 minutes) +``` + +**Optional parameters**: You can customize `cacheTtlSeconds` based on your query duration: +- Short queries (< 5 min): 600 seconds (10 minutes) +- Default queries: 1800 seconds (30 minutes) +- Long-running queries: 3600 seconds (1 hour) + +See [Valkey distributed cache configuration](valkey-configuration.md) for +detailed configuration options, deployment scenarios, and performance tuning. + ### Proxying additional paths By default, Trino Gateway only proxies requests to paths starting with diff --git a/docs/operation.md b/docs/operation.md index 723393b4c..efa32dd14 100644 --- a/docs/operation.md +++ b/docs/operation.md @@ -58,8 +58,8 @@ monitor: ## Monitoring -Trino Gateway provides a metrics endpoint that uses the OpenMetrics format at -`/metrics`. Use it to monitor Trino Gateway instances with Prometheus and +Trino Gateway provides a metrics endpoint that uses the OpenMetrics format at +`/metrics`. Use it to monitor Trino Gateway instances with Prometheus and other compatible systems with the following Prometheus configuration: ```yaml @@ -70,6 +70,20 @@ scrape_configs: - gateway1.example.com:8080 ``` +### Multi-instance deployments + +When running multiple Trino Gateway instances, enable the Valkey distributed +cache to share query metadata across instances. This ensures consistent query +routing regardless of which gateway instance receives the request. + +Monitor the distributed cache performance by checking: +- Cache hit rate (target: 85-95%) +- Cache errors (should be near 0) +- Valkey server connectivity and memory usage + +See [Valkey distributed cache configuration](valkey-configuration.md) for +setup instructions and monitoring details. + ## Trino Gateway health endpoints Trino Gateway provides two API endpoints to indicate the current status of the server: diff --git a/docs/valkey-configuration.md b/docs/valkey-configuration.md new file mode 100644 index 000000000..887793ae2 --- /dev/null +++ b/docs/valkey-configuration.md @@ -0,0 +1,274 @@ +# Valkey Distributed Cache Configuration + +## Overview + +Valkey distributed cache enables horizontal scaling of Trino Gateway by sharing query metadata across multiple gateway instances. When disabled, each gateway maintains its own local cache. + +## Quick Start (Minimal Configuration) + +```yaml +valkeyConfiguration: + enabled: true + host: localhost + port: 6379 + # password: ${VALKEY_PASSWORD} # Optional: if AUTH required +``` + +**That's it!** Sensible defaults are provided for all other settings. + +--- + +## Configuration Reference + +### Basic Settings + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `enabled` | boolean | `false` | Enable/disable distributed caching | +| `host` | string | `localhost` | Valkey server hostname | +| `port` | int | `6379` | Valkey server port | +| `password` | string | `null` | Optional password for AUTH | +| `database` | int | `0` | Database index (0-15) | + +### Advanced Settings (Optional) + +These settings have sensible defaults and should only be changed for specific performance tuning needs. + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `maxTotal` | int | `20` | Maximum total connections in pool | +| `maxIdle` | int | `10` | Maximum idle connections | +| `minIdle` | int | `5` | Minimum idle connections | +| `timeoutMs` | int | `2000` | Connection timeout in milliseconds | +| `cacheTtlSeconds` | long | `1800` | Cache entry TTL (30 minutes) | +| `healthCheckIntervalMs` | long | `30000` | Health check interval (30 seconds) | + +--- + +## Environment Variables + +Use environment variable substitution for sensitive values: + +```yaml +valkeyConfiguration: + enabled: true + host: ${VALKEY_HOST:localhost} + port: ${VALKEY_PORT:6379} + password: ${VALKEY_PASSWORD} +``` + +--- + +## Deployment Scenarios + +### Single Gateway Instance + +```yaml +valkeyConfiguration: + enabled: false # Not needed - local cache is sufficient +``` + +### Multiple Gateway Instances (Recommended) + +```yaml +valkeyConfiguration: + enabled: true + host: valkey.internal.prod + port: 6379 + password: ${VALKEY_PASSWORD} +``` + +### High-Traffic Production (Advanced Tuning) + +```yaml +valkeyConfiguration: + enabled: true + host: valkey.internal.prod + port: 6379 + password: ${VALKEY_PASSWORD} + maxTotal: 100 # More connections for high concurrency + maxIdle: 50 + minIdle: 25 + timeoutMs: 5000 # Longer timeout for slower networks + cacheTtlSeconds: 3600 # 1 hour for long-running queries +``` + +--- + +## Connection Pool Sizing Guidelines + +| Deployment Size | Gateway Instances | Recommended `maxTotal` | Recommended `maxIdle` | +|-----------------|-------------------|------------------------|----------------------| +| Small | 1-2 | 20 (default) | 10 (default) | +| Medium | 3-5 | 50 | 25 | +| Large | 6-10 | 100 | 50 | +| Enterprise | 10+ | 200 | 100 | + +**Formula:** `maxTotal = (number of gateways) × 10` is a good starting point. + +--- + +## Performance Tuning + +### Cache TTL (`cacheTtlSeconds`) + +- **Default (1800s / 30min):** Good for typical workloads +- **Short-lived queries (<5min):** Use 600s (10min) +- **Long-running queries (hours):** Use 3600s (1 hour) or more +- **Interactive development:** Use 300s (5min) + +### Health Check Interval (`healthCheckIntervalMs`) + +- **Default (30000ms / 30s):** Balanced check frequency +- **Unstable network:** Increase to 60000ms (1 min) +- **Critical systems:** Decrease to 10000ms (10s) + +### Connection Timeouts (`timeoutMs`) + +- **Default (2000ms):** Good for local/same-datacenter Valkey +- **Cross-region:** Increase to 5000ms +- **High latency network:** Increase to 10000ms + +--- + +## Monitoring + +Valkey cache exposes the following metrics (accessible via `ValkeyDistributedCache` instance): + +```java +long hits = cache.getCacheHits(); +long misses = cache.getCacheMisses(); +long writes = cache.getCacheWrites(); +long errors = cache.getCacheErrors(); +double hitRate = cache.getCacheHitRate(); // Percentage +``` + +### Expected Metrics (Healthy System) + +- **Cache Hit Rate:** 85-95% +- **Cache Errors:** 0 (or very low) +- **Cache Writes:** ~Equal to query submission rate + +### Troubleshooting + +**Low Hit Rate (<70%)** +- Check TTL settings (may be too short) +- Verify Valkey isn't evicting entries (check memory) +- Check if multiple gateway versions deployed (cache key mismatch) + +**High Error Rate** +- Check Valkey connectivity +- Verify password/AUTH configuration +- Review Valkey server logs + +**Connection Pool Exhaustion** +- Increase `maxTotal` setting +- Check for connection leaks (should be none with try-with-resources) + +--- + +## Security Considerations + +### Production Deployment Checklist + +- [ ] **Enable AUTH:** Set `password` in configuration +- [ ] **Use Environment Variables:** Don't hardcode passwords +- [ ] **Network Security:** Deploy Valkey in private VPC/network +- [ ] **Encryption at Rest:** Enable Valkey persistence encryption +- [ ] **TLS/SSL:** (Future enhancement - not yet supported) +- [ ] **Access Control:** Restrict Valkey port (6379) to gateway instances only + +### Example Production Setup + +```yaml +# config.yaml +valkeyConfiguration: + enabled: true + host: ${VALKEY_INTERNAL_HOST} + port: 6379 + password: ${VALKEY_PASSWORD} +``` + +```bash +# Environment variables (set in deployment) +export VALKEY_INTERNAL_HOST=valkey.vpc.internal +export VALKEY_PASSWORD=$(vault read -field=password secret/valkey) +``` + +--- + +## Architecture + +### 3-Tier Caching + +``` +Request Flow: +1. Check L1 (Local Guava Cache) → 10k entries, 30min TTL + ├─ Hit: Return immediately (~1ms) + └─ Miss: Continue to L2 + +2. Check L2 (Valkey Distributed Cache) → Shared across gateways + ├─ Hit: Populate L1, return (~5ms) + └─ Miss: Continue to L3 + +3. Check L3 (PostgreSQL Database) → Source of truth + ├─ Found: Populate L2 + L1, return (~50ms) + └─ Not Found: Search all backends via HTTP (~200ms) +``` + +### Cache Keys + +``` +Backend: trino:query:backend:{queryId} +Routing Group: trino:query:routinggroup:{queryId} +External URL: trino:query:externalurl:{queryId} +``` + +--- + +## Migration Guide + +### From Single Gateway to Multi-Gateway + +1. **Deploy Valkey server** (standalone or cluster) +2. **Update config.yaml** on all gateways: + ```yaml + valkeyConfiguration: + enabled: true + host: valkey.internal + port: 6379 + password: ${VALKEY_PASSWORD} + ``` +3. **Restart gateways** (rolling restart recommended) +4. **Monitor metrics** to verify cache hit rates + +No data migration needed - cache will populate automatically. + +--- + +## FAQ + +**Q: Do I need Valkey if I only have one gateway?** +A: No. Local Guava cache is sufficient for single-instance deployments. + +**Q: What happens if Valkey goes down?** +A: Graceful degradation - queries continue working, falling back to database. Performance may degrade slightly. + +**Q: Can I use Redis instead of Valkey?** +A: Yes! Valkey is a Redis fork with compatible protocol. Just point to your Redis server. + +**Q: How much memory does Valkey need?** +A: Rough estimate: `(queries per minute) × (average query lifetime in minutes) × 500 bytes` + Example: 1000 q/min × 30 min × 500 bytes = ~15 MB + +**Q: Can I clear the cache?** +A: Yes, via Valkey CLI: `redis-cli -h -a FLUSHDB` + Or selectively: `redis-cli DEL trino:query:backend:*` + +--- + +## Support + +For issues or questions: +- GitHub Issues: https://github.com/trinodb/trino-gateway/issues +- Trino Community Slack: #trino-gateway channel diff --git a/gateway-ha/config.yaml b/gateway-ha/config.yaml index b2e36aabe..0e73afcd2 100644 --- a/gateway-ha/config.yaml +++ b/gateway-ha/config.yaml @@ -23,3 +23,11 @@ clusterStatsConfiguration: monitor: taskDelay: 1m clusterMetricsRegistryRefreshPeriod: 30s + +# Valkey distributed cache (optional - for multi-instance deployments) +valkeyConfiguration: + enabled: false # Set to true to enable distributed caching + host: localhost + port: 6379 + # password: ${VALKEY_PASSWORD} # Uncomment if Valkey requires AUTH + # cacheTtlSeconds: 1800 # Cache TTL in seconds (default: 1800 = 30 minutes) diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 69abe8c51..f72517c57 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -189,6 +189,12 @@ ${dep.trino.version} + + io.valkey + valkey-java + 5.5.0 + + jakarta.annotation jakarta.annotation-api diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java index 740f1b6bd..55f6ee43b 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/HaGatewayConfiguration.java @@ -47,6 +47,8 @@ public class HaGatewayConfiguration private UIConfiguration uiConfiguration = new UIConfiguration(); + private ValkeyConfiguration valkeyConfiguration = new ValkeyConfiguration(); + // List of Modules with FQCN (Fully Qualified Class Name) private List modules; @@ -267,6 +269,16 @@ public void setProxyResponseConfiguration(ProxyResponseConfiguration proxyRespon this.proxyResponseConfiguration = proxyResponseConfiguration; } + public ValkeyConfiguration getValkeyConfiguration() + { + return valkeyConfiguration; + } + + public void setValkeyConfiguration(ValkeyConfiguration valkeyConfiguration) + { + this.valkeyConfiguration = valkeyConfiguration; + } + private void validateStatementPath(String statementPath, List statementPaths) { if (statementPath.startsWith(V1_STATEMENT_PATH) || diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/ValkeyConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ValkeyConfiguration.java new file mode 100644 index 000000000..6d3b8d683 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/ValkeyConfiguration.java @@ -0,0 +1,165 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.config; + +/** + * Configuration for Valkey distributed cache. + */ +public class ValkeyConfiguration +{ + private boolean enabled; + private String host = "localhost"; + private int port = 6379; + private String password; + private int database; + private int maxTotal = 20; + private int maxIdle = 10; + private int minIdle = 5; + private int timeoutMs = 2000; + private long cacheTtlSeconds = 1800; // 30 minutes default + private long healthCheckIntervalMs = 30000; // 30 seconds default + + public ValkeyConfiguration() {} + + public boolean isEnabled() + { + return enabled; + } + + public void setEnabled(boolean enabled) + { + this.enabled = enabled; + } + + public String getHost() + { + return host; + } + + public void setHost(String host) + { + this.host = host; + } + + public int getPort() + { + return port; + } + + public void setPort(int port) + { + if (port < 1 || port > 65535) { + throw new IllegalArgumentException("Port must be between 1 and 65535, got: " + port); + } + this.port = port; + } + + public String getPassword() + { + return password; + } + + public void setPassword(String password) + { + this.password = password; + } + + public int getDatabase() + { + return database; + } + + public void setDatabase(int database) + { + this.database = database; + } + + public int getMaxTotal() + { + return maxTotal; + } + + public void setMaxTotal(int maxTotal) + { + if (maxTotal < 1) { + throw new IllegalArgumentException("maxTotal must be at least 1, got: " + maxTotal); + } + this.maxTotal = maxTotal; + } + + public int getMaxIdle() + { + return maxIdle; + } + + public void setMaxIdle(int maxIdle) + { + if (maxIdle < 0) { + throw new IllegalArgumentException("maxIdle cannot be negative, got: " + maxIdle); + } + this.maxIdle = maxIdle; + } + + public int getMinIdle() + { + return minIdle; + } + + public void setMinIdle(int minIdle) + { + if (minIdle < 0) { + throw new IllegalArgumentException("minIdle cannot be negative, got: " + minIdle); + } + this.minIdle = minIdle; + } + + public int getTimeoutMs() + { + return timeoutMs; + } + + public void setTimeoutMs(int timeoutMs) + { + if (timeoutMs < 0) { + throw new IllegalArgumentException("timeoutMs cannot be negative, got: " + timeoutMs); + } + this.timeoutMs = timeoutMs; + } + + public long getCacheTtlSeconds() + { + return cacheTtlSeconds; + } + + public void setCacheTtlSeconds(long cacheTtlSeconds) + { + if (cacheTtlSeconds < 0) { + throw new IllegalArgumentException("cacheTtlSeconds cannot be negative, got: " + cacheTtlSeconds); + } + this.cacheTtlSeconds = cacheTtlSeconds; + } + + public long getHealthCheckIntervalMs() + { + return healthCheckIntervalMs; + } + + public void setHealthCheckIntervalMs(long healthCheckIntervalMs) + { + if (healthCheckIntervalMs < 1000) { + throw new IllegalArgumentException("healthCheckIntervalMs must be at least 1000ms, got: " + healthCheckIntervalMs); + } + this.healthCheckIntervalMs = healthCheckIntervalMs; + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java index f74aea935..64efab027 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/module/HaGatewayProviderModule.java @@ -13,6 +13,7 @@ */ package io.trino.gateway.ha.module; +import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Scopes; @@ -37,12 +38,15 @@ import io.trino.gateway.ha.config.DataStoreConfiguration; import io.trino.gateway.ha.config.GatewayCookieConfigurationPropertiesProvider; import io.trino.gateway.ha.config.HaGatewayConfiguration; +import io.trino.gateway.ha.config.MonitorConfiguration; import io.trino.gateway.ha.config.OAuth2GatewayCookieConfigurationPropertiesProvider; import io.trino.gateway.ha.config.RoutingRulesConfiguration; import io.trino.gateway.ha.config.RulesExternalConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import io.trino.gateway.ha.persistence.RecordAndAnnotatedConstructorMapper; import io.trino.gateway.ha.router.BackendStateManager; +import io.trino.gateway.ha.router.DistributedCache; import io.trino.gateway.ha.router.ForRouter; import io.trino.gateway.ha.router.GatewayBackendManager; import io.trino.gateway.ha.router.HaGatewayManager; @@ -52,6 +56,8 @@ import io.trino.gateway.ha.router.QueryHistoryManager; import io.trino.gateway.ha.router.ResourceGroupsManager; import io.trino.gateway.ha.router.RoutingGroupSelector; +import io.trino.gateway.ha.router.RoutingManager; +import io.trino.gateway.ha.router.ValkeyDistributedCache; import io.trino.gateway.ha.security.AuthorizationManager; import io.trino.gateway.ha.security.LbAuthorizer; import io.trino.gateway.ha.security.LbFormAuthManager; @@ -65,6 +71,8 @@ import org.jdbi.v3.core.Jdbi; import org.jdbi.v3.sqlobject.SqlObjectPlugin; +import java.util.List; + import static com.google.inject.multibindings.Multibinder.newSetBinder; import static io.airlift.jaxrs.JaxrsBinder.jaxrsBinder; import static io.trino.gateway.ha.config.ClusterStatsMonitorType.INFO_API; @@ -198,4 +206,48 @@ public static ClusterStatsMonitor getClusterStatsMonitor(@ForMonitor HttpClient case NOOP -> new NoopClusterStatsMonitor(); }; } + + @Provides + @Singleton + public List getClusterStatsObservers( + RoutingManager mgr, + BackendStateManager backendStateManager) + { + return ImmutableList.builder() + .add(new HealthCheckObserver(mgr)) + .add(new ClusterStatsObserver(backendStateManager)) + .build(); + } + + @Provides + @Singleton + public MonitorConfiguration getMonitorConfiguration() + { + return configuration.getMonitor(); + } + + @Provides + @Singleton + public ValkeyConfiguration getValkeyConfiguration() + { + return configuration.getValkeyConfiguration(); + } + + @Provides + @Singleton + public DistributedCache getDistributedCache() + { + ValkeyConfiguration valkeyConfig = configuration.getValkeyConfiguration(); + return new ValkeyDistributedCache( + valkeyConfig.getHost(), + valkeyConfig.getPort(), + valkeyConfig.getPassword(), + valkeyConfig.getDatabase(), + valkeyConfig.isEnabled(), + valkeyConfig.getMaxTotal(), + valkeyConfig.getMaxIdle(), + valkeyConfig.getMinIdle(), + valkeyConfig.getTimeoutMs(), + valkeyConfig.getHealthCheckIntervalMs()); + } } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java index 996ee158c..46d4423a3 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java @@ -24,6 +24,7 @@ import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import jakarta.annotation.Nullable; import jakarta.annotation.PreDestroy; import jakarta.ws.rs.HttpMethod; @@ -50,20 +51,54 @@ public abstract class BaseRoutingManager implements RoutingManager { private static final Logger log = Logger.get(BaseRoutingManager.class); + + /** + * Cache key format: trino:query:backend:{queryId} + * Stores the backend URL for each query. + * TTL: Configurable (default 30 minutes). + * Scope: Shared across all gateway instances via Valkey. + */ + private static final String CACHE_KEY_PREFIX_BACKEND = "trino:query:backend:"; + + /** + * Cache key format: trino:query:routinggroup:{queryId} + * Stores the routing group for each query. + * TTL: Configurable (default 30 minutes). + * Scope: Shared across all gateway instances via Valkey. + */ + private static final String CACHE_KEY_PREFIX_ROUTING_GROUP = "trino:query:routinggroup:"; + + /** + * Cache key format: trino:query:externalurl:{queryId} + * Stores the external URL for each query (lazy-loaded on first access). + * TTL: Configurable (default 30 minutes). + * Scope: Shared across all gateway instances via Valkey. + */ + private static final String CACHE_KEY_PREFIX_EXTERNAL_URL = "trino:query:externalurl:"; + private final ExecutorService executorService = Executors.newFixedThreadPool(5); private final GatewayBackendManager gatewayBackendManager; private final ConcurrentHashMap backendToStatus; private final String defaultRoutingGroup; private final QueryHistoryManager queryHistoryManager; + private final DistributedCache distributedCache; + private final long cacheTtlSeconds; private final LoadingCache queryIdBackendCache; private final LoadingCache queryIdRoutingGroupCache; private final LoadingCache queryIdExternalUrlCache; - public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration) + public BaseRoutingManager( + GatewayBackendManager gatewayBackendManager, + QueryHistoryManager queryHistoryManager, + RoutingConfiguration routingConfiguration, + DistributedCache distributedCache, + ValkeyConfiguration valkeyConfiguration) { this.gatewayBackendManager = gatewayBackendManager; this.defaultRoutingGroup = routingConfiguration.getDefaultRoutingGroup(); this.queryHistoryManager = queryHistoryManager; + this.distributedCache = distributedCache; + this.cacheTtlSeconds = valkeyConfiguration.getCacheTtlSeconds(); this.queryIdBackendCache = buildCache(this::findBackendForUnknownQueryId); this.queryIdRoutingGroupCache = buildCache(this::findRoutingGroupForUnknownQueryId); this.queryIdExternalUrlCache = buildCache(this::findExternalUrlForUnknownQueryId); @@ -78,13 +113,29 @@ public BaseRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHist @Override public void setBackendForQueryId(String queryId, String backend) { + // L1: Guava cache queryIdBackendCache.put(queryId, backend); + // L2: Valkey distributed cache (write-through) + try { + distributedCache.set(CACHE_KEY_PREFIX_BACKEND + queryId, backend, cacheTtlSeconds); + } + catch (Exception e) { + log.warn(e, "Failed to write backend mapping to Valkey for queryId: %s", queryId); + } } @Override public void setRoutingGroupForQueryId(String queryId, String routingGroup) { + // L1: Guava cache queryIdRoutingGroupCache.put(queryId, routingGroup); + // L2: Valkey distributed cache (write-through) + try { + distributedCache.set(CACHE_KEY_PREFIX_ROUTING_GROUP + queryId, routingGroup, cacheTtlSeconds); + } + catch (Exception e) { + log.warn(e, "Failed to write routing group mapping to Valkey for queryId: %s", queryId); + } } /** @@ -179,18 +230,51 @@ public void updateClusterStats(List stats) @VisibleForTesting void setExternalUrlForQueryId(String queryId, String externalUrl) { + // L1: Guava cache queryIdExternalUrlCache.put(queryId, externalUrl); + // L2: Valkey distributed cache (write-through) + try { + distributedCache.set(CACHE_KEY_PREFIX_EXTERNAL_URL + queryId, externalUrl, cacheTtlSeconds); + } + catch (Exception e) { + log.debug(e, "Failed to write external URL mapping to Valkey for queryId: %s", queryId); + } } @VisibleForTesting String findBackendForUnknownQueryId(String queryId) { - String backend; + String backend = null; + + // L2: Check Valkey distributed cache + try { + Optional valkeyResult = distributedCache.get(CACHE_KEY_PREFIX_BACKEND + queryId); + if (valkeyResult.isPresent()) { + backend = valkeyResult.get(); + log.debug("Found backend mapping in Valkey for queryId: %s", queryId); + return backend; + } + } + catch (Exception e) { + log.debug(e, "Failed to read from Valkey for queryId: %s, falling through to database", queryId); + } + + // L3: Check database backend = queryHistoryManager.getBackendForQueryId(queryId); - if (Strings.isNullOrEmpty(backend)) { - log.debug("Unable to find backend mapping for [%s]. Searching for suitable backend", queryId); - backend = searchAllBackendForQuery(queryId); + if (!Strings.isNullOrEmpty(backend)) { + // Populate Valkey cache for next time (write-back) + try { + distributedCache.set(CACHE_KEY_PREFIX_BACKEND + queryId, backend, cacheTtlSeconds); + } + catch (Exception e) { + log.debug(e, "Failed to write backend mapping to Valkey for queryId: %s", queryId); + } + return backend; } + + // L4: HTTP search across all backends + log.debug("Unable to find backend mapping for [%s]. Searching for suitable backend", queryId); + backend = searchAllBackendForQuery(queryId); return backend; } @@ -245,7 +329,26 @@ private String searchAllBackendForQuery(String queryId) */ private String findRoutingGroupForUnknownQueryId(String queryId) { - String routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId); + String routingGroup = null; + + // L2: Check Valkey distributed cache + try { + Optional valkeyResult = distributedCache.get(CACHE_KEY_PREFIX_ROUTING_GROUP + queryId); + if (valkeyResult.isPresent()) { + routingGroup = valkeyResult.get(); + log.debug("Found routing group mapping in Valkey for queryId: %s", queryId); + // Still populate L1 Guava cache + queryIdRoutingGroupCache.put(queryId, routingGroup); + return routingGroup; + } + } + catch (Exception e) { + log.debug(e, "Failed to read routing group from Valkey for queryId: %s, falling through to database", queryId); + } + + // L3: Check database + routingGroup = queryHistoryManager.getRoutingGroupForQueryId(queryId); + // setRoutingGroupForQueryId will populate both L1 and L2 setRoutingGroupForQueryId(queryId, routingGroup); return routingGroup; } @@ -255,7 +358,26 @@ private String findRoutingGroupForUnknownQueryId(String queryId) */ private String findExternalUrlForUnknownQueryId(String queryId) { - String externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId); + String externalUrl = null; + + // L2: Check Valkey distributed cache + try { + Optional valkeyResult = distributedCache.get(CACHE_KEY_PREFIX_EXTERNAL_URL + queryId); + if (valkeyResult.isPresent()) { + externalUrl = valkeyResult.get(); + log.debug("Found external URL mapping in Valkey for queryId: %s", queryId); + // Still populate L1 Guava cache + queryIdExternalUrlCache.put(queryId, externalUrl); + return externalUrl; + } + } + catch (Exception e) { + log.debug(e, "Failed to read external URL from Valkey for queryId: %s, falling through to database", queryId); + } + + // L3: Check database + externalUrl = queryHistoryManager.getExternalUrlForQueryId(queryId); + // setExternalUrlForQueryId will populate both L1 and L2 setExternalUrlForQueryId(queryId, externalUrl); return externalUrl; } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/DistributedCache.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/DistributedCache.java new file mode 100644 index 000000000..a176cf9dc --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/DistributedCache.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import java.util.Optional; + +/** + * Interface for distributed caching operations. + * Provides a simple key-value cache abstraction for sharing data across gateway instances. + */ +public interface DistributedCache +{ + /** + * Retrieves a value from the cache. + * + * @param key the cache key + * @return Optional containing the value if present, empty otherwise + */ + Optional get(String key); + + /** + * Stores a value in the cache with TTL. + * + * @param key the cache key + * @param value the value to store + * @param ttlSeconds time-to-live in seconds + * @return true if successful, false otherwise + */ + boolean set(String key, String value, long ttlSeconds); + + /** + * Stores a value in the cache without TTL (persists until explicitly deleted). + * + * @param key the cache key + * @param value the value to store + * @return true if successful, false otherwise + */ + boolean set(String key, String value); + + /** + * Removes a value from the cache. + * + * @param key the cache key + * @return true if the key existed and was deleted, false otherwise + */ + boolean delete(String key); + + /** + * Removes multiple values from the cache. + * + * @param keys the cache keys to delete + * @return number of keys that were deleted + */ + long delete(String... keys); + + /** + * Checks if the cache is available and healthy. + * + * @return true if cache is operational, false otherwise + */ + boolean isHealthy(); + + /** + * Closes the cache connection and releases resources. + */ + void close(); +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java index f3a2345dc..d28e65919 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java @@ -21,6 +21,7 @@ import io.trino.gateway.ha.clustermonitor.TrinoStatus; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import java.util.HashMap; import java.util.List; @@ -157,9 +158,11 @@ ProxyBackendConfiguration backendConfiguration() public QueryCountBasedRouter( GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, - RoutingConfiguration routingConfiguration) + RoutingConfiguration routingConfiguration, + DistributedCache distributedCache, + ValkeyConfiguration valkeyConfiguration) { - super(gatewayBackendManager, queryHistoryManager, routingConfiguration); + super(gatewayBackendManager, queryHistoryManager, routingConfiguration, distributedCache, valkeyConfiguration); clusterStats = new ConcurrentHashMap<>(); } diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java index e82401015..20447cfbf 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/StochasticRoutingManager.java @@ -16,6 +16,7 @@ import com.google.inject.Inject; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import java.util.List; import java.util.Optional; @@ -30,9 +31,11 @@ public class StochasticRoutingManager public StochasticRoutingManager( GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, - RoutingConfiguration routingConfiguration) + RoutingConfiguration routingConfiguration, + DistributedCache distributedCache, + ValkeyConfiguration valkeyConfiguration) { - super(gatewayBackendManager, queryHistoryManager, routingConfiguration); + super(gatewayBackendManager, queryHistoryManager, routingConfiguration, distributedCache, valkeyConfiguration); } @Override diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/router/ValkeyDistributedCache.java b/gateway-ha/src/main/java/io/trino/gateway/ha/router/ValkeyDistributedCache.java new file mode 100644 index 000000000..4ad5c0642 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/router/ValkeyDistributedCache.java @@ -0,0 +1,312 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import io.airlift.log.Logger; +import io.valkey.Jedis; +import io.valkey.JedisPool; +import io.valkey.JedisPoolConfig; +import io.valkey.exceptions.JedisException; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Valkey-based distributed cache implementation with graceful degradation. + * Falls through to database on Valkey unavailability. + */ +public class ValkeyDistributedCache + implements DistributedCache +{ + private static final Logger log = Logger.get(ValkeyDistributedCache.class); + + private final JedisPool pool; + private final boolean enabled; + private final long healthCheckIntervalMs; + private volatile boolean healthy = true; + private volatile long lastHealthCheck; + + // Cache metrics + private final AtomicLong cacheHits = new AtomicLong(); + private final AtomicLong cacheMisses = new AtomicLong(); + private final AtomicLong cacheWrites = new AtomicLong(); + private final AtomicLong cacheErrors = new AtomicLong(); + + /** + * Creates a new Valkey distributed cache. + * + * @param host Valkey server host + * @param port Valkey server port + * @param password Optional password (can be null) + * @param database Database index (default 0) + * @param enabled Whether Valkey is enabled + * @param maxTotal Maximum total connections in pool + * @param maxIdle Maximum idle connections + * @param minIdle Minimum idle connections + * @param timeoutMs Connection timeout in milliseconds + * @param healthCheckIntervalMs Health check interval in milliseconds + */ + public ValkeyDistributedCache( + String host, + int port, + String password, + int database, + boolean enabled, + int maxTotal, + int maxIdle, + int minIdle, + int timeoutMs, + long healthCheckIntervalMs) + { + this.enabled = enabled; + this.healthCheckIntervalMs = healthCheckIntervalMs; + + if (!enabled) { + log.info("Valkey distributed cache is disabled"); + this.pool = null; + this.healthy = false; + return; + } + + JedisPoolConfig poolConfig = new JedisPoolConfig(); + poolConfig.setMaxTotal(maxTotal); + poolConfig.setMaxIdle(maxIdle); + poolConfig.setMinIdle(minIdle); + poolConfig.setTestOnBorrow(true); + poolConfig.setTestOnReturn(true); + poolConfig.setTestWhileIdle(true); + poolConfig.setMinEvictableIdleTime(Duration.ofMinutes(1)); + poolConfig.setTimeBetweenEvictionRuns(Duration.ofSeconds(30)); + poolConfig.setNumTestsPerEvictionRun(3); + poolConfig.setBlockWhenExhausted(true); + + JedisPool tempPool = null; + try { + if (password != null && !password.isEmpty()) { + tempPool = new JedisPool(poolConfig, host, port, timeoutMs, password, database); + } + else { + tempPool = new JedisPool(poolConfig, host, port, timeoutMs, null, database); + } + log.info("Valkey distributed cache initialized: %s:%d (database: %d)", host, port, database); + + // Test connection - gracefully degrade if unavailable + try (Jedis jedis = tempPool.getResource()) { + jedis.ping(); + log.info("Valkey health check passed"); + this.healthy = true; + } + catch (Exception healthCheckException) { + log.warn(healthCheckException, "Initial Valkey health check failed, will retry later"); + this.healthy = false; + } + } + catch (Exception e) { + log.error(e, "Failed to create Valkey connection pool, distributed cache will be disabled"); + this.healthy = false; + if (tempPool != null) { + tempPool.close(); + tempPool = null; + } + } + this.pool = tempPool; + } + + @Override + public Optional get(String key) + { + if (!enabled || !healthy) { + return Optional.empty(); + } + + try (Jedis jedis = pool.getResource()) { + String value = jedis.get(key); + if (value != null) { + cacheHits.incrementAndGet(); + } + else { + cacheMisses.incrementAndGet(); + } + return Optional.ofNullable(value); + } + catch (JedisException e) { + log.warn(e, "Valkey get failed for key: %s (gracefully degrading)", key); + cacheErrors.incrementAndGet(); + markUnhealthy(); + return Optional.empty(); + } + } + + @Override + public boolean set(String key, String value, long ttlSeconds) + { + if (!enabled || !healthy) { + return false; + } + + try (Jedis jedis = pool.getResource()) { + if (ttlSeconds > 0) { + jedis.setex(key, ttlSeconds, value); + } + else { + jedis.set(key, value); + } + cacheWrites.incrementAndGet(); + return true; + } + catch (JedisException e) { + log.warn(e, "Valkey set failed for key: %s (gracefully degrading)", key); + cacheErrors.incrementAndGet(); + markUnhealthy(); + return false; + } + } + + @Override + public boolean set(String key, String value) + { + return set(key, value, 0); + } + + @Override + public boolean delete(String key) + { + if (!enabled || !healthy) { + return false; + } + + try (Jedis jedis = pool.getResource()) { + Long deleted = jedis.del(key); + return deleted != null && deleted > 0; + } + catch (JedisException e) { + log.warn(e, "Valkey delete failed for key: %s (gracefully degrading)", key); + markUnhealthy(); + return false; + } + } + + @Override + public long delete(String... keys) + { + if (!enabled || !healthy || keys == null || keys.length == 0) { + return 0; + } + + try (Jedis jedis = pool.getResource()) { + Long deleted = jedis.del(keys); + return deleted != null ? deleted : 0; + } + catch (JedisException e) { + log.warn(e, "Valkey bulk delete failed (gracefully degrading)"); + markUnhealthy(); + return 0; + } + } + + @Override + public boolean isHealthy() + { + long now = System.currentTimeMillis(); + if (now - lastHealthCheck > healthCheckIntervalMs) { + checkHealth(); + } + return healthy; + } + + private void checkHealth() + { + if (!enabled || pool == null) { + healthy = false; + lastHealthCheck = System.currentTimeMillis(); + return; + } + + try (Jedis jedis = pool.getResource()) { + String pong = jedis.ping(); + healthy = "PONG".equals(pong); + if (healthy) { + log.debug("Valkey health check passed"); + } + } + catch (Exception e) { + log.warn(e, "Valkey health check failed"); + healthy = false; + } + finally { + lastHealthCheck = System.currentTimeMillis(); + } + } + + private void markUnhealthy() + { + if (healthy) { + log.warn("Marking Valkey as unhealthy due to operation failure"); + healthy = false; + lastHealthCheck = 0; // Force immediate health check on next operation + } + } + + @Override + public void close() + { + if (pool != null && !pool.isClosed()) { + log.info("Closing Valkey connection pool"); + pool.close(); + } + } + + /** + * Returns the number of cache hits. + */ + public long getCacheHits() + { + return cacheHits.get(); + } + + /** + * Returns the number of cache misses. + */ + public long getCacheMisses() + { + return cacheMisses.get(); + } + + /** + * Returns the number of successful cache writes. + */ + public long getCacheWrites() + { + return cacheWrites.get(); + } + + /** + * Returns the number of cache operation errors. + */ + public long getCacheErrors() + { + return cacheErrors.get(); + } + + /** + * Returns the cache hit rate as a percentage (0-100). + * Returns 0 if no operations have occurred. + */ + public double getCacheHitRate() + { + long total = cacheHits.get() + cacheMisses.get(); + return total == 0 ? 0.0 : (cacheHits.get() * 100.0 / total); + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java b/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java index 8da9713ca..96c77545d 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java +++ b/gateway-ha/src/main/java/io/trino/gateway/proxyserver/ProxyRequestHandler.java @@ -187,13 +187,23 @@ private void performRequest( FluentFuture future = executeHttp(request); + log.info("PATH DEBUG: Request URI: %s, Path: %s, Method: %s", request.getUri(), request.getUri().getPath(), request.getMethod()); + log.info("PATH DEBUG: Statement paths: %s", statementPaths); + boolean pathMatches = statementPaths.stream().anyMatch(request.getUri().getPath()::startsWith); + boolean isPost = request.getMethod().equals(HttpMethod.POST); + log.info("PATH DEBUG: Path matches: %s, Is POST: %s, Condition result: %s", pathMatches, isPost, pathMatches && isPost); + if (statementPaths.stream().anyMatch(request.getUri().getPath()::startsWith) && request.getMethod().equals(HttpMethod.POST)) { + log.info("PATH DEBUG: Condition passed! Will call recordBackendForQueryId"); Optional username = ((TrinoRequestUser) servletRequest.getAttribute(TRINO_REQUEST_USER)).getUser(); future = future.transform(response -> recordBackendForQueryId(request, response, username, routingDestination), executor); if (includeClusterInfoInResponse) { cookieBuilder.add(new NewCookie.Builder("trinoClusterHost").value(remoteUri.getHost()).build()); } } + else { + log.info("PATH DEBUG: Condition failed! Will NOT call recordBackendForQueryId"); + } setupAsyncResponse( asyncResponse, @@ -275,16 +285,19 @@ private ProxyResponse recordBackendForQueryId(Request request, ProxyResponse res log.debug("Extracting proxy destination : [%s] for request : [%s]", queryDetail.getBackendUrl(), request.getUri()); + log.info("CACHE DEBUG: Response status code: %s, Request URI: %s", response.statusCode(), request.getUri()); if (response.statusCode() == OK.getStatusCode()) { + log.info("CACHE DEBUG: Processing 200 OK response, response body length: %d", response.body().length()); try { HashMap results = OBJECT_MAPPER.readValue(response.body(), HashMap.class); - queryDetail.setQueryId(results.get("id")); + String queryId = results.get("id"); + log.info("CACHE DEBUG: Extracted queryId from response: %s", queryId); + queryDetail.setQueryId(queryId); routingManager.setBackendForQueryId(queryDetail.getQueryId(), queryDetail.getBackendUrl()); routingManager.setRoutingGroupForQueryId(queryDetail.getQueryId(), routingDestination.routingGroup()); - log.debug("QueryId [%s] mapped with proxy [%s]", queryDetail.getQueryId(), queryDetail.getBackendUrl()); } catch (IOException e) { - log.error("Failed to get QueryId from response [%s] , Status code [%s]", response.body(), response.statusCode()); + log.warn("Failed to parse query response for caching: %s", e.getMessage()); } } else { diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/config/TestValkeyConfiguration.java b/gateway-ha/src/test/java/io/trino/gateway/ha/config/TestValkeyConfiguration.java new file mode 100644 index 000000000..5174045f2 --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/config/TestValkeyConfiguration.java @@ -0,0 +1,244 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.config; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +final class TestValkeyConfiguration +{ + @Test + void testDefaultValues() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThat(config.isEnabled()).isFalse(); + assertThat(config.getHost()).isEqualTo("localhost"); + assertThat(config.getPort()).isEqualTo(6379); + assertThat(config.getPassword()).isNull(); + assertThat(config.getDatabase()).isEqualTo(0); + assertThat(config.getMaxTotal()).isEqualTo(20); + assertThat(config.getMaxIdle()).isEqualTo(10); + assertThat(config.getMinIdle()).isEqualTo(5); + assertThat(config.getTimeoutMs()).isEqualTo(2000); + assertThat(config.getCacheTtlSeconds()).isEqualTo(1800); + } + + @Test + void testSettersAndGetters() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setEnabled(true); + config.setHost("valkey.example.com"); + config.setPort(7000); + config.setPassword("secret-password"); + config.setDatabase(2); + config.setMaxTotal(50); + config.setMaxIdle(25); + config.setMinIdle(10); + config.setTimeoutMs(5000); + config.setCacheTtlSeconds(3600); + + assertThat(config.isEnabled()).isTrue(); + assertThat(config.getHost()).isEqualTo("valkey.example.com"); + assertThat(config.getPort()).isEqualTo(7000); + assertThat(config.getPassword()).isEqualTo("secret-password"); + assertThat(config.getDatabase()).isEqualTo(2); + assertThat(config.getMaxTotal()).isEqualTo(50); + assertThat(config.getMaxIdle()).isEqualTo(25); + assertThat(config.getMinIdle()).isEqualTo(10); + assertThat(config.getTimeoutMs()).isEqualTo(5000); + assertThat(config.getCacheTtlSeconds()).isEqualTo(3600); + } + + @Test + void testEnabledFlag() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThat(config.isEnabled()).isFalse(); + + config.setEnabled(true); + assertThat(config.isEnabled()).isTrue(); + + config.setEnabled(false); + assertThat(config.isEnabled()).isFalse(); + } + + @Test + void testHostConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setHost("192.168.1.100"); + assertThat(config.getHost()).isEqualTo("192.168.1.100"); + + config.setHost("valkey-cluster.local"); + assertThat(config.getHost()).isEqualTo("valkey-cluster.local"); + } + + @Test + void testPortConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setPort(6380); + assertThat(config.getPort()).isEqualTo(6380); + + config.setPort(7000); + assertThat(config.getPort()).isEqualTo(7000); + } + + @Test + void testPasswordConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThat(config.getPassword()).isNull(); + + config.setPassword("my-secret-password"); + assertThat(config.getPassword()).isEqualTo("my-secret-password"); + + config.setPassword(null); + assertThat(config.getPassword()).isNull(); + } + + @Test + void testDatabaseConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setDatabase(0); + assertThat(config.getDatabase()).isEqualTo(0); + + config.setDatabase(5); + assertThat(config.getDatabase()).isEqualTo(5); + + config.setDatabase(15); + assertThat(config.getDatabase()).isEqualTo(15); + } + + @Test + void testPoolSizeConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setMaxTotal(100); + config.setMaxIdle(50); + config.setMinIdle(25); + + assertThat(config.getMaxTotal()).isEqualTo(100); + assertThat(config.getMaxIdle()).isEqualTo(50); + assertThat(config.getMinIdle()).isEqualTo(25); + } + + @Test + void testTimeoutConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setTimeoutMs(1000); + assertThat(config.getTimeoutMs()).isEqualTo(1000); + + config.setTimeoutMs(10000); + assertThat(config.getTimeoutMs()).isEqualTo(10000); + } + + @Test + void testCacheTtlConfiguration() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + config.setCacheTtlSeconds(600); + assertThat(config.getCacheTtlSeconds()).isEqualTo(600); + + config.setCacheTtlSeconds(7200); + assertThat(config.getCacheTtlSeconds()).isEqualTo(7200); + } + + @Test + void testInvalidPortValidation() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThatThrownBy(() -> config.setPort(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Port must be between 1 and 65535"); + + assertThatThrownBy(() -> config.setPort(65536)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Port must be between 1 and 65535"); + + assertThatThrownBy(() -> config.setPort(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Port must be between 1 and 65535"); + } + + @Test + void testInvalidMaxTotalValidation() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThatThrownBy(() -> config.setMaxTotal(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxTotal must be at least 1"); + + assertThatThrownBy(() -> config.setMaxTotal(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxTotal must be at least 1"); + } + + @Test + void testInvalidMaxIdleValidation() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThatThrownBy(() -> config.setMaxIdle(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("maxIdle cannot be negative"); + } + + @Test + void testInvalidMinIdleValidation() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThatThrownBy(() -> config.setMinIdle(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("minIdle cannot be negative"); + } + + @Test + void testInvalidTimeoutValidation() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThatThrownBy(() -> config.setTimeoutMs(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("timeoutMs cannot be negative"); + } + + @Test + void testInvalidCacheTtlValidation() + { + ValkeyConfiguration config = new ValkeyConfiguration(); + + assertThatThrownBy(() -> config.setCacheTtlSeconds(-1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("cacheTtlSeconds cannot be negative"); + } +} diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/NoopDistributedCache.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/NoopDistributedCache.java new file mode 100644 index 000000000..7041c25a3 --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/NoopDistributedCache.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import java.util.Optional; + +/** + * No-op distributed cache for testing purposes. + */ +public class NoopDistributedCache + implements DistributedCache +{ + @Override + public Optional get(String key) + { + return Optional.empty(); + } + + @Override + public boolean set(String key, String value, long ttlSeconds) + { + return true; + } + + @Override + public boolean set(String key, String value) + { + return true; + } + + @Override + public boolean delete(String key) + { + return true; + } + + @Override + public long delete(String... keys) + { + return keys.length; + } + + @Override + public boolean isHealthy() + { + return true; + } + + @Override + public void close() + { + // no-op + } +} diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java index 91e476aeb..aa05909c1 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestQueryCountBasedRouter.java @@ -19,6 +19,7 @@ import io.trino.gateway.ha.config.DataStoreConfiguration; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -179,7 +180,9 @@ public void init() JdbcConnectionManager connectionManager = createTestingJdbcConnectionManager(dataStoreConfig); backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), dataStoreConfig); - queryCountBasedRouter = new QueryCountBasedRouter(backendManager, historyManager, routingConfiguration); + DistributedCache distributedCache = new NoopDistributedCache(); + ValkeyConfiguration valkeyConfiguration = new ValkeyConfiguration(); + queryCountBasedRouter = new QueryCountBasedRouter(backendManager, historyManager, routingConfiguration, distributedCache, valkeyConfiguration); populateData(); queryCountBasedRouter.updateClusterStats(clusters); } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java index e82ebb049..0fe042434 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerExternalUrlCache.java @@ -14,6 +14,7 @@ package io.trino.gateway.ha.router; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -134,7 +135,7 @@ private static class TestRoutingManager private TestRoutingManager(GatewayBackendManager gatewayBackendManager, QueryHistoryManager queryHistoryManager, RoutingConfiguration routingConfiguration) { - super(gatewayBackendManager, queryHistoryManager, routingConfiguration); + super(gatewayBackendManager, queryHistoryManager, routingConfiguration, new NoopDistributedCache(), new ValkeyConfiguration()); } @Override diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java index 8621c0695..b4fa23652 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestRoutingManagerNotFound.java @@ -15,6 +15,7 @@ import io.trino.gateway.ha.config.DataStoreConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.Test; @@ -35,8 +36,10 @@ public TestRoutingManagerNotFound() GatewayBackendManager backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); QueryHistoryManager historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), dataStoreConfig); + DistributedCache distributedCache = new NoopDistributedCache(); + ValkeyConfiguration valkeyConfiguration = new ValkeyConfiguration(); - this.routingManager = new StochasticRoutingManager(backendManager, historyManager, routingConfiguration); + this.routingManager = new StochasticRoutingManager(backendManager, historyManager, routingConfiguration, distributedCache, valkeyConfiguration); } @Test diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java index 8714896f5..96770614d 100644 --- a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestStochasticRoutingManager.java @@ -17,6 +17,7 @@ import io.trino.gateway.ha.config.DataStoreConfiguration; import io.trino.gateway.ha.config.ProxyBackendConfiguration; import io.trino.gateway.ha.config.RoutingConfiguration; +import io.trino.gateway.ha.config.ValkeyConfiguration; import io.trino.gateway.ha.persistence.JdbcConnectionManager; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -42,7 +43,9 @@ void setUp() RoutingConfiguration routingConfiguration = new RoutingConfiguration(); backendManager = new HaGatewayManager(connectionManager.getJdbi(), routingConfiguration); historyManager = new HaQueryHistoryManager(connectionManager.getJdbi(), dataStoreConfig); - haRoutingManager = new StochasticRoutingManager(backendManager, historyManager, routingConfiguration); + DistributedCache distributedCache = new NoopDistributedCache(); + ValkeyConfiguration valkeyConfiguration = new ValkeyConfiguration(); + haRoutingManager = new StochasticRoutingManager(backendManager, historyManager, routingConfiguration, distributedCache, valkeyConfiguration); } @Test diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestValkeyDistributedCache.java b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestValkeyDistributedCache.java new file mode 100644 index 000000000..a1a4bc542 --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/router/TestValkeyDistributedCache.java @@ -0,0 +1,334 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.router; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; + +final class TestValkeyDistributedCache +{ + private ValkeyDistributedCache cache; + + @AfterEach + void cleanup() + { + if (cache != null) { + cache.close(); + } + } + + @Test + void testDisabledCacheReturnsEmptyAndFails() + { + // Create disabled cache + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, // disabled + 20, + 10, + 5, + 2000, + 30000); + + // All operations should fail gracefully + assertThat(cache.get("key")).isEmpty(); + assertThat(cache.set("key", "value")).isFalse(); + assertThat(cache.set("key", "value", 100)).isFalse(); + assertThat(cache.delete("key")).isFalse(); + assertThat(cache.delete("key1", "key2")).isEqualTo(0); + assertThat(cache.isHealthy()).isFalse(); + } + + @Test + void testEnabledCacheWithInvalidHostIsUnhealthy() + { + // JedisPool creation is lazy - it doesn't fail immediately + // but the health check will fail + cache = new ValkeyDistributedCache( + "invalid-host-that-does-not-exist-12345", + 6379, + null, + 0, + true, + 20, + 10, + 5, + 100, + 30000); // Very short timeout to make test fast + + // Cache should be marked unhealthy after failed health check + assertThat(cache.isHealthy()).isFalse(); + assertThat(cache.get("key")).isEmpty(); + assertThat(cache.set("key", "value")).isFalse(); + } + + @Test + void testDisabledCacheCloseDoesNothing() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + // Should not throw exception + cache.close(); + } + + @Test + void testGetReturnsEmptyWhenUnhealthy() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + Optional result = cache.get("testKey"); + assertThat(result).isEmpty(); + } + + @Test + void testSetReturnsFalseWhenUnhealthy() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + boolean result = cache.set("testKey", "testValue", 3600); + assertThat(result).isFalse(); + } + + @Test + void testSetWithoutTtlReturnsFalseWhenUnhealthy() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + boolean result = cache.set("testKey", "testValue"); + assertThat(result).isFalse(); + } + + @Test + void testDeleteSingleKeyReturnsFalseWhenUnhealthy() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + boolean result = cache.delete("testKey"); + assertThat(result).isFalse(); + } + + @Test + void testDeleteMultipleKeysReturnsZeroWhenUnhealthy() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + long result = cache.delete("key1", "key2", "key3"); + assertThat(result).isEqualTo(0); + } + + @Test + void testDeleteMultipleKeysWithNullArray() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + long result = cache.delete((String[]) null); + assertThat(result).isEqualTo(0); + } + + @Test + void testDeleteMultipleKeysWithEmptyArray() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + long result = cache.delete(); + assertThat(result).isEqualTo(0); + } + + @Test + void testIsHealthyReturnsFalseWhenDisabled() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + assertThat(cache.isHealthy()).isFalse(); + } + + @Test + void testSetWithZeroTtlBehavesLikeNoTtl() + { + cache = new ValkeyDistributedCache( + "localhost", + 6379, + null, + 0, + false, + 20, + 10, + 5, + 2000, + 30000); + + boolean resultWithZero = cache.set("key1", "value", 0); + boolean resultWithoutTtl = cache.set("key2", "value"); + + // Both should return false when disabled + assertThat(resultWithZero).isEqualTo(resultWithoutTtl); + } + + @Test + void testCacheOperationsWithPassword() + { + // Test that password configuration is accepted + cache = new ValkeyDistributedCache( + "invalid-host", + 6379, + "test-password", + 0, + true, + 20, + 10, + 5, + 100, + 30000); + + // Cache should be unhealthy but initialized + assertThat(cache.isHealthy()).isFalse(); + } + + @Test + void testCacheOperationsWithDatabase() + { + // Test that database configuration is accepted + cache = new ValkeyDistributedCache( + "invalid-host", + 6379, + null, + 5, // database index + true, + 20, + 10, + 5, + 100, + 30000); + + // Cache should be unhealthy but initialized + assertThat(cache.isHealthy()).isFalse(); + } + + @Test + void testCachePoolConfiguration() + { + // Test that pool configuration parameters are accepted + cache = new ValkeyDistributedCache( + "invalid-host", + 6379, + null, + 0, + true, + 50, // maxTotal + 25, // maxIdle + 10, // minIdle + 5000, // timeoutMs + 30000); // healthCheckIntervalMs + + // Cache should be unhealthy but initialized + assertThat(cache.isHealthy()).isFalse(); + } +} diff --git a/mkdocs.yml b/mkdocs.yml index eade2a2b8..909b9e40c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -92,6 +92,7 @@ nav: - Routing logic: routing-logic.md - Gateway API: gateway-api.md - Resource groups API: resource-groups-api.md + - Valkey distributed cache: valkey-configuration.md - Code architecture: design.md - Migration to Airlift: migration-to-airlift.md - Contribute: