Skip to content

Commit 9c59e98

Browse files
committed
Add ability to enforce concurrent query limits
Add the ability to enforce concurrent query limits across a group of webservers. Zookeeper is used to track active queries and the following data: - The query ID - The user who submitted the query - The system the query was submitted on - The query logic the query originated from When the `ActiveQueryTracker` is instructed to track a query, the following nodes will be created in Zookeeper under the 'ActiveQueries' namespace: ``` /users/<userDn>/<queryId> /systems/<systemName>/<queryId> /queryLogics/<queryLogic>/<queryId> /queries/<queryId> /queries/<queryId>/user [data = byte[] value of userDn] /queries/<queryId>/system [data = byte[] value of systemName] /queries/<queryId>/queryLogic [data = byte[] value of queryLogic] /queries/<queryId>/heartbeats ``` This is done through the use of the `ActiveQueryTracker` class. In addition to managing the nodes that record information about the query, the `ActiveQueryTracker` class is also responsible for providing instances of the `QueryHeartbeat` class. A `QueryHeartbeat` is a wrapper around an ephemeral PersistentNode, provided by the Apache Curator library. As long as this node is present in Zookeeper for a particular query, the query will be considered to be active. Should the webservers fail over and the Zookeeper connection drop, these heartbeat nodes will automatically be deleted by Zookeeper. The `ActiveQueryTracker` is also responsible for providing instances of the `ActiveQuerySnapshot` class, which represent a snapshot of total active queries at a point in time that are associated with a particular user, system, or query logic. Query limit enforcement is done through the `QueryLimiter` class. Given a user, system, and query logic, it can determine if any of the following limits have been exceeded: - The max allowed concurrent queries for the user. - The max allowed concurrent queries of the query logic for the user. - The max allowed concurrent queries for the system. - The max allowed concurrent queries of the query logic for the system. Limits may be defined and customized on a per-user and per-system basis. They may also be defined for groups of query logics. The classes `UserLimitProvider`, `SystemLimitProvider`, and `QueryLogicGroupLimitProvider` are respectively responsible for identifying the best limits to enforce for a user, system, and query logic. They will be initialized in the `QueryLimiter` after providing a `QueryLimitConfiguration` instance. The following can be configured: On a system-wide basis: - The default concurrent user query limit. This applies to the total number of queries a user may run across all systems. May be overridden per user. - The default concurrent system query limit. Primarily to avoid a system getting overloaded. May be overridden per system. - The default of whether queries submitted to a system are counted towards the user's concurrent query total. This is always true. On a per-system basis: - The system name/ids the configuration targets. Regex matching is supported. - The concurrent system query limit. Overrides the system-wide value. - Whether queries submitted to the system count towards a user's concurrent query total. Overrides the system-wide value. - The concurrent system query limit for different query logic groups. Regex matching against group names is supported. on a per-user basis: - The user DN. - The user's concurrent query limit. Overrides the system-wide configuration. - The user's concurrent query limit for different query logic groups. Regex matching against group names is supported. On a per-query-logic-group basis: - The group name. - The query logics included in the group. Regex matching is supported. - The default concurrent user query limit. This applies to the total concurrent queries a user may run that originate from a query logic in the group across all systems. Given the possibilities for exact matches, partial regex matches, and wildcard regex matches, the determination of the best limit to use for any particular system or query logic is done by sorting matches into the following 'matching buckets' (in best-match priority): 1. Exact match 2. Partial regex (non-wildcard-only) 3. Wildcard-only regex and then selecting the lowest limit from the best bucket where we first found a match. Currently the `QueryLimiter` is used in `QueryExecutorBean`, along with a `QueryHeartbeatCache` instance to cache heartbeats and keep them alive when a running query is cached for retrieval later. For the purposes of this feature, a query is considered to start when an Accumulo connection is retrieved from the connection factory, and is considered to end when the connection is returned to the factory. The following error codes have been added: 412-20 - Concurrent query limit exceeded 500-164 - Error checking concurrent query limits Closes #3100
1 parent 344e5e2 commit 9c59e98

File tree

46 files changed

+5201
-21
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+5201
-21
lines changed

core/base-rest-responses/src/main/java/datawave/webservice/query/exception/DatawaveErrorCode.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ public enum DatawaveErrorCode {
169169
QUERY_PLAN_ERROR(500, 161, "Error retrieving plan for query."),
170170
QUERY_PREDICTIONS_ERROR(500, 162, "Error retrieving predictions for query."),
171171
QUERY_VALIDATION_ERROR(500, 163, "Error validating query."),
172+
CONCURRENT_QUERY_LIMIT_ERROR(500, 164, "Error checking concurrent query limits."),
172173
// 204 No Content
173174
NO_QUERIES_FOUND(204, 1, "No queries found for user."),
174175
RESULTS_NOT_SENT(204, 2, "Results not sent."),
@@ -275,7 +276,8 @@ public enum DatawaveErrorCode {
275276
CURRENT_AND_PREVIOUS_EVENT_ORDER_INVALID(412, 16, "Current event and previous event are not in chronological order"),
276277
CURRENT_AND_NEXT_EVENT_ORDER_INVALID(412, 17, "Current event and next event are not in chronological order"),
277278
FIELD_PHRASE_QUERY_NOT_INDEXED(412, 18, "Field cannot be queried as a phrase since it was not indexed as such."),
278-
NO_QUERY_VALIDATION_RULES_CONFIGURED(412, 19, "No query validation rules configured for the query logic.");
279+
NO_QUERY_VALIDATION_RULES_CONFIGURED(412, 19, "No query validation rules configured for the query logic."),
280+
CONCURRENT_QUERY_LIMIT_EXCEEDED(412, 20, "Concurrent query limit exceeded.");
279281

280282
private String message;
281283
private int httpCode;

core/utils/type-utils/src/main/java/datawave/query/parser/JavaRegexAnalyzer.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class JavaRegexAnalyzer {
2323

2424
// Types as applied to portions of the regex. We are interested in portions that
2525
// are literals and those that contain regex constructs.
26-
private enum RegexType {
26+
public enum RegexType {
2727
LITERAL(true), // a literal value
2828
ESCAPED_LITERAL(true), // an escaped literal (e.g. \[ or \.)
2929
REGEX(false), // a regex
@@ -42,7 +42,7 @@ public boolean isLiteral() {
4242
}
4343
}
4444

45-
private static class RegexPart {
45+
public static class RegexPart {
4646
// the regex is not-final to allow applyRegexCaseSensitivity
4747
public String regex;
4848
public RegexType type;
@@ -58,6 +58,18 @@ public RegexPart(String reg, RegexType typ, int nonCapt) {
5858
this(reg, typ, (nonCapt > 0));
5959
}
6060

61+
public String getRegex() {
62+
return regex;
63+
}
64+
65+
public RegexType getType() {
66+
return type;
67+
}
68+
69+
public boolean isNonCapturing() {
70+
return nonCapturing;
71+
}
72+
6173
@Override
6274
public boolean equals(Object o) {
6375
if (!(o instanceof RegexPart)) {
@@ -153,6 +165,10 @@ public String getRegex() {
153165
return getRegex(regexParts);
154166
}
155167

168+
public RegexPart[] getRegexParts() {
169+
return regexParts;
170+
}
171+
156172
public static String getRegex(RegexPart[] regexParts) {
157173
StringBuilder regex = new StringBuilder();
158174
for (RegexPart part : regexParts) {

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@
7272
<version.commons-pool>1.6</version.commons-pool>
7373
<version.commons-pool2>2.6.1</version.commons-pool2>
7474
<version.commons-text>1.11.0</version.commons-text>
75-
<version.curator>5.7.1</version.curator>
76-
<version.curator.test>5.7.1</version.curator.test>
75+
<version.curator>5.9.0</version.curator>
76+
<version.curator.test>5.9.0</version.curator.test>
7777
<version.datawave.accumulo-api>4.0.0</version.datawave.accumulo-api>
7878
<version.datawave.audit-api>4.0.0</version.datawave.audit-api>
7979
<version.datawave.authorization-api>4.0.0</version.datawave.authorization-api>

web-services/deploy/configuration/src/main/resources/beanRefContext.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
<value>classpath*:datawave/query/QueryExpiration.xml</value>
1919
<value>classpath*:datawave/query/QueryMetricsWriter.xml</value>
2020
<value>classpath*:datawave/query/*QueryLogicFactory.xml</value>
21+
<value>classpath*:datawave/query/QueryLimiterFactory.xml</value>
2122
<value>classpath*:datawave/query/CachedResults*.xml</value>
2223
<value>classpath*:datawave/mapreduce/MapReduceJobs.xml</value>
2324
<value>classpath*:datawave/modification/ModificationServices.xml</value>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<beans xmlns="http://www.springframework.org/schema/beans"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xmlns:util="http://www.springframework.org/schema/util"
6+
xmlns:context="http://www.springframework.org/schema/context"
7+
xsi:schemaLocation="http://www.springframework.org/schema/beans
8+
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
9+
http://www.springframework.org/schema/context
10+
http://www.springframework.org/schema/context/spring-context-4.0.xsd
11+
http://www.springframework.org/schema/util
12+
http://www.springframework.org/schema/util/spring-util-4.0.xsd">
13+
14+
<!-- Configuration for the QueryLimitProvider. -->
15+
<bean id="queryLimitProviderConfig" class="datawave.webservice.query.limit.QueryLimitConfiguration">
16+
<property name="defaultSystemQueryLimit" value="20000"/>
17+
<property name="defaultUserQueryLimit" value="100"/>
18+
</bean>
19+
20+
<!-- Configuration for the QueryLimiter. -->
21+
<bean id="queryLimiter" class="datawave.webservice.query.limit.QueryLimiter" >
22+
<property name="zookeeperConfig" value="${ivarator.zookeeper.hosts}"/>
23+
</bean>
24+
</beans>

web-services/deploy/spring-framework-integration/src/test/resources/springFrameworkBeanRefContext.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
<value>classpath*:datawave/security/PrincipalFactory.xml</value>
1616
<value>classpath*:datawave/query/QueryExpiration.xml</value>
1717
<value>classpath*:datawave/query/*QueryLogicFactory.xml</value>
18+
<value>classpath*:datawave/query/QueryLimiterFactory.xml</value>
1819
<value>classpath*:datawave/query/CachedResults*.xml</value>
1920
<value>classpath*:datawave/mapreduce/MapReduceJobs.xml</value>
2021
</list>

web-services/query/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,11 +250,26 @@
250250
<artifactId>datawave-in-memory-accumulo</artifactId>
251251
<scope>test</scope>
252252
</dependency>
253+
<dependency>
254+
<groupId>org.apache.curator</groupId>
255+
<artifactId>curator-test</artifactId>
256+
<scope>test</scope>
257+
</dependency>
258+
<dependency>
259+
<groupId>org.assertj</groupId>
260+
<artifactId>assertj-core</artifactId>
261+
<scope>test</scope>
262+
</dependency>
253263
<dependency>
254264
<groupId>org.javassist</groupId>
255265
<artifactId>javassist</artifactId>
256266
<scope>test</scope>
257267
</dependency>
268+
<dependency>
269+
<groupId>org.springframework</groupId>
270+
<artifactId>spring-test</artifactId>
271+
<scope>test</scope>
272+
</dependency>
258273
</dependencies>
259274
<build>
260275
<finalName>${project.artifactId}</finalName>
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package datawave.webservice.query.cache;
2+
3+
import java.io.IOException;
4+
5+
import javax.annotation.PostConstruct;
6+
import javax.inject.Singleton;
7+
8+
import org.apache.log4j.Logger;
9+
10+
import com.github.benmanes.caffeine.cache.Cache;
11+
import com.github.benmanes.caffeine.cache.Caffeine;
12+
13+
import datawave.webservice.query.limit.QueryHeartbeat;
14+
15+
/**
16+
* A cache for storing query heartbeats of running queries.
17+
*/
18+
@Singleton
19+
public class QueryHeartbeatCache {
20+
21+
private static final Logger log = Logger.getLogger(QueryHeartbeatCache.class);
22+
23+
private Cache<String,QueryHeartbeat> cache;
24+
25+
@PostConstruct
26+
public void init() {
27+
cache = Caffeine.newBuilder().build();
28+
}
29+
30+
public void put(String queryId, QueryHeartbeat heartbeat) {
31+
cache.put(queryId, heartbeat);
32+
}
33+
34+
public QueryHeartbeat get(String queryId) {
35+
return cache.getIfPresent(queryId);
36+
}
37+
38+
public void stopAndRemoveHeartbeat(String queryId) {
39+
QueryHeartbeat heartbeat = cache.asMap().remove(queryId);
40+
if (heartbeat != null) {
41+
try {
42+
heartbeat.stop();
43+
} catch (IOException e) {
44+
log.error("Error stopping query heartbeat", e);
45+
}
46+
}
47+
}
48+
49+
public void clear() {
50+
cache.asMap().clear();
51+
}
52+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package datawave.webservice.query.limit;
2+
3+
/**
4+
* Represents an exception that occurred when interacting with the {@link ActiveQueryTracker}.
5+
*/
6+
public class ActiveQueryException extends RuntimeException {
7+
8+
public ActiveQueryException() {
9+
super();
10+
}
11+
12+
public ActiveQueryException(String message) {
13+
super(message);
14+
}
15+
16+
public ActiveQueryException(String message, Throwable cause) {
17+
super(message, cause);
18+
}
19+
20+
public ActiveQueryException(Throwable cause) {
21+
super(cause);
22+
}
23+
24+
protected ActiveQueryException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
25+
super(message, cause, enableSuppression, writableStackTrace);
26+
}
27+
}

0 commit comments

Comments
 (0)