@@ -17,6 +17,14 @@
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.TableNotFoundException;
@@ -35,6 +43,7 @@
import datawave.microservice.query.Query;
import datawave.query.CloseableIterable;
import datawave.query.config.ShardQueryConfiguration;
import datawave.query.exceptions.DatawaveAsyncOperationException;
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.exceptions.DatawaveQueryException;
import datawave.query.index.lookup.UidIntersector;
@@ -67,6 +76,10 @@ public class DatePartitionedQueryPlanner extends QueryPlanner implements Cloneable
private DefaultQueryPlanner queryPlanner;
private String initialPlan;
private String plannedScript;
/**
* The max number of concurrent planning threads
*/
private int numConcurrentPlanningThreads = 10;

// handles boilerplate operations that surround a visitor's execution (e.g., timers, logging, validating)
private final TimedVisitorManager visitorManager = new TimedVisitorManager();
@@ -256,6 +269,14 @@ public UidIntersector getUidIntersector() {
return this.queryPlanner.getUidIntersector();
}

public void setNumConcurrentPlanningThreads(int numConcurrentPlanningThreads) {
this.numConcurrentPlanningThreads = numConcurrentPlanningThreads;
}

public int getNumConcurrentPlanningThreads() {
return this.numConcurrentPlanningThreads;
}

/**
* Not supported for {@link DatePartitionedQueryPlanner} and will result in an {@link UnsupportedOperationException}.
*
@@ -291,103 +312,159 @@ public CloseableIterable<QueryData> process(GenericQueryConfiguration genericConfig
throws DatawaveQueryException {
visitorManager.setDebugEnabled(log.isDebugEnabled());

// Validate the config type.
if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) {
throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName());
}

// Reset the planned script.
this.plannedScript = null;
this.plans.clear();

if (log.isDebugEnabled()) {
log.debug("Federated query: " + query);
}

ShardQueryConfiguration planningConfig = (ShardQueryConfiguration) genericConfig;
if (log.isDebugEnabled()) {
log.debug("Query's original date range " + dateFormat.format(planningConfig.getBeginDate()) + "-" + dateFormat.format(planningConfig.getEndDate()));
}

// Let's do the planning with the delegate planner first to ensure we have a final date range
// and appropriately expanded unfielded terms etc.
boolean generatePlanOnly = planningConfig.isGeneratePlanOnly();
planningConfig.setGeneratePlanOnly(true);
boolean expandValues = planningConfig.isExpandValues();
// we do NOT want to expand any values yet as they may not be dependable
// note we are expanding unfielded values (different flag)
planningConfig.setExpandValues(false);
boolean deferPushdownPullup = planningConfig.isDeferPushdownPullup();
planningConfig.setDeferPushdownPullup(true);

DefaultQueryPlanner initialPlanner = this.queryPlanner.clone();
initialPlanner.process(planningConfig, query, settings, scannerFactory);
this.initialPlan = initialPlanner.plannedScript;

planningConfig.setGeneratePlanOnly(generatePlanOnly);
planningConfig.setExpandValues(expandValues);
planningConfig.setDeferPushdownPullup(deferPushdownPullup);
DatePartitionedQueryIterable results = new DatePartitionedQueryIterable();

// Get the relevant date ranges and the sets of fields that have gaps in those ranges
SortedMap<Pair<Date,Date>,Set<String>> dateRanges = getSubQueryDateRanges(planningConfig);
// start up an executor with at least one core thread started
final AtomicInteger threadCounter = new AtomicInteger(1);
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "DatePartitionedQueryPlanner thread #" + threadCounter.getAndIncrement() + " for " + settings.getId());
}
};
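// Note: with an unbounded LinkedBlockingQueue, a ThreadPoolExecutor never grows beyond its core size,
// so the core pool size is raised below (setCorePoolSize) once the number of date ranges is known.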
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, getNumConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
threadFactory);

DatePartitionedQueryIterable results = new DatePartitionedQueryIterable();
List<Exception> exceptions = new ArrayList<>();
try {
// Validate the config type.
if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) {
throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName());
}

// TODO: Lets attempt to process these pieces concurrently
for (Map.Entry<Pair<Date,Date>,Set<String>> dateRange : dateRanges.entrySet()) {
String subBeginDate = dateFormat.format(dateRange.getKey().getLeft());
String subEndDate = dateFormat.format(dateRange.getKey().getRight());
// Reset the planned script.
this.plannedScript = null;
this.plans.clear();

// Get the configuration with an updated query (pushed down unindexed fields)
ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue());
if (log.isDebugEnabled()) {
log.debug("Federated query: " + query);
}

try {
// Create a copy of the original default query planner, and process the query with the new date range.
DefaultQueryPlanner subPlan = this.queryPlanner.clone();
ShardQueryConfiguration planningConfig = (ShardQueryConfiguration) genericConfig;
if (log.isDebugEnabled()) {
log.debug("Query's original date range " + dateFormat.format(planningConfig.getBeginDate()) + "-"
+ dateFormat.format(planningConfig.getEndDate()));
}

// Get the range stream for the new date range and query
results.addIterable(subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory));
// Let's do the planning with the delegate planner first to ensure we have a final date range
// and appropriately expanded unfielded terms etc.
boolean generatePlanOnly = planningConfig.isGeneratePlanOnly();
planningConfig.setGeneratePlanOnly(true);
boolean expandValues = planningConfig.isExpandValues();
// we do NOT want to expand any values yet as they may not be dependable
// note we are expanding unfielded values (different flag)
planningConfig.setExpandValues(false);
boolean deferPushdownPullup = planningConfig.isDeferPushdownPullup();
planningConfig.setDeferPushdownPullup(true);

// now let's do the initial planning
DefaultQueryPlanner initialPlanner = this.queryPlanner.clone();
initialPlanner.process(planningConfig, query, settings, scannerFactory);
this.initialPlan = initialPlanner.plannedScript;

// and reset the expansion flags to what we had previously
planningConfig.setGeneratePlanOnly(generatePlanOnly);
planningConfig.setExpandValues(expandValues);
planningConfig.setDeferPushdownPullup(deferPushdownPullup);

// Get the relevant date ranges and the sets of fields that have gaps in those ranges
SortedMap<Pair<Date,Date>,Set<String>> dateRanges = getSubQueryDateRanges(planningConfig);

// Get those threads fired up
int threadsToUse = Math.min(dateRanges.size(), getNumConcurrentPlanningThreads());
executor.setCorePoolSize(threadsToUse);

// Now startup a planner for each date range
List<SubPlanCallable> futures = new ArrayList<>();
for (Map.Entry<Pair<Date,Date>,Set<String>> dateRange : dateRanges.entrySet()) {
SubPlanCallable subPlan = new SubPlanCallable(this.queryPlanner, planningConfig, dateRange, scannerFactory);
subPlan.setFuture(executor.submit(subPlan));
futures.add(subPlan);
}

if (log.isDebugEnabled()) {
log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields "
+ dateRange.getValue() + ": " + configCopy.getQueryString());
// and process the results of each planner, gathering the stream of ranges into one iterable
List<DatawaveQueryException> exceptions = new ArrayList<>();
for (SubPlanCallable future : futures) {
Map.Entry<Pair<Date,Date>,Set<String>> dateRange = future.getDateRange();
String subBeginDate = dateFormat.format(dateRange.getKey().getLeft());
String subEndDate = dateFormat.format(dateRange.getKey().getRight());

try {
results.addIterable(future.getFuture().get());
if (log.isDebugEnabled()) {
log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields "
+ dateRange.getValue() + ": " + future.getSubPlanConfig().getQueryString());
}
} catch (ExecutionException e) {
String msg = "Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")";
log.warn(msg, e);
exceptions.add(new DatawaveQueryException(msg, e.getCause()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag for callers further up the stack
String msg = "Interrupted when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")";
exceptions.add(new DatawaveQueryException(msg, e));
} finally {
ShardQueryConfiguration subPlanConfig = future.getSubPlanConfig();

// append the new timers for logging at the end
planningConfig.appendTimers(subPlanConfig.getTimers());

// Add to the set of plans
plans.add(subPlanConfig.getQueryString());

// Update the planned script.
updatePlannedScript();
planningConfig.setQueryString(plannedScript);
}
} catch (DatawaveQueryException e) {
log.warn("Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")", e);
exceptions.add(e);
} finally {
// append the new timers for logging at the end
planningConfig.appendTimers(configCopy.getTimers());

// Add to the set of plans
plans.add(configCopy.getQueryString());

// Update the planned script.
updatePlannedScript();
planningConfig.setQueryString(plannedScript);
}
}

// if every plan failed, then pass an exception up
if (exceptions.size() == dateRanges.size()) {
if (exceptions.size() == 1 && exceptions.get(0) instanceof RuntimeException) {
throw (RuntimeException) (exceptions.get(0));
} else if (exceptions.size() == 1 && exceptions.get(0) instanceof DatawaveQueryException) {
throw (DatawaveQueryException) (exceptions.get(0));
} else {
DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation");
for (Exception reason : exceptions) {
e.addSuppressed(reason);
// if every plan failed, then pass an exception up
if (exceptions.size() == dateRanges.size()) {
if (exceptions.size() == 1) {
Throwable e = unwrapException(exceptions.get(0));
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw (DatawaveQueryException) e;
}
} else {
DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation for " + planningConfig.getQuery().getId());
for (Throwable reason : exceptions) {
e.addSuppressed(reason);
}
throw e;
}
throw e;
}

} finally {
executor.shutdown();
}

// reset the iterator to be our federated iterator
return results;
}

/**
* In the case where we have only one exception, we want to throw the underlying exception to remain backward compatible. This will be the lowest-level
* DatawaveQueryException, or the first DatawaveFatalQueryException found in the cause chain.
*
* @param e
* The exception being unwrapped
* @return DatawaveQueryException or DatawaveFatalQueryException
*/
private Throwable unwrapException(DatawaveQueryException e) {
Throwable toThrow = e;
Throwable test = e;
while (test.getCause() != null && test.getCause() != test) {
test = test.getCause();
// a fatal exception anywhere in the cause chain wins immediately
if (test instanceof DatawaveFatalQueryException) {
return test;
}
// otherwise remember the deepest DatawaveQueryException seen so far
if (test instanceof DatawaveQueryException) {
toThrow = test;
}
}
return toThrow;
}

/**
* Update the planned script to represent a concatenation of the planned scripts from all sub-plans of the most recently executed call to
* {@link #process(GenericQueryConfiguration, String, Query, ScannerFactory)}.
@@ -410,6 +487,60 @@ private void updatePlannedScript() {
}
}

/**
* This callable holds the state of a single sub-plan and the future used to generate that plan.
*/
private class SubPlanCallable implements Callable<CloseableIterable<QueryData>> {
private final ShardQueryConfiguration planningConfig;
private final Map.Entry<Pair<Date,Date>,Set<String>> dateRange;
private final DefaultQueryPlanner basePlanner;
private final ScannerFactory scannerFactory;

private ShardQueryConfiguration subPlanConfig;
private Future<CloseableIterable<QueryData>> future;

public SubPlanCallable(DefaultQueryPlanner planner, ShardQueryConfiguration planningConfig, Map.Entry<Pair<Date,Date>,Set<String>> dateRange,
ScannerFactory scannerFactory) {
this.basePlanner = planner;
this.planningConfig = planningConfig;
this.dateRange = dateRange;
this.scannerFactory = scannerFactory;
}

@Override
public CloseableIterable<QueryData> call() throws Exception {
try {
// Get an updated configuration with the new date range and query tree
this.subPlanConfig = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue());

// Create a copy of the original default query planner, and process the query with the new date range.
DefaultQueryPlanner subPlan = basePlanner.clone();

// Get the range stream for the new date range and query
return subPlan.reprocess(subPlanConfig, subPlanConfig.getQuery(), scannerFactory);
[Review thread on this line]

apmoriarty (Collaborator, Nov 7, 2025):
I'm curious about the memory impact of starting a range stream and hanging onto the reference. I suppose if this solution isn't tenable we could always grab the range stream, verify it has a hit, and then close it -- marking this subplan as 'has data'.

The point is this: even though the concurrency is limited for how many range streams are executing at one point in time, and even though the scanners close between next calls, we still have the entire object in memory and it ain't cheap.

Author (Collaborator):
So we may need to reduce the concurrency down to 1 if that is an issue. This is a straightforward tradeoff between memory and speed.

Author (Collaborator):
Not fully grasping what you are talking about as a solution. If it has a hit, and we close it, then we can't get the hits. What am I missing?

Author (Collaborator):
After our discussion I understand what you were getting at. Essentially the Intersection/Union tree for each partition will be held in memory, so if we have a very large query and several partitions, that in itself could become quite memory intensive. I am now thinking that I should do these subplans within the range stream so that they are processed as needed, instead of all up front. I am going to keep this PR up but I will work on an alternative solution.
} catch (Exception e) {
// use planningConfig here since subPlanConfig may still be null if getUpdatedConfig failed
throw new DatawaveAsyncOperationException(
"Failed to generate a partitioned plan for " + planningConfig.getQuery().getId() + " and date range " + dateRange.getKey(), e);
}
}

public void setFuture(Future<CloseableIterable<QueryData>> future) {
this.future = future;
}

public Future<CloseableIterable<QueryData>> getFuture() {
return future;
}

public Map.Entry<Pair<Date,Date>,Set<String>> getDateRange() {
return dateRange;
}

public ShardQueryConfiguration getSubPlanConfig() {
return subPlanConfig;
}
}

/**
* Get a configuration object configured with an updated query date range, and a plan with pushed down unindexed fields.
*
@@ -18,6 +18,7 @@
import datawave.query.config.ShardQueryConfiguration;
import datawave.query.exceptions.DatawaveAsyncOperationException;
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.exceptions.DatawaveQueryException;
import datawave.query.exceptions.DoNotPerformOptimizedQueryException;
import datawave.query.exceptions.FullTableScansDisallowedException;
import datawave.query.exceptions.InvalidQueryException;
@@ -109,8 +110,8 @@ public void planInMetricsAfterMissingIndexExceptionFederatedQueryPlannerNE() throws
this.logic.setIndexTableName("missing");
try {
runTest(query, query);
fail("Expected RuntimeException.");
} catch (RuntimeException e) {
fail("Expected DatawaveQueryException.");
} catch (DatawaveQueryException e) {
assertEquals(expectedPlan, metric.getPlan());
}
}
@@ -122,8 +123,8 @@ public void planInMetricsAfterMissingIndexExceptionFederatedQueryPlannerNotEq()
this.logic.setIndexTableName("missing");
try {
runTest(query, query);
fail("Expected RuntimeException.");
} catch (RuntimeException e) {
fail("Expected DatawaveQueryException.");
} catch (DatawaveQueryException e) {
assertEquals(expectedPlan, metric.getPlan());
}
}