Skip to content

Add ability to yield in Ivarators, AndIterator, OrIterator and return metrics before yield (#704) #2042

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 15, 2025
Merged
5 changes: 4 additions & 1 deletion docker/config/application-query.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ warehouse:
ivaratorMaxOpenFiles: 100
ivaratorCacheScanPersistThreshold: 100000
ivaratorCacheScanTimeoutMinutes: 60
yieldThresholdMs: 188400
modelName: 'DATAWAVE'
edgeModelName: 'DATAWAVE_EDGE'

Expand Down Expand Up @@ -205,6 +206,7 @@ datawave:
ivaratorMaxOpenFiles: ${warehouse.defaults.ivaratorMaxOpenFiles}
ivaratorCacheScanPersistThreshold: ${warehouse.defaults.ivaratorCacheScanPersistThreshold}
ivaratorCacheScanTimeoutMinutes: ${warehouse.defaults.ivaratorCacheScanTimeoutMinutes}
yieldThresholdMs: ${warehouse.defaults.yieldThresholdMs}
eventQueryDataDecoratorTransformer:
requestedDecorators:
- "CSV"
Expand All @@ -226,6 +228,7 @@ datawave:
TOKENIZED-LUCENE: "TokenizedLuceneToJexlQueryParser"
sendTimingToStatsd: false
collectQueryMetrics: true
collectTimingDetails: true
logTimingDetails: true
statsdHost: ${warehouse.statsd.host}
statsdPort: ${warehouse.statsd.port}
Expand Down Expand Up @@ -571,4 +574,4 @@ datawave:
hazelcast:
client:
clusterName: ${QUERY_CACHE:cache}
clusterName: ${QUERY_CACHE:cache}
clusterName: ${QUERY_CACHE:cache}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
<property name="reverseIndexTableName" value="${datawave.query.logic.logics.BaseEventQuery.reverseIndexTableName}" />
<property name="maxResults" value="${datawave.query.logic.logics.BaseEventQuery.maxResults}" />
<property name="queryThreads" value="${datawave.query.logic.logics.BaseEventQuery.queryThreads}" />
<property name="yieldThresholdMs" value="${datawave.query.logic.logics.BaseEventQuery.yieldThresholdMs}" />
<property name="collectTimingDetails" value="${datawave.query.logic.logics.BaseEventQuery.collectTimingDetails}" />
<property name="indexLookupThreads" value="${datawave.query.logic.logics.BaseEventQuery.indexLookupThreads}" />
<property name="dateIndexThreads" value="${datawave.query.logic.logics.BaseEventQuery.dateIndexThreads}" />
<property name="fullTableScanEnabled" value="${datawave.query.logic.logics.BaseEventQuery.fullTableScanEnabled}" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ warehouse:
ivaratorMaxOpenFiles: 100
ivaratorCacheScanPersistThreshold: 100000
ivaratorCacheScanTimeoutMinutes: 60
yieldThresholdMs: 188400
modelName: 'DATAWAVE'
edgeModelName: 'DATAWAVE_EDGE'

Expand Down Expand Up @@ -212,6 +213,7 @@ datawave:
ivaratorMaxOpenFiles: ${warehouse.defaults.ivaratorMaxOpenFiles}
ivaratorCacheScanPersistThreshold: ${warehouse.defaults.ivaratorCacheScanPersistThreshold}
ivaratorCacheScanTimeoutMinutes: ${warehouse.defaults.ivaratorCacheScanTimeoutMinutes}
yieldThresholdMs: ${warehouse.defaults.yieldThresholdMs}
eventQueryDataDecoratorTransformer:
requestedDecorators:
- "CSV"
Expand All @@ -233,6 +235,7 @@ datawave:
TOKENIZED-LUCENE: "TokenizedLuceneToJexlQueryParser"
sendTimingToStatsd: false
collectQueryMetrics: true
collectTimingDetails: true
logTimingDetails: false
statsdHost: ${warehouse.statsd.host}
statsdPort: ${warehouse.statsd.port}
Expand Down
4 changes: 4 additions & 0 deletions properties/default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ enable.index.only.filter.functions=false
query.tld.collapse.uids=false
#fields generated internally at query evaluation time
evaluation.only.fields=
# Time to wait before yielding in the QueryIterator
query.iterator.yield.threshold.ms=188400
############################
#
# Accumulo Connection Pools
Expand Down Expand Up @@ -461,6 +463,8 @@ beq.evaluationPipelines=16
beq.pipelineCachedResults=16
# Are full scans enabled for the base event query?
beq.fullTableScanEnabled=false
# Collect timing details (sources, seeks, nexts, yields, etc) in QueryIterator and add to query metrics
beq.collectTimingDetails=true

# Threads used for various query logics
shard.query.threads=100
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
}

@Override
public String toString() {
protected String toStringImpl(boolean includeQueryId) {
StringBuilder builder = new StringBuilder();
builder.append("DatawaveFieldIndexFilterIteratorJexl (").append(queryId).append(") fName=").append(getFieldName()).append(", filter=").append(filter)
.append(", lowerBound=").append(getFieldValue()).append(", lowerInclusive=").append(lowerInclusive).append(", upperBound=")
.append(upperBound).append(", upperInclusive=").append(upperInclusive).append(", negated=").append(isNegated()).append("}");
builder.append(getClass().getSimpleName());
if (includeQueryId) {
builder.append(" (").append(queryId).append(")");
}
builder.append(" {fName=").append(getFieldName()).append(", filter=").append(filter).append(", lowerBound=").append(getFieldValue())
.append(", lowerInclusive=").append(lowerInclusive).append(", upperBound=").append(upperBound).append(", upperInclusive=")
.append(upperInclusive).append(", negated=").append(isNegated()).append("}");

return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,17 @@ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
}

@Override
public String toString() {
protected String toStringImpl(boolean includeQueryId) {
StringBuilder builder = new StringBuilder();
if (fst != null)
if (fst != null) {
builder.append("DatawaveFieldIndexFSTIteratorJexl");
else
} else {
builder.append("DatawaveFieldIndexListIteratorJexl");
builder.append(" (").append(queryId).append(") {fName=").append(getFieldName()).append(", negated=").append(isNegated()).append("}");
}
if (includeQueryId) {
builder.append(" (").append(queryId).append(")");
}
builder.append(" {fName=").append(getFieldName()).append(", negated=").append(isNegated()).append("}");
return builder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,15 @@ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
}

@Override
public String toString() {
protected String toStringImpl(boolean includeQueryId) {
StringBuilder builder = new StringBuilder();
builder.append("DatawaveFieldIndexRangeIteratorJexl (").append(queryId).append(") fName=").append(getFieldName()).append(", lowerBound=")
.append(getFieldValue()).append(", lowerInclusive=").append(lowerInclusive).append(", upperBound=").append(upperBound)
.append(", upperInclusive=").append(upperInclusive).append(", negated=").append(isNegated()).append("}");
builder.append(getClass().getSimpleName());
if (includeQueryId) {
builder.append(" (").append(queryId).append(")");
}
builder.append(" {fName=").append(getFieldName()).append(", lowerBound=").append(getFieldValue()).append(", lowerInclusive=").append(lowerInclusive)
.append(", upperBound=").append(upperBound).append(", upperInclusive=").append(upperInclusive).append(", negated=").append(isNegated())
.append("}");
return builder.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,14 @@ public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
}

@Override
public String toString() {
protected String toStringImpl(boolean includeQueryId) {
StringBuilder builder = new StringBuilder();
builder.append("DatawaveFieldIndexRegexIteratorJexl (").append(queryId).append(") fName=").append(getFieldName()).append(", fValue=")
.append(getFieldValue()).append(", regex=").append(regex).append(", negated=").append(isNegated()).append("}");
builder.append(getClass().getSimpleName());
if (includeQueryId) {
builder.append(" (").append(queryId).append(")");
}
builder.append(" {fName=").append(getFieldName()).append(", fValue=").append(getFieldValue()).append(", regex=").append(regex).append(", negated=")
.append(isNegated()).append("}");
return builder.toString();
}

Expand Down Expand Up @@ -125,15 +129,11 @@ protected List<Range> buildBoundingFiRanges(Text rowId, Text fiName, Text fieldV
*/
@Override
protected boolean matches(Key k) throws IOException {
    String colq = k.getColumnQualifier().toString();

    // Column qualifier layout is value\0datatype\0UID: search backwards for the
    // two null bytes so only the value portion is handed to the regex.
    int index = colq.lastIndexOf('\0');
    index = colq.lastIndexOf('\0', index - 1);

    // pattern is presumably a thread-local/lazy holder of a compiled Pattern —
    // NOTE(review): confirm pattern.get() never returns null here.
    return pattern.get().matcher(colq.substring(0, index)).matches();
}

}
Loading