From b6b20304adb78e8c5696c9c5f3969cdac0d2bdd3 Mon Sep 17 00:00:00 2001 From: Ivan Bella <347158+ivakegg@users.noreply.github.com> Date: Fri, 31 Oct 2025 18:17:49 +0000 Subject: [PATCH 1/4] Updated the planning to be concurrent for index holes in the date partitioned query planner --- .../planner/DatePartitionedQueryPlanner.java | 79 ++++++++++++++++--- 1 file changed, 66 insertions(+), 13 deletions(-) diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java index c16154b9422..fbcecfcd235 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java @@ -17,6 +17,15 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.accumulo.core.client.TableNotFoundException; @@ -27,6 +36,7 @@ import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.log4j.Logger; import datawave.core.common.logging.ThreadConfigurableLogger; @@ -35,6 +45,7 @@ import datawave.microservice.query.Query; import datawave.query.CloseableIterable; import datawave.query.config.ShardQueryConfiguration; +import datawave.query.exceptions.DatawaveAsyncOperationException; import 
datawave.query.exceptions.DatawaveFatalQueryException; import datawave.query.exceptions.DatawaveQueryException; import datawave.query.index.lookup.UidIntersector; @@ -67,6 +78,10 @@ public class DatePartitionedQueryPlanner extends QueryPlanner implements Cloneab private DefaultQueryPlanner queryPlanner; private String initialPlan; private String plannedScript; + /** + * The max number of concurrent planning threads + */ + private int maxConcurrentPlanningThreads = 10; // handles boilerplate operations that surround a visitor's execution (e.g., timers, logging, validating) private final TimedVisitorManager visitorManager = new TimedVisitorManager(); @@ -256,6 +271,14 @@ public UidIntersector getUidIntersector() { return this.queryPlanner.getUidIntersector(); } + public void setMaxConcurrentPlanningThreads(int maxConcurrentPlanningThreads) { + this.maxConcurrentPlanningThreads = maxConcurrentPlanningThreads; + } + + public int getMaxConcurrentPlanningThreads() { + return this.maxConcurrentPlanningThreads; + } + /** * Not supported for {@link DatePartitionedQueryPlanner} and will result in an {@link UnsupportedOperationException}. * @@ -291,6 +314,17 @@ public CloseableIterable process(GenericQueryConfiguration genericCon throws DatawaveQueryException { visitorManager.setDebugEnabled(log.isDebugEnabled()); + // start up an executor first to give the threads a chance to start + final AtomicInteger threadCounter = new AtomicInteger(1); + ThreadFactory threadFactory = new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "DatePartitionedQueryPlanner " + threadCounter.getAndIncrement()); + } + }; + ExecutorService executor = new ThreadPoolExecutor(1, getMaxConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), + threadFactory); + // Validate the config type. 
if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) { throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName()); @@ -332,29 +366,48 @@ public CloseableIterable process(GenericQueryConfiguration genericCon SortedMap,Set> dateRanges = getSubQueryDateRanges(planningConfig); DatePartitionedQueryIterable results = new DatePartitionedQueryIterable(); - List exceptions = new ArrayList<>(); + List exceptions = new ArrayList<>(); + List,Set>,ShardQueryConfiguration,Future>>> futures = new ArrayList<>(); - // TODO: Lets attempt to process these pieces concurrently for (Map.Entry,Set> dateRange : dateRanges.entrySet()) { + // Get the configuration with an updated query (pushed down unindexed fields) + final ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue()); + futures.add(Triple.of(dateRange, configCopy, executor.submit(new Callable>() { + @Override + public CloseableIterable call() { + try { + // Create a copy of the original default query planner, and process the query with the new date range. 
+ DefaultQueryPlanner subPlan = DatePartitionedQueryPlanner.this.queryPlanner.clone(); + + // Get the range stream for the new date range and query + return subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory); + } catch (Exception e) { + throw new DatawaveAsyncOperationException(e); + } + } + }))); + } + + for (Triple,Set>,ShardQueryConfiguration,Future>> future : futures) { + Map.Entry,Set> dateRange = future.getLeft(); String subBeginDate = dateFormat.format(dateRange.getKey().getLeft()); String subEndDate = dateFormat.format(dateRange.getKey().getRight()); - - // Get the configuration with an updated query (pushed down unindexed fields) - ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue()); + ShardQueryConfiguration configCopy = future.getMiddle(); try { - // Create a copy of the original default query planner, and process the query with the new date range. - DefaultQueryPlanner subPlan = this.queryPlanner.clone(); - - // Get the range stream for the new date range and query - results.addIterable(subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory)); - + results.addIterable(future.getRight().get()); if (log.isDebugEnabled()) { log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields " + dateRange.getValue() + ": " + configCopy.getQueryString()); } - } catch (DatawaveQueryException e) { + } catch (ExecutionException e) { log.warn("Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")", e); + if (e.getCause() instanceof DatawaveAsyncOperationException) { + exceptions.add(e.getCause().getCause()); + } else { + exceptions.add(e.getCause()); + } + } catch (InterruptedException e) { exceptions.add(e); } finally { // append the new timers for logging at the end @@ -377,7 +430,7 @@ public CloseableIterable process(GenericQueryConfiguration genericCon throw 
(DatawaveQueryException) (exceptions.get(0)); } else { DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation"); - for (Exception reason : exceptions) { + for (Throwable reason : exceptions) { e.addSuppressed(reason); } throw e; From c4a4f4706f246751b92c4d1db854cce813895b39 Mon Sep 17 00:00:00 2001 From: Ivan Bella <347158+ivakegg@users.noreply.github.com> Date: Thu, 6 Nov 2025 20:54:28 +0000 Subject: [PATCH 2/4] Review updates --- .../planner/DatePartitionedQueryPlanner.java | 284 +++++++++++------- .../java/datawave/query/QueryPlanTest.java | 9 +- 2 files changed, 186 insertions(+), 107 deletions(-) diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java index fbcecfcd235..17d1b0c4c28 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java @@ -19,7 +19,6 @@ import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -36,7 +35,6 @@ import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.lang3.tuple.Pair; -import org.apache.commons.lang3.tuple.Triple; import org.apache.log4j.Logger; import datawave.core.common.logging.ThreadConfigurableLogger; @@ -314,133 +312,159 @@ public CloseableIterable process(GenericQueryConfiguration genericCon throws DatawaveQueryException { visitorManager.setDebugEnabled(log.isDebugEnabled()); - // start up an executor first to give the threads a chance to start + DatePartitionedQueryIterable results = new 
DatePartitionedQueryIterable(); + + // start up an executor with at least one core thread started final AtomicInteger threadCounter = new AtomicInteger(1); ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "DatePartitionedQueryPlanner " + threadCounter.getAndIncrement()); + return new Thread(r, "DatePartitionedQueryPlanner thread #" + threadCounter.getAndIncrement() + " for " + settings.getId()); } }; - ExecutorService executor = new ThreadPoolExecutor(1, getMaxConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, getMaxConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); - // Validate the config type. - if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) { - throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName()); - } - - // Reset the planned script. - this.plannedScript = null; - this.plans.clear(); - - if (log.isDebugEnabled()) { - log.debug("Federated query: " + query); - } - - ShardQueryConfiguration planningConfig = (ShardQueryConfiguration) genericConfig; - if (log.isDebugEnabled()) { - log.debug("Query's original date range " + dateFormat.format(planningConfig.getBeginDate()) + "-" + dateFormat.format(planningConfig.getEndDate())); - } + try { + // Validate the config type. + if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) { + throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName()); + } - // Let's do the planning with the delegate planner first to ensure we have a final date range - // and appropriately expanded unfielded terms etc. 
- boolean generatePlanOnly = planningConfig.isGeneratePlanOnly(); - planningConfig.setGeneratePlanOnly(true); - boolean expandValues = planningConfig.isExpandValues(); - // we do NOT want to expand any values yet as they may not be dependable - // note we are expanding unfielded values (different flag) - planningConfig.setExpandValues(false); - boolean deferPushdownPullup = planningConfig.isDeferPushdownPullup(); - planningConfig.setDeferPushdownPullup(true); + // Reset the planned script. + this.plannedScript = null; + this.plans.clear(); - DefaultQueryPlanner initialPlanner = this.queryPlanner.clone(); - initialPlanner.process(planningConfig, query, settings, scannerFactory); - this.initialPlan = initialPlanner.plannedScript; + if (log.isDebugEnabled()) { + log.debug("Federated query: " + query); + } - planningConfig.setGeneratePlanOnly(generatePlanOnly); - planningConfig.setExpandValues(expandValues); - planningConfig.setDeferPushdownPullup(deferPushdownPullup); + ShardQueryConfiguration planningConfig = (ShardQueryConfiguration) genericConfig; + if (log.isDebugEnabled()) { + log.debug("Query's original date range " + dateFormat.format(planningConfig.getBeginDate()) + "-" + + dateFormat.format(planningConfig.getEndDate())); + } - // Get the relevant date ranges and the sets of fields that have gaps in those ranges - SortedMap,Set> dateRanges = getSubQueryDateRanges(planningConfig); + // Let's do the planning with the delegate planner first to ensure we have a final date range + // and appropriately expanded unfielded terms etc. 
+ boolean generatePlanOnly = planningConfig.isGeneratePlanOnly(); + planningConfig.setGeneratePlanOnly(true); + boolean expandValues = planningConfig.isExpandValues(); + // we do NOT want to expand any values yet as they may not be dependable + // note we are expanding unfielded values (different flag) + planningConfig.setExpandValues(false); + boolean deferPushdownPullup = planningConfig.isDeferPushdownPullup(); + planningConfig.setDeferPushdownPullup(true); + + // now let's do the initial planning + DefaultQueryPlanner initialPlanner = this.queryPlanner.clone(); + initialPlanner.process(planningConfig, query, settings, scannerFactory); + this.initialPlan = initialPlanner.plannedScript; + + // and reset the expansion flags to what we had previously + planningConfig.setGeneratePlanOnly(generatePlanOnly); + planningConfig.setExpandValues(expandValues); + planningConfig.setDeferPushdownPullup(deferPushdownPullup); + + // Get the relevant date ranges and the sets of fields that have gaps in those ranges + SortedMap,Set> dateRanges = getSubQueryDateRanges(planningConfig); + + // Get those threads fired up + int threadsToUse = Math.min(dateRanges.size(), getMaxConcurrentPlanningThreads()); + executor.setCorePoolSize(threadsToUse); + + // Now startup a planner for each date range + List futures = new ArrayList<>(); + for (Map.Entry,Set> dateRange : dateRanges.entrySet()) { + SubPlanCallable subPlan = new SubPlanCallable(this.queryPlanner, planningConfig, dateRange, scannerFactory); + subPlan.setFuture(executor.submit(subPlan)); + futures.add(subPlan); + } - DatePartitionedQueryIterable results = new DatePartitionedQueryIterable(); - List exceptions = new ArrayList<>(); - List,Set>,ShardQueryConfiguration,Future>>> futures = new ArrayList<>(); - - for (Map.Entry,Set> dateRange : dateRanges.entrySet()) { - // Get the configuration with an updated query (pushed down unindexed fields) - final ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, 
dateRange.getKey(), dateRange.getValue()); - futures.add(Triple.of(dateRange, configCopy, executor.submit(new Callable>() { - @Override - public CloseableIterable call() { - try { - // Create a copy of the original default query planner, and process the query with the new date range. - DefaultQueryPlanner subPlan = DatePartitionedQueryPlanner.this.queryPlanner.clone(); - - // Get the range stream for the new date range and query - return subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory); - } catch (Exception e) { - throw new DatawaveAsyncOperationException(e); + // and process the results of each planner, gathering the stream of ranges into one iterable + List exceptions = new ArrayList<>(); + for (SubPlanCallable future : futures) { + Map.Entry,Set> dateRange = future.getDateRange(); + String subBeginDate = dateFormat.format(dateRange.getKey().getLeft()); + String subEndDate = dateFormat.format(dateRange.getKey().getRight()); + + try { + results.addIterable(future.getFuture().get()); + if (log.isDebugEnabled()) { + log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields " + + dateRange.getValue() + ": " + future.getSubPlanConfig().getQueryString()); } + } catch (ExecutionException e) { + String msg = "Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")"; + log.warn(msg, e); + exceptions.add(new DatawaveQueryException(msg, e.getCause())); + } catch (InterruptedException e) { + String msg = "Interrupted when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")"; + exceptions.add(new DatawaveQueryException(msg, e)); + } finally { + ShardQueryConfiguration subPlanConfig = future.getSubPlanConfig(); + + // append the new timers for logging at the end + planningConfig.appendTimers(subPlanConfig.getTimers()); + + // Add to the set of plans + plans.add(subPlanConfig.getQueryString()); + + // Update 
the planned script. + updatePlannedScript(); + planningConfig.setQueryString(plannedScript); } - }))); - } - - for (Triple,Set>,ShardQueryConfiguration,Future>> future : futures) { - Map.Entry,Set> dateRange = future.getLeft(); - String subBeginDate = dateFormat.format(dateRange.getKey().getLeft()); - String subEndDate = dateFormat.format(dateRange.getKey().getRight()); - ShardQueryConfiguration configCopy = future.getMiddle(); + } - try { - results.addIterable(future.getRight().get()); - if (log.isDebugEnabled()) { - log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields " - + dateRange.getValue() + ": " + configCopy.getQueryString()); - } - } catch (ExecutionException e) { - log.warn("Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")", e); - if (e.getCause() instanceof DatawaveAsyncOperationException) { - exceptions.add(e.getCause().getCause()); + // if every plan failed, then pass an exception up + if (exceptions.size() == dateRanges.size()) { + if (exceptions.size() == 1) { + Throwable e = unwrapException(exceptions.get(0)); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw (DatawaveQueryException) e; + } } else { - exceptions.add(e.getCause()); + DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation for " + planningConfig.getQuery().getId()); + for (Throwable reason : exceptions) { + e.addSuppressed(reason); + } + throw e; } - } catch (InterruptedException e) { - exceptions.add(e); - } finally { - // append the new timers for logging at the end - planningConfig.appendTimers(configCopy.getTimers()); - - // Add to the set of plans - plans.add(configCopy.getQueryString()); - - // Update the planned script. 
- updatePlannedScript(); - planningConfig.setQueryString(plannedScript); - } - } - // if every plan failed, then pass an exception up - if (exceptions.size() == dateRanges.size()) { - if (exceptions.size() == 1 && exceptions.get(0) instanceof RuntimeException) { - throw (RuntimeException) (exceptions.get(0)); - } else if (exceptions.size() == 1 && exceptions.get(0) instanceof DatawaveQueryException) { - throw (DatawaveQueryException) (exceptions.get(0)); - } else { - DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation"); - for (Throwable reason : exceptions) { - e.addSuppressed(reason); - } - throw e; - } + } finally { + executor.shutdown(); } // reset the iterator to be our federated iterator return results; } + /** + * In the case where we have only 1 exception, we want to throw the underlying exception to be backward compatible which will be the lowest level + * DatawaveQueryException or first DatawaveFatalQueryException + * + * @param e + * The exception being unwrapped + * @return DatawaveQueryException or DatawaveFatalQueryException + */ + private Throwable unwrapException(DatawaveQueryException e) { + Throwable toThrow = e; + Throwable test = e; + while (test.getCause() != null && test.getCause() != test) { + test = test.getCause(); + if (test instanceof DatawaveFatalQueryException) { + return test; + } + if (test instanceof DatawaveQueryException) { + toThrow = test; + } + } + return toThrow; + } + /** * Update the planned script to represent a concatenation of the planned scripts from all sub-plans of the most recently executed call to * {@link #process(GenericQueryConfiguration, String, Query, ScannerFactory)}. @@ -463,6 +487,60 @@ private void updatePlannedScript() { } } + /** + * This callable will hold the state of one of the plans and the future used to generate that plan. 
+ */ + private class SubPlanCallable implements Callable> { + private final ShardQueryConfiguration planningConfig; + private final Map.Entry,Set> dateRange; + private final DefaultQueryPlanner basePlanner; + private final ScannerFactory scannerFactory; + + private ShardQueryConfiguration subPlanConfig; + private Future> future; + + public SubPlanCallable(DefaultQueryPlanner planner, ShardQueryConfiguration planningConfig, Map.Entry,Set> dateRange, + ScannerFactory scannerFactory) { + this.basePlanner = planner; + this.planningConfig = planningConfig; + this.dateRange = dateRange; + this.scannerFactory = scannerFactory; + } + + @Override + public CloseableIterable call() throws Exception { + try { + // Get an updated configuration with the new date range and query tree + this.subPlanConfig = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue()); + + // Create a copy of the original default query planner, and process the query with the new date range. + DefaultQueryPlanner subPlan = basePlanner.clone(); + + // Get the range stream for the new date range and query + return subPlan.reprocess(subPlanConfig, subPlanConfig.getQuery(), scannerFactory); + } catch (Exception e) { + throw new DatawaveAsyncOperationException( + "Failed to generate partitioned for " + subPlanConfig.getQuery().getId() + " and date range " + dateRange.getKey(), e); + } + } + + public void setFuture(Future> future) { + this.future = future; + } + + public Future> getFuture() { + return future; + } + + public Map.Entry,Set> getDateRange() { + return dateRange; + } + + public ShardQueryConfiguration getSubPlanConfig() { + return subPlanConfig; + } + } + /** * Get a configuration object configured with an updated query date range, and a plan with pushed down unindexed fields. 
* diff --git a/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java b/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java index 3fd6361eb17..b0a4b1b73e9 100644 --- a/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.util.Collections; +import datawave.query.exceptions.DatawaveQueryException; import org.apache.commons.jexl3.parser.ParseException; import org.apache.log4j.Logger; import org.junit.Before; @@ -109,8 +110,8 @@ public void planInMetricsAfterMissingIndexExceptionFederatedQueryPlannerNE() thr this.logic.setIndexTableName("missing"); try { runTest(query, query); - fail("Expected RuntimeException."); - } catch (RuntimeException e) { + fail("Expected DatawaveQueryException."); + } catch (DatawaveQueryException e) { assertEquals(expectedPlan, metric.getPlan()); } } @@ -122,8 +123,8 @@ public void planInMetricsAfterMissingIndexExceptionFederatedQueryPlannerNotEq() this.logic.setIndexTableName("missing"); try { runTest(query, query); - fail("Expected RuntimeException."); - } catch (RuntimeException e) { + fail("Expected DatawaveQueryException."); + } catch (DatawaveQueryException e) { assertEquals(expectedPlan, metric.getPlan()); } } From 0e5c0a10cc22ace164ae11e2493caccfe465e08c Mon Sep 17 00:00:00 2001 From: Ivan Bella <347158+ivakegg@users.noreply.github.com> Date: Fri, 7 Nov 2025 14:24:25 +0000 Subject: [PATCH 3/4] formatting --- .../query-core/src/test/java/datawave/query/QueryPlanTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java b/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java index b0a4b1b73e9..137e43fc056 100644 --- a/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java @@ 
-7,7 +7,6 @@ import java.io.IOException; import java.util.Collections; -import datawave.query.exceptions.DatawaveQueryException; import org.apache.commons.jexl3.parser.ParseException; import org.apache.log4j.Logger; import org.junit.Before; @@ -19,6 +18,7 @@ import datawave.query.config.ShardQueryConfiguration; import datawave.query.exceptions.DatawaveAsyncOperationException; import datawave.query.exceptions.DatawaveFatalQueryException; +import datawave.query.exceptions.DatawaveQueryException; import datawave.query.exceptions.DoNotPerformOptimizedQueryException; import datawave.query.exceptions.FullTableScansDisallowedException; import datawave.query.exceptions.InvalidQueryException; From e0821cab32ba2aee4b6173bfde198fced63df175 Mon Sep 17 00:00:00 2001 From: Ivan Bella <347158+ivakegg@users.noreply.github.com> Date: Tue, 18 Nov 2025 14:28:44 +0000 Subject: [PATCH 4/4] Review comment update --- .../query/planner/DatePartitionedQueryPlanner.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java index 17d1b0c4c28..07320b70c3e 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DatePartitionedQueryPlanner.java @@ -79,7 +79,7 @@ public class DatePartitionedQueryPlanner extends QueryPlanner implements Cloneab /** * The max number of concurrent planning threads */ - private int maxConcurrentPlanningThreads = 10; + private int numConcurrentPlanningThreads = 10; // handles boilerplate operations that surround a visitor's execution (e.g., timers, logging, validating) private final TimedVisitorManager visitorManager = new TimedVisitorManager(); @@ -269,12 +269,12 @@ public UidIntersector getUidIntersector() { return 
this.queryPlanner.getUidIntersector(); } - public void setMaxConcurrentPlanningThreads(int maxConcurrentPlanningThreads) { - this.maxConcurrentPlanningThreads = maxConcurrentPlanningThreads; + public void setNumConcurrentPlanningThreads(int numConcurrentPlanningThreads) { + this.numConcurrentPlanningThreads = numConcurrentPlanningThreads; } - public int getMaxConcurrentPlanningThreads() { - return this.maxConcurrentPlanningThreads; + public int getNumConcurrentPlanningThreads() { + return this.numConcurrentPlanningThreads; } /** @@ -322,7 +322,7 @@ public Thread newThread(Runnable r) { return new Thread(r, "DatePartitionedQueryPlanner thread #" + threadCounter.getAndIncrement() + " for " + settings.getId()); } }; - ThreadPoolExecutor executor = new ThreadPoolExecutor(1, getMaxConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, getNumConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), threadFactory); try { @@ -370,7 +370,7 @@ public Thread newThread(Runnable r) { SortedMap,Set> dateRanges = getSubQueryDateRanges(planningConfig); // Get those threads fired up - int threadsToUse = Math.min(dateRanges.size(), getMaxConcurrentPlanningThreads()); + int threadsToUse = Math.min(dateRanges.size(), getNumConcurrentPlanningThreads()); executor.setCorePoolSize(threadsToUse); // Now startup a planner for each date range