diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java index a72b0dc5f66..ac9c1d9458b 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryIterator.java @@ -380,7 +380,9 @@ public void seek(Range range, Collection columnFamilies, boolean i // preserve the original range for use with the Final Document tracking iterator because it is placed after the ResultCountingIterator // so the FinalDocumentTracking iterator needs the start key with the count already appended this.originalRange = range; - this.waitWindowObserver.start(range, yieldThresholdMs); + if (WaitWindowObserver.getNumYields(range.getStartKey(), collectTimingDetails) < maxYields) { + this.waitWindowObserver.start(range, yieldThresholdMs); + } getActiveQueryLog().get(getQueryId()).beginCall(this.originalRange, ActiveQuery.CallType.SEEK); ActiveQueryLog.getInstance().get(getQueryId()).beginCall(this.originalRange, ActiveQuery.CallType.SEEK); diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java index bad9ebca615..cd98bbba37a 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/QueryOptions.java @@ -145,6 +145,7 @@ public class QueryOptions implements OptionDescriber { public static final String START_TIME = "start.time"; public static final String END_TIME = "end.time"; public static final String YIELD_THRESHOLD_MS = "yield.threshold.ms"; + public static final String MAX_YIELDS = "max.yields"; public static final String FILTER_MASKED_VALUES = "filter.masked.values"; public static final String INCLUDE_DATATYPE = "include.datatype"; @@ -355,6 +356,7 @@ public class QueryOptions implements OptionDescriber { protected long maxIvaratorResults = -1; protected long yieldThresholdMs = Long.MAX_VALUE; + protected long maxYields = 10; protected Predicate fieldIndexKeyDataTypeFilter = KeyIdentity.Function; protected Predicate eventEntryKeyDataTypeFilter = KeyIdentity.Function; @@ -486,6 +488,7 @@ public void deepCopy(QueryOptions other) { this.maxIvaratorResults = other.maxIvaratorResults; this.yieldThresholdMs = other.yieldThresholdMs; + this.maxYields = other.maxYields; this.compressResults = other.compressResults; this.limitFieldsMap = other.limitFieldsMap; @@ -1162,6 +1165,7 @@ public IteratorOptions describeOptions() { " The maximum number of sources to use for ivarators across all ivarated terms within the query. Note the thread pool size is controlled via an accumulo property."); options.put(YIELD_THRESHOLD_MS, "The threshold in milliseconds that the query iterator will evaluate consecutive documents to false before yielding the scan."); + options.put(MAX_YIELDS, "The maximum number of times to yield on the same startKey without making progress."); options.put(COMPRESS_SERVER_SIDE_RESULTS, "GZIP compress the serialized Documents before returning to the webserver"); options.put(MAX_EVALUATION_PIPELINES, "The max number of evaluation pipelines"); options.put(SERIAL_EVALUATION_PIPELINE, "Forces us to use the serial pipeline. Allows us to still have a single thread for evaluation"); @@ -1616,6 +1620,10 @@ public boolean validateOptions(Map options) { this.setYieldThresholdMs(Long.parseLong(options.get(YIELD_THRESHOLD_MS))); } + if (options.containsKey(MAX_YIELDS)) { + this.setMaxYields(Long.parseLong(options.get(MAX_YIELDS))); + } + if (options.containsKey(COMPRESS_SERVER_SIDE_RESULTS)) { this.setCompressResults(Boolean.parseBoolean(options.get(COMPRESS_SERVER_SIDE_RESULTS))); } @@ -2075,6 +2083,14 @@ public void setYieldThresholdMs(long yieldThresholdMs) { this.yieldThresholdMs = yieldThresholdMs; } + public long getMaxYields() { + return maxYields; + } + + public void setMaxYields(long maxYields) { + this.maxYields = maxYields; + } + public int getFiFieldSeek() { return fiFieldSeek; } diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/waitwindow/WaitWindowObserver.java b/warehouse/query-core/src/main/java/datawave/query/iterator/waitwindow/WaitWindowObserver.java index 24771c7a043..ae148589263 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/waitwindow/WaitWindowObserver.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/waitwindow/WaitWindowObserver.java @@ -61,7 +61,7 @@ public class WaitWindowObserver { // When the wait window is over. Set during the initial seek protected long endOfWaitWindow; // Remaining time in the wait window. Updated by the timerTask - protected AtomicLong remainingTimeMs = new AtomicLong(); + protected AtomicLong remainingTimeMs = new AtomicLong(Long.MAX_VALUE); protected TimerTask timerTask = null; // How often the timerTask gets run protected long checkPeriod = 50; @@ -433,6 +433,27 @@ static public Text removeMarkers(Text text) { } } + static public int getNumYields(Key startKey, boolean collectTimingDetails) { + int numNulls = 0; + if (startKey != null) { + String s = null; + if (WaitWindowObserver.hasMarker(startKey.getColumnFamily())) { + s = startKey.getColumnFamily().toString(); + } else if (WaitWindowObserver.hasMarker(startKey.getColumnQualifier())) { + s = startKey.getColumnQualifier().toString(); + } + if (s != null && s.endsWith("\0")) { + for (int x = s.length() - 1; x > 0 && s.charAt(x) == '\0'; x--) { + numNulls++; + } + } + if (collectTimingDetails) { + numNulls = numNulls / 2; + } + } + return numNulls; + } + /* * Convenience method to produce a document containing a WAIT_WINDOW_OVERRUN attribute, This document gets returned before a yield when * collectTimingDetails=true so that the FinalDocumenTrackingIterator can add timing details and metrics befoer returning the Document.