-
Notifications
You must be signed in to change notification settings - Fork 271
Updated the planning to be concurrent for index holes in the #3258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: integration
Are you sure you want to change the base?
Changes from 3 commits
b6b2030
3df0ea1
c4a4f47
0e5c0a1
49e786d
e0821ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,14 @@ | |
| import java.util.SortedSet; | ||
| import java.util.TreeMap; | ||
| import java.util.TreeSet; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.ThreadPoolExecutor; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import org.apache.accumulo.core.client.TableNotFoundException; | ||
|
|
@@ -35,6 +43,7 @@ | |
| import datawave.microservice.query.Query; | ||
| import datawave.query.CloseableIterable; | ||
| import datawave.query.config.ShardQueryConfiguration; | ||
| import datawave.query.exceptions.DatawaveAsyncOperationException; | ||
| import datawave.query.exceptions.DatawaveFatalQueryException; | ||
| import datawave.query.exceptions.DatawaveQueryException; | ||
| import datawave.query.index.lookup.UidIntersector; | ||
|
|
@@ -67,6 +76,10 @@ public class DatePartitionedQueryPlanner extends QueryPlanner implements Cloneab | |
| private DefaultQueryPlanner queryPlanner; | ||
| private String initialPlan; | ||
| private String plannedScript; | ||
| /** | ||
| * The max number of concurrent planning threads | ||
| */ | ||
| private int maxConcurrentPlanningThreads = 10; | ||
|
|
||
| // handles boilerplate operations that surround a visitor's execution (e.g., timers, logging, validating) | ||
| private final TimedVisitorManager visitorManager = new TimedVisitorManager(); | ||
|
|
@@ -256,6 +269,14 @@ public UidIntersector getUidIntersector() { | |
| return this.queryPlanner.getUidIntersector(); | ||
| } | ||
|
|
||
| public void setMaxConcurrentPlanningThreads(int maxConcurrentPlanningThreads) { | ||
| this.maxConcurrentPlanningThreads = maxConcurrentPlanningThreads; | ||
| } | ||
|
|
||
| public int getMaxConcurrentPlanningThreads() { | ||
| return this.maxConcurrentPlanningThreads; | ||
| } | ||
|
|
||
| /** | ||
| * Not supported for {@link DatePartitionedQueryPlanner} and will result in an {@link UnsupportedOperationException}. | ||
| * | ||
|
|
@@ -291,103 +312,159 @@ public CloseableIterable<QueryData> process(GenericQueryConfiguration genericCon | |
| throws DatawaveQueryException { | ||
| visitorManager.setDebugEnabled(log.isDebugEnabled()); | ||
|
|
||
| // Validate the config type. | ||
| if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) { | ||
| throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName()); | ||
| } | ||
|
|
||
| // Reset the planned script. | ||
| this.plannedScript = null; | ||
| this.plans.clear(); | ||
|
|
||
| if (log.isDebugEnabled()) { | ||
| log.debug("Federated query: " + query); | ||
| } | ||
|
|
||
| ShardQueryConfiguration planningConfig = (ShardQueryConfiguration) genericConfig; | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Query's original date range " + dateFormat.format(planningConfig.getBeginDate()) + "-" + dateFormat.format(planningConfig.getEndDate())); | ||
| } | ||
|
|
||
| // Let's do the planning with the delegate planner first to ensure we have a final date range | ||
| // and appropriately expanded unfielded terms etc. | ||
| boolean generatePlanOnly = planningConfig.isGeneratePlanOnly(); | ||
| planningConfig.setGeneratePlanOnly(true); | ||
| boolean expandValues = planningConfig.isExpandValues(); | ||
| // we do NOT want to expand any values yet as they may not be dependable | ||
| // note we are expanding unfielded values (different flag) | ||
| planningConfig.setExpandValues(false); | ||
| boolean deferPushdownPullup = planningConfig.isDeferPushdownPullup(); | ||
| planningConfig.setDeferPushdownPullup(true); | ||
|
|
||
| DefaultQueryPlanner initialPlanner = this.queryPlanner.clone(); | ||
| initialPlanner.process(planningConfig, query, settings, scannerFactory); | ||
| this.initialPlan = initialPlanner.plannedScript; | ||
|
|
||
| planningConfig.setGeneratePlanOnly(generatePlanOnly); | ||
| planningConfig.setExpandValues(expandValues); | ||
| planningConfig.setDeferPushdownPullup(deferPushdownPullup); | ||
| DatePartitionedQueryIterable results = new DatePartitionedQueryIterable(); | ||
|
|
||
| // Get the relevant date ranges and the sets of fields that have gaps in those ranges | ||
| SortedMap<Pair<Date,Date>,Set<String>> dateRanges = getSubQueryDateRanges(planningConfig); | ||
| // start up an executor with at least one core thread started | ||
| final AtomicInteger threadCounter = new AtomicInteger(1); | ||
| ThreadFactory threadFactory = new ThreadFactory() { | ||
| @Override | ||
| public Thread newThread(Runnable r) { | ||
| return new Thread(r, "DatePartitionedQueryPlanner thread #" + threadCounter.getAndIncrement() + " for " + settings.getId()); | ||
| } | ||
| }; | ||
| ThreadPoolExecutor executor = new ThreadPoolExecutor(1, getMaxConcurrentPlanningThreads(), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), | ||
| threadFactory); | ||
|
|
||
| DatePartitionedQueryIterable results = new DatePartitionedQueryIterable(); | ||
| List<Exception> exceptions = new ArrayList<>(); | ||
| try { | ||
| // Validate the config type. | ||
| if (!ShardQueryConfiguration.class.isAssignableFrom(genericConfig.getClass())) { | ||
| throw new ClassCastException("Config must be an instance of " + ShardQueryConfiguration.class.getSimpleName()); | ||
| } | ||
|
|
||
| // TODO: Lets attempt to process these pieces concurrently | ||
| for (Map.Entry<Pair<Date,Date>,Set<String>> dateRange : dateRanges.entrySet()) { | ||
| String subBeginDate = dateFormat.format(dateRange.getKey().getLeft()); | ||
| String subEndDate = dateFormat.format(dateRange.getKey().getRight()); | ||
| // Reset the planned script. | ||
| this.plannedScript = null; | ||
| this.plans.clear(); | ||
|
|
||
| // Get the configuration with an updated query (pushed down unindexed fields) | ||
| ShardQueryConfiguration configCopy = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue()); | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Federated query: " + query); | ||
| } | ||
|
|
||
| try { | ||
| // Create a copy of the original default query planner, and process the query with the new date range. | ||
| DefaultQueryPlanner subPlan = this.queryPlanner.clone(); | ||
| ShardQueryConfiguration planningConfig = (ShardQueryConfiguration) genericConfig; | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Query's original date range " + dateFormat.format(planningConfig.getBeginDate()) + "-" | ||
| + dateFormat.format(planningConfig.getEndDate())); | ||
| } | ||
|
|
||
| // Get the range stream for the new date range and query | ||
| results.addIterable(subPlan.reprocess(configCopy, configCopy.getQuery(), scannerFactory)); | ||
| // Let's do the planning with the delegate planner first to ensure we have a final date range | ||
| // and appropriately expanded unfielded terms etc. | ||
| boolean generatePlanOnly = planningConfig.isGeneratePlanOnly(); | ||
| planningConfig.setGeneratePlanOnly(true); | ||
| boolean expandValues = planningConfig.isExpandValues(); | ||
| // we do NOT want to expand any values yet as they may not be dependable | ||
| // note we are expanding unfielded values (different flag) | ||
| planningConfig.setExpandValues(false); | ||
| boolean deferPushdownPullup = planningConfig.isDeferPushdownPullup(); | ||
| planningConfig.setDeferPushdownPullup(true); | ||
|
|
||
| // now let's do the initial planning | ||
| DefaultQueryPlanner initialPlanner = this.queryPlanner.clone(); | ||
| initialPlanner.process(planningConfig, query, settings, scannerFactory); | ||
| this.initialPlan = initialPlanner.plannedScript; | ||
|
|
||
| // and reset the expansion flags to what we had previously | ||
| planningConfig.setGeneratePlanOnly(generatePlanOnly); | ||
| planningConfig.setExpandValues(expandValues); | ||
| planningConfig.setDeferPushdownPullup(deferPushdownPullup); | ||
|
|
||
| // Get the relevant date ranges and the sets of fields that have gaps in those ranges | ||
| SortedMap<Pair<Date,Date>,Set<String>> dateRanges = getSubQueryDateRanges(planningConfig); | ||
|
|
||
| // Get those threads fired up | ||
| int threadsToUse = Math.min(dateRanges.size(), getMaxConcurrentPlanningThreads()); | ||
| executor.setCorePoolSize(threadsToUse); | ||
|
|
||
| // Now startup a planner for each date range | ||
| List<SubPlanCallable> futures = new ArrayList<>(); | ||
| for (Map.Entry<Pair<Date,Date>,Set<String>> dateRange : dateRanges.entrySet()) { | ||
| SubPlanCallable subPlan = new SubPlanCallable(this.queryPlanner, planningConfig, dateRange, scannerFactory); | ||
| subPlan.setFuture(executor.submit(subPlan)); | ||
| futures.add(subPlan); | ||
| } | ||
|
|
||
| if (log.isDebugEnabled()) { | ||
| log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields " | ||
| + dateRange.getValue() + ": " + configCopy.getQueryString()); | ||
| // and process the results of each planner, gathering the stream of ranges into one iterable | ||
| List<DatawaveQueryException> exceptions = new ArrayList<>(); | ||
| for (SubPlanCallable future : futures) { | ||
| Map.Entry<Pair<Date,Date>,Set<String>> dateRange = future.getDateRange(); | ||
| String subBeginDate = dateFormat.format(dateRange.getKey().getLeft()); | ||
| String subEndDate = dateFormat.format(dateRange.getKey().getRight()); | ||
|
|
||
| try { | ||
| results.addIterable(future.getFuture().get()); | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Query string for config of sub-plan against date range (" + subBeginDate + "-" + subEndDate + ") with unindexed fields " | ||
| + dateRange.getValue() + ": " + future.getSubPlanConfig().getQueryString()); | ||
| } | ||
| } catch (ExecutionException e) { | ||
| String msg = "Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")"; | ||
| log.warn(msg, e); | ||
| exceptions.add(new DatawaveQueryException(msg, e.getCause())); | ||
| } catch (InterruptedException e) { | ||
| String msg = "Interrupted when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")"; | ||
| exceptions.add(new DatawaveQueryException(msg, e)); | ||
| } finally { | ||
| ShardQueryConfiguration subPlanConfig = future.getSubPlanConfig(); | ||
|
|
||
| // append the new timers for logging at the end | ||
| planningConfig.appendTimers(subPlanConfig.getTimers()); | ||
|
|
||
| // Add to the set of plans | ||
| plans.add(subPlanConfig.getQueryString()); | ||
|
|
||
| // Update the planned script. | ||
| updatePlannedScript(); | ||
| planningConfig.setQueryString(plannedScript); | ||
| } | ||
| } catch (DatawaveQueryException e) { | ||
| log.warn("Exception occurred when processing sub-plan against date range (" + subBeginDate + "-" + subEndDate + ")", e); | ||
| exceptions.add(e); | ||
| } finally { | ||
| // append the new timers for logging at the end | ||
| planningConfig.appendTimers(configCopy.getTimers()); | ||
|
|
||
| // Add to the set of plans | ||
| plans.add(configCopy.getQueryString()); | ||
|
|
||
| // Update the planned script. | ||
| updatePlannedScript(); | ||
| planningConfig.setQueryString(plannedScript); | ||
| } | ||
| } | ||
|
|
||
| // if every plan failed, then pass an exception up | ||
| if (exceptions.size() == dateRanges.size()) { | ||
| if (exceptions.size() == 1 && exceptions.get(0) instanceof RuntimeException) { | ||
| throw (RuntimeException) (exceptions.get(0)); | ||
| } else if (exceptions.size() == 1 && exceptions.get(0) instanceof DatawaveQueryException) { | ||
| throw (DatawaveQueryException) (exceptions.get(0)); | ||
| } else { | ||
| DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation"); | ||
| for (Exception reason : exceptions) { | ||
| e.addSuppressed(reason); | ||
| // if every plan failed, then pass an exception up | ||
| if (exceptions.size() == dateRanges.size()) { | ||
| if (exceptions.size() == 1) { | ||
| Throwable e = unwrapException(exceptions.get(0)); | ||
| if (e instanceof RuntimeException) { | ||
| throw (RuntimeException) e; | ||
| } else { | ||
| throw (DatawaveQueryException) e; | ||
| } | ||
| } else { | ||
| DatawaveFatalQueryException e = new DatawaveFatalQueryException("Query failed creation for " + planningConfig.getQuery().getId()); | ||
| for (Throwable reason : exceptions) { | ||
| e.addSuppressed(reason); | ||
| } | ||
| throw e; | ||
| } | ||
| throw e; | ||
| } | ||
|
|
||
| } finally { | ||
| executor.shutdown(); | ||
| } | ||
|
|
||
| // reset the iterator to be our federated iterator | ||
| return results; | ||
| } | ||
|
|
||
| /** | ||
| * In the case where we have only 1 exception, we want to throw the underlying exception to be backward compatible, which will be the lowest-level | ||
| * DatawaveQueryException or first DatawaveFatalQueryException | ||
| * | ||
| * @param e | ||
| * The exception being unwrapped | ||
| * @return DatawaveQueryException or DatawaveFatalQueryException | ||
| */ | ||
| private Throwable unwrapException(DatawaveQueryException e) { | ||
| Throwable toThrow = e; | ||
| Throwable test = e; | ||
| while (test.getCause() != null && test.getCause() != test) { | ||
| test = test.getCause(); | ||
| if (test instanceof DatawaveFatalQueryException) { | ||
| return test; | ||
| } | ||
| if (test instanceof DatawaveQueryException) { | ||
| toThrow = test; | ||
| } | ||
| } | ||
| return toThrow; | ||
| } | ||
|
|
||
| /** | ||
| * Update the planned script to represent a concatenation of the planned scripts from all sub-plans of the most recently executed call to | ||
| * {@link #process(GenericQueryConfiguration, String, Query, ScannerFactory)}. | ||
|
|
@@ -410,6 +487,60 @@ private void updatePlannedScript() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * This callable will hold the state of one of the plan and the future used to generate that plan. | ||
| */ | ||
| private class SubPlanCallable implements Callable<CloseableIterable<QueryData>> { | ||
| private final ShardQueryConfiguration planningConfig; | ||
| private final Map.Entry<Pair<Date,Date>,Set<String>> dateRange; | ||
| private final DefaultQueryPlanner basePlanner; | ||
| private final ScannerFactory scannerFactory; | ||
|
|
||
| private ShardQueryConfiguration subPlanConfig; | ||
| private Future<CloseableIterable<QueryData>> future; | ||
|
|
||
| public SubPlanCallable(DefaultQueryPlanner planner, ShardQueryConfiguration planningConfig, Map.Entry<Pair<Date,Date>,Set<String>> dateRange, | ||
| ScannerFactory scannerFactory) { | ||
| this.basePlanner = planner; | ||
| this.planningConfig = planningConfig; | ||
| this.dateRange = dateRange; | ||
| this.scannerFactory = scannerFactory; | ||
| } | ||
|
|
||
| @Override | ||
| public CloseableIterable<QueryData> call() throws Exception { | ||
| try { | ||
| // Get an updated configuration with the new date range and query tree | ||
| this.subPlanConfig = getUpdatedConfig(planningConfig, dateRange.getKey(), dateRange.getValue()); | ||
|
|
||
| // Create a copy of the original default query planner, and process the query with the new date range. | ||
| DefaultQueryPlanner subPlan = basePlanner.clone(); | ||
|
|
||
| // Get the range stream for the new date range and query | ||
| return subPlan.reprocess(subPlanConfig, subPlanConfig.getQuery(), scannerFactory); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm curious about the memory impact of starting a range stream and hanging onto the reference. I suppose if this solution isn't tenable we could always grab the range stream, verify it has a hit, and then close it -- marking this subplan as 'has data'. The point is this: even though the concurrency is limited for how many range streams are executing at one point in time, and even though the scanners close between next calls, we still have the entire object in memory and it ain't cheap.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So we may need to reduce the concurrency down to 1 if that is an issue. This is a straightforward tradeoff between memory and speed.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not fully grasping what you are talking about as a solution. If it has a hit, and we close it, then we can't get the hits. What am I missing?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. After our discussion I understand what you were getting at. Essentially the Intersection/Union tree for each partition will be held in memory. So if we have a very large query and several partitions, that in itself could become quite memory intensive. |
||
| } catch (Exception e) { | ||
| throw new DatawaveAsyncOperationException( | ||
| "Failed to generate partitioned for " + subPlanConfig.getQuery().getId() + " and date range " + dateRange.getKey(), e); | ||
| } | ||
| } | ||
|
|
||
| public void setFuture(Future<CloseableIterable<QueryData>> future) { | ||
| this.future = future; | ||
| } | ||
|
|
||
| public Future<CloseableIterable<QueryData>> getFuture() { | ||
| return future; | ||
| } | ||
|
|
||
| public Map.Entry<Pair<Date,Date>,Set<String>> getDateRange() { | ||
| return dateRange; | ||
| } | ||
|
|
||
| public ShardQueryConfiguration getSubPlanConfig() { | ||
| return subPlanConfig; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Get a configuration object configured with an updated query date range, and a plan with pushed down unindexed fields. | ||
| * | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.