diff --git a/warehouse/query-core/src/main/java/datawave/query/exceptions/DatawaveAsyncOperationException.java b/warehouse/query-core/src/main/java/datawave/query/exceptions/DatawaveAsyncOperationException.java new file mode 100644 index 00000000000..d8296f1e68f --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/exceptions/DatawaveAsyncOperationException.java @@ -0,0 +1,32 @@ +package datawave.query.exceptions; + +import datawave.query.planner.DefaultQueryPlanner; + +/** + * An exception thrown when the {@link DefaultQueryPlanner} encounters a problem during an async operation like fetching field sets or serializing iterator + * options in another thread + */ +public class DatawaveAsyncOperationException extends RuntimeException { + + private static final long serialVersionUID = -5455973957749708049L; + + public DatawaveAsyncOperationException() { + super(); + } + + public DatawaveAsyncOperationException(String message, Throwable cause) { + super(message, cause); + } + + public DatawaveAsyncOperationException(String message) { + super(message); + } + + public DatawaveAsyncOperationException(Throwable cause) { + super(cause); + } + + protected DatawaveAsyncOperationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java index 1d763edb37a..55cf0b0dff0 100644 --- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ShardRangeStream.java @@ -1,5 +1,6 @@ package datawave.query.index.lookup; +import java.io.IOException; import java.util.Collections; import java.util.Date; import java.util.Iterator; @@ -16,11 +17,13 @@ import org.apache.commons.jexl3.parser.JexlNode; import com.google.common.base.Function; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterators; +import com.google.common.collect.Multimap; +import datawave.data.type.Type; import datawave.query.CloseableIterable; import datawave.query.config.ShardQueryConfiguration; -import datawave.query.exceptions.DatawaveQueryException; import datawave.query.index.lookup.IndexStream.StreamContext; import datawave.query.iterator.FieldIndexOnlyQueryIterator; import datawave.query.iterator.QueryOptions; @@ -65,7 +68,7 @@ public CloseableIterable streamPlans(JexlNode node) { DefaultQueryPlanner.addOption(cfg, QueryOptions.DATATYPE_FILTER, config.getDatatypeFilterAsString(), false); DefaultQueryPlanner.addOption(cfg, QueryOptions.END_TIME, Long.toString(config.getEndDate().getTime()), false); - DefaultQueryPlanner.configureTypeMappings(config, cfg, metadataHelper, true); + configureTypeMappings(config, cfg, metadataHelper); scanner.setRanges(Collections.singleton(rangeForTerm(null, null, config))); @@ -97,7 +100,7 @@ public CloseableIterable streamPlans(JexlNode node) { } - } catch (TableNotFoundException | DatawaveQueryException e) { + } catch (TableNotFoundException e) { throw new RuntimeException(e); } finally { // shut down the executor as all threads have completed @@ -134,4 +137,29 @@ public QueryPlan apply(Entry entry) { // @formatter:on } } + + /** + * Lift and shift from DefaultQueryPlanner to avoid reliance on static methods + */ + private void configureTypeMappings(ShardQueryConfiguration 
config, IteratorSetting cfg, MetadataHelper metadataHelper) { + DefaultQueryPlanner.addOption(cfg, QueryOptions.QUERY_MAPPING_COMPRESS, Boolean.toString(true), false); + + Multimap> nonIndexedQueryFieldsDatatypes = HashMultimap.create(config.getQueryFieldsDatatypes()); + nonIndexedQueryFieldsDatatypes.keySet().removeAll(config.getIndexedFields()); + String nonIndexedTypes = QueryOptions.buildFieldNormalizerString(nonIndexedQueryFieldsDatatypes); + DefaultQueryPlanner.addOption(cfg, QueryOptions.NON_INDEXED_DATATYPES, nonIndexedTypes, false); + + try { + String serializedTypeMetadata = metadataHelper.getTypeMetadata(config.getDatatypeFilter()).toString(); + DefaultQueryPlanner.addOption(cfg, QueryOptions.TYPE_METADATA, serializedTypeMetadata, false); + + String requiredAuthsString = metadataHelper.getUsersMetadataAuthorizationSubset(); + requiredAuthsString = QueryOptions.compressOption(requiredAuthsString, QueryOptions.UTF8); + DefaultQueryPlanner.addOption(cfg, QueryOptions.TYPE_METADATA_AUTHS, requiredAuthsString, false); + } catch (TableNotFoundException | IOException e) { + throw new RuntimeException(e); + } + + DefaultQueryPlanner.addOption(cfg, QueryOptions.METADATA_TABLE_NAME, config.getMetadataTableName(), false); + } } diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java index 72d8a852e3d..8fbebd477e8 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.TimeZone; import java.util.TreeSet; @@ -28,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -80,6 +82,7 @@ import datawave.query.config.ScanHintRule; import datawave.query.config.ShardQueryConfiguration; import datawave.query.exceptions.CannotExpandUnfieldedTermFatalException; +import datawave.query.exceptions.DatawaveAsyncOperationException; import datawave.query.exceptions.DatawaveFatalQueryException; import datawave.query.exceptions.DatawaveQueryException; import datawave.query.exceptions.DoNotPerformOptimizedQueryException; @@ -157,6 +160,15 @@ import datawave.query.jexl.visitors.order.OrderByCostVisitor; import datawave.query.jexl.visitors.whindex.WhindexVisitor; import datawave.query.model.QueryModel; +import datawave.query.planner.async.AbstractQueryPlannerCallable; +import datawave.query.planner.async.FetchCompositeMetadata; +import datawave.query.planner.async.FetchContentExpansionFields; +import datawave.query.planner.async.FetchIndexOnlyFields; +import datawave.query.planner.async.FetchIndexedFields; +import datawave.query.planner.async.FetchNonEventFields; +import datawave.query.planner.async.FetchTermFrequencyFields; +import datawave.query.planner.async.FetchTypeMetadata; +import datawave.query.planner.async.SerializeIvaratorCacheDirs; import datawave.query.planner.comparator.DefaultQueryPlanComparator; import datawave.query.planner.comparator.GeoWaveQueryPlanComparator; import datawave.query.planner.pushdown.PushDownVisitor; @@ -270,10 +282,46 @@ public class DefaultQueryPlanner extends QueryPlanner 
implements Cloneable { protected String rangeStreamClass = RangeStream.class.getCanonicalName(); - protected ExecutorService builderThread = null; + protected ExecutorService executor = null; + + protected AbstractQueryPlannerCallable compositeMetadataCallable; + protected AbstractQueryPlannerCallable typeMetadataCallable; + protected AbstractQueryPlannerCallable contentExpansionFieldsCallable; + protected AbstractQueryPlannerCallable ivaratorCacheDirCallable; + protected AbstractQueryPlannerCallable> indexedFieldsCallable; + protected AbstractQueryPlannerCallable> indexOnlyFieldsCallable; + protected AbstractQueryPlannerCallable> nonEventFieldsCallable; + protected AbstractQueryPlannerCallable> termFrequencyFieldsCallable; + + protected Future compositeMetadataFuture; + protected Future typeMetadataFuture; + protected Future contentExpansionFieldsFuture; + protected Future ivaratorCacheDirFuture; + protected Future> indexedFieldsFuture; + protected Future> indexOnlyFieldsFuture; + protected Future> nonEventFieldsFuture; + protected Future> termFrequencyFieldsFuture; + + protected CompositeMetadata compositeMetadata; + protected TypeMetadata typeMetadata; + protected String contentExpansionFields; + protected String serializedIvaratorDirs; + protected Set indexedFields; + protected Set indexOnlyFields; + protected Set nonEventFields; + protected Set termFrequencyFields; protected Future settingFuture = null; + private boolean logConcurrentStageExecution = false; + private int concurrentTimeoutMillis = 10_000; // 10 second default + + // tracks time spent in various stages that may not be covered in the other query stopwatches + protected QueryStopwatch stageStopWatch = new QueryStopwatch(); + + // tracks time saved via concurrent task execution + protected QueryStopwatch futureStopWatch = new QueryStopwatch(); + protected long maxRangeWaitMillis = 125; /** @@ -392,10 +440,10 @@ public CloseableIterable process(GenericQueryConfiguration genericCon throw new ClassCastException("Config object must be an instance of ShardQueryConfiguration"); } - builderThread = Executors.newSingleThreadExecutor(); - ShardQueryConfiguration config = (ShardQueryConfiguration) genericConfig; + startConcurrentExecution(config); + // lets mark the query as started (used by ivarators at a minimum) try { markQueryStarted(config, settings); @@ -406,6 +454,47 @@ public CloseableIterable process(GenericQueryConfiguration genericCon return process(scannerFactory, getMetadataHelper(config), getDateIndexHelper(config), config, query, settings); } + /** + * This method starts a number of long-running tasks that can be done in parallel. 
+ * + * @param config + * the config + */ + protected void startConcurrentExecution(ShardQueryConfiguration config) { + // iterator setting future + seven futures below make 8, add two for growth/extension + executor = Executors.newFixedThreadPool(10); + + compositeMetadata = null; + typeMetadata = null; + contentExpansionFields = null; + serializedIvaratorDirs = null; + indexedFields = null; + indexOnlyFields = null; + nonEventFields = null; + termFrequencyFields = null; + + // expensive operations are executed in parallel + compositeMetadataCallable = new FetchCompositeMetadata(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + typeMetadataCallable = new FetchTypeMetadata(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + contentExpansionFieldsCallable = new FetchContentExpansionFields(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + ivaratorCacheDirCallable = new SerializeIvaratorCacheDirs(futureStopWatch, this, config); + indexedFieldsCallable = new FetchIndexedFields(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + indexOnlyFieldsCallable = new FetchIndexOnlyFields(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + nonEventFieldsCallable = new FetchNonEventFields(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + termFrequencyFieldsCallable = new FetchTermFrequencyFields(futureStopWatch, metadataHelper, config.getDatatypeFilter()); + + // field sets tend to be needed first, so submit those before others + indexOnlyFieldsFuture = executor.submit(indexOnlyFieldsCallable); + indexedFieldsFuture = executor.submit(indexedFieldsCallable); + nonEventFieldsFuture = executor.submit(nonEventFieldsCallable); + termFrequencyFieldsFuture = executor.submit(termFrequencyFieldsCallable); + + compositeMetadataFuture = executor.submit(compositeMetadataCallable); + typeMetadataFuture = executor.submit(typeMetadataCallable); + contentExpansionFieldsFuture = executor.submit(contentExpansionFieldsCallable); + ivaratorCacheDirFuture = executor.submit(ivaratorCacheDirCallable); + } + protected CloseableIterable process(ScannerFactory scannerFactory, MetadataHelper metadataHelper, DateIndexHelper dateIndexHelper, ShardQueryConfiguration config, String query, Query settings) throws DatawaveQueryException { settingFuture = null; @@ -413,7 +502,7 @@ protected CloseableIterable process(ScannerFactory scannerFactory, Me IteratorSetting cfg = null; if (preloadOptions) { - cfg = getQueryIterator(metadataHelper, config, settings, "", false, true); + cfg = getQueryIterator(metadataHelper, config, "", false, true); } try { @@ -468,7 +557,7 @@ protected CloseableIterable process(ScannerFactory scannerFactory, Me if (!config.isGeneratePlanOnly()) { while (null == cfg) { - cfg = getQueryIterator(metadataHelper, config, settings, "", false, false); + cfg = getQueryIterator(metadataHelper, config, "", false, false); } configureIterator(config, cfg, newQueryString, isFullTable); } @@ -480,6 +569,10 @@ protected CloseableIterable process(ScannerFactory scannerFactory, Me this.plannedScript = newQueryString; config.setQueryString(this.plannedScript); + if (logConcurrentStageExecution) { + logTimeSavedViaConcurrentExecution(); + } + if (!config.isGeneratePlanOnly()) { // add the geo query comparator to sort by geo range granularity if this is a geo query List> queryPlanComparators = null; @@ -542,8 +635,9 @@ private void configureIterator(ShardQueryConfiguration config, IteratorSetting c addOption(cfg, QueryOptions.FULL_TABLE_SCAN_ONLY, 
Boolean.toString(isFullTable), false); addOption(cfg, QueryOptions.TRACK_SIZES, Boolean.toString(config.isTrackSizes()), false); addOption(cfg, QueryOptions.ACTIVE_QUERY_LOG_NAME, config.getActiveQueryLogName(), false); + // Set the start and end dates - configureTypeMappings(config, cfg, metadataHelper, getCompressOptionMappings()); + configureTypeMappings(config, cfg, metadataHelper, getCompressOptionMappings(), false); } /** @@ -573,8 +667,8 @@ public void close(GenericQueryConfiguration genericConfig, Query settings) { log.warn("Config object must be an instance of ShardQueryConfiguration to properly close the DefaultQueryPlanner. You gave me a " + genericConfig); } - if (null != builderThread) { - builderThread.shutdown(); + if (null != executor) { + executor.shutdown(); } return; } @@ -588,8 +682,8 @@ public void close(GenericQueryConfiguration genericConfig, Query settings) { log.error("Failed to close query " + settings.getId(), e); } - if (null != builderThread) { - builderThread.shutdown(); + if (null != executor) { + executor.shutdown(); } } @@ -778,7 +872,7 @@ protected ASTJexlScript updateQueryTree(ScannerFactory scannerFactory, MetadataH // | Post Query Model Expansion Clean Up | // +-------------------------------------+ - Set indexOnlyFields = loadIndexedFields(config); + Set indexOnlyFields = getIndexOnlyFields(); if (!indexOnlyFields.isEmpty()) { // filter:includeRegex and filter:excludeRegex functions cannot be run against index-only fields, clean that up @@ -835,7 +929,7 @@ protected ASTJexlScript updateQueryTree(ScannerFactory scannerFactory, MetadataH // check the query for any fields that are term frequencies // if any exist, populate the shard query config with these fields - timedCheckForTokenizedFields(timers, "Check for term frequency (tokenized) fields", config, metadataHelper); + timedCheckForTokenizedFields(timers, "Check for term frequency (tokenized) fields", config); if (reduceQuery) { config.setQueryTree(timedReduce(timers, "Reduce Query Final", config.getQueryTree())); @@ -908,14 +1002,9 @@ protected ASTJexlScript processTree(final ASTJexlScript originalQueryTree, Shard Set indexOnlyFields = null; Set nonEventFields = null; if (config.getMinSelectivity() > 0 || !disableBoundedLookup) { - try { - indexedFields = metadataHelper.getIndexedFields(config.getDatatypeFilter()); - indexOnlyFields = metadataHelper.getIndexOnlyFields(config.getDatatypeFilter()); - nonEventFields = metadataHelper.getNonEventFields(config.getDatatypeFilter()); - } catch (TableNotFoundException te) { - QueryException qe = new QueryException(DatawaveErrorCode.METADATA_ACCESS_ERROR, te); - throw new DatawaveFatalQueryException(qe); - } + indexedFields = getIndexedFields(); + indexOnlyFields = getIndexOnlyFields(); + nonEventFields = getNonEventFields(); } // apply the node transform rules @@ -1164,20 +1253,14 @@ protected void timeScanHintRules(QueryStopwatch timers, String stage, ShardQuery stopwatch.stop(); } - protected void timedCheckForTokenizedFields(QueryStopwatch timers, String stage, ShardQueryConfiguration config, MetadataHelper metadataHelper) { + protected void timedCheckForTokenizedFields(QueryStopwatch timers, String stage, ShardQueryConfiguration config) { TraceStopwatch stopwatch = timers.newStartedStopwatch("DefaultQueryPlanner - " + stage); // Figure out if the query contained any term frequency terms so we know // if we may use the term frequencies instead of the fields index in some cases Set queryTfFields = Collections.emptySet(); - Set termFrequencyFields; - try { 
- termFrequencyFields = metadataHelper.getTermFrequencyFields(config.getDatatypeFilter()); - } catch (TableNotFoundException e) { - stopwatch.stop(); - QueryException qe = new QueryException(DatawaveErrorCode.TERM_FREQUENCY_FIELDS_RETRIEVAL_ERROR, e); - throw new DatawaveFatalQueryException(qe); - } + Set termFrequencyFields = getTermFrequencyFields(); + if (!termFrequencyFields.isEmpty()) { queryTfFields = SetMembershipVisitor.getMembers(termFrequencyFields, config, config.getQueryTree()); @@ -1214,37 +1297,6 @@ protected QueryModel loadQueryModel(ShardQueryConfiguration config) { return queryModelProvider.getQueryModel(); } - /* - - - */ - - protected Set loadIndexedFields(ShardQueryConfiguration config) { - try { - return metadataHelper.getIndexOnlyFields(config.getDatatypeFilter()); - } catch (TableNotFoundException e) { - QueryException qe = new QueryException(DatawaveErrorCode.INDEX_ONLY_FIELDS_RETRIEVAL_ERROR, e); - throw new DatawaveFatalQueryException(qe); - } - } - - /** - * Loads expansion fields filtered by datatype. If an error occurs that error is rethrown as a {@link DatawaveFatalQueryException} - * - * @param config - * a configuration - * @return list of expansion fields - */ - protected Set loadExpansionFields(ShardQueryConfiguration config) { - try { - return metadataHelper.getExpansionFields(config.getDatatypeFilter()); - } catch (TableNotFoundException e) { - QueryException qe = new QueryException(DatawaveErrorCode.METADATA_ACCESS_ERROR, e); - log.info(qe); - throw new DatawaveFatalQueryException(qe); - } - } - /* * Start methods that operate on the query tree */ @@ -1399,7 +1451,7 @@ protected ASTJexlScript timedApplyNodeTransformRules(QueryStopwatch timers, Stri protected ASTJexlScript timedExpandAnyFieldRegexNodes(QueryStopwatch timers, final ASTJexlScript script, ShardQueryConfiguration config, MetadataHelper metadataHelper, ScannerFactory scannerFactory, String query) throws DatawaveQueryException { try { - config.setIndexedFields(metadataHelper.getIndexedFields(config.getDatatypeFilter())); + config.setIndexedFields(getIndexedFields()); config.setReverseIndexedFields(metadataHelper.getReverseIndexedFields(config.getDatatypeFilter())); // @formatter:off @@ -2195,10 +2247,11 @@ protected void configureAdditionalOptions(ShardQueryConfiguration config, Iterat // no-op } - protected Future loadQueryIterator(final MetadataHelper metadataHelper, final ShardQueryConfiguration config, final Query settings, - final String queryString, final Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { + protected Future loadQueryIterator(final MetadataHelper metadataHelper, final ShardQueryConfiguration config, final Boolean isFullTable, + boolean isPreload) { + + return executor.submit(() -> { - return builderThread.submit(() -> { // VersioningIterator is typically set at 20 on the table IteratorSetting cfg = new IteratorSetting(config.getBaseIteratorPriority() + 40, "query", getQueryIteratorClass()); @@ -2225,7 +2278,7 @@ protected Future loadQueryIterator(final MetadataHelper metadat addOption(cfg, QueryOptions.ZOOKEEPER_CONFIG, config.getZookeeperConfig(), false); } if (config.getIvaratorCacheDirConfigs() != null && !config.getIvaratorCacheDirConfigs().isEmpty()) { - addOption(cfg, QueryOptions.IVARATOR_CACHE_DIR_CONFIG, IvaratorCacheDirConfig.toJson(getShuffledIvaratoCacheDirConfigs(config)), false); + addOption(cfg, QueryOptions.IVARATOR_CACHE_DIR_CONFIG, getSerializedIvaratorDirs(), false); } addOption(cfg, QueryOptions.IVARATOR_CACHE_BUFFER_SIZE, 
Integer.toString(config.getIvaratorCacheBufferSize()), false); addOption(cfg, QueryOptions.IVARATOR_SCAN_PERSIST_THRESHOLD, Long.toString(config.getIvaratorCacheScanPersistThreshold()), false); @@ -2254,27 +2307,17 @@ protected Future loadQueryIterator(final MetadataHelper metadat loadFields(cfg, config, isPreload); configureSeekingOptions(cfg, config); - try { - CompositeMetadata compositeMetadata = metadataHelper.getCompositeMetadata().filter(config.getQueryFieldsDatatypes().keySet()); - if (compositeMetadata != null && !compositeMetadata.isEmpty()) { - addOption(cfg, QueryOptions.COMPOSITE_METADATA, java.util.Base64.getEncoder().encodeToString(CompositeMetadata.toBytes(compositeMetadata)), - false); - } - } catch (TableNotFoundException e) { - QueryException qe = new QueryException(DatawaveErrorCode.COMPOSITE_METADATA_CONFIG_ERROR, e); - throw new DatawaveQueryException(qe); + CompositeMetadata compositeMetadata = getCompositeMetadata(); + compositeMetadata = compositeMetadata.filter(config.getQueryFieldsDatatypes().keySet()); + if (compositeMetadata != null && !compositeMetadata.isEmpty()) { + addOption(cfg, QueryOptions.COMPOSITE_METADATA, java.util.Base64.getEncoder().encodeToString(CompositeMetadata.toBytes(compositeMetadata)), + false); } String datatypeFilter = config.getDatatypeFilterAsString(); - addOption(cfg, QueryOptions.DATATYPE_FILTER, datatypeFilter, false); - try { - addOption(cfg, QueryOptions.CONTENT_EXPANSION_FIELDS, Joiner.on(',').join(metadataHelper.getContentFields(config.getDatatypeFilter())), false); - } catch (TableNotFoundException e) { - QueryException qe = new QueryException(DatawaveErrorCode.CONTENT_FIELDS_RETRIEVAL_ERROR, e); - throw new DatawaveQueryException(qe); - } + addOption(cfg, QueryOptions.CONTENT_EXPANSION_FIELDS, getContentExpansionFields(), false); if (config.isDebugMultithreadedSources()) { addOption(cfg, QueryOptions.DEBUG_MULTITHREADED_SOURCES, Boolean.toString(config.isDebugMultithreadedSources()), false); @@ -2307,8 +2350,8 @@ protected Future loadQueryIterator(final MetadataHelper metadat private void loadFields(IteratorSetting cfg, ShardQueryConfiguration config, boolean isPreload) throws DatawaveQueryException { try { Set compositeFields = metadataHelper.getCompositeToFieldMap(config.getDatatypeFilter()).keySet(); - Set indexedFields = metadataHelper.getIndexedFields(config.getDatatypeFilter()); - Set indexOnlyFields = metadataHelper.getIndexOnlyFields(config.getDatatypeFilter()); + Set indexedFields = getIndexedFields(); + Set indexOnlyFields = getIndexOnlyFields(); // only reduce the query fields if planning has occurred if (!isPreload && config.getReduceQueryFields()) { @@ -2368,7 +2411,7 @@ protected void configureSeekingOptions(IteratorSetting cfg, ShardQueryConfigurat * the shard config * @return a list of ivarator cache dirs */ - private List getShuffledIvaratoCacheDirConfigs(ShardQueryConfiguration config) { + public List getShuffledIvaratoCacheDirConfigs(ShardQueryConfiguration config) { List shuffledIvaratorCacheDirs = new ArrayList<>(); // group the ivarator cache dirs by their priority @@ -2392,8 +2435,6 @@ private List getShuffledIvaratoCacheDirConfigs(ShardQuer * the {@link MetadataHelper} * @param config * the {@link ShardQueryConfiguration} - * @param settings - * the {@link Query} * @param queryString * the raw query string * @param isFullTable @@ -2404,10 +2445,10 @@ private List getShuffledIvaratoCacheDirConfigs(ShardQuer * @throws DatawaveQueryException * if something goes wrong */ - protected IteratorSetting 
getQueryIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, Query settings, String queryString, - Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { + protected IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, String queryString, Boolean isFullTable, + boolean isPreload) throws DatawaveQueryException { if (null == settingFuture) - settingFuture = loadQueryIterator(metadataHelper, config, settings, queryString, isFullTable, isPreload); + settingFuture = loadQueryIterator(metadataHelper, config, isFullTable, isPreload); if (settingFuture.isDone()) try { return settingFuture.get(); @@ -2418,12 +2459,12 @@ protected IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQ return null; } - public static void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper, boolean compressMappings) + public void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper, boolean compressMappings) throws DatawaveQueryException { configureTypeMappings(config, cfg, metadataHelper, compressMappings, false); } - public static void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper, boolean compressMappings, + public void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper, boolean compressMappings, boolean isPreload) throws DatawaveQueryException { try { addOption(cfg, QueryOptions.QUERY_MAPPING_COMPRESS, Boolean.toString(compressMappings), false); @@ -2436,7 +2477,7 @@ public static void configureTypeMappings(ShardQueryConfiguration config, Iterato String nonIndexedTypes = QueryOptions.buildFieldNormalizerString(nonIndexedQueryFieldsDatatypes); String requiredAuthsString = metadataHelper.getUsersMetadataAuthorizationSubset(); - TypeMetadata typeMetadata = metadataHelper.getTypeMetadata(config.getDatatypeFilter()); + TypeMetadata typeMetadata = getTypeMetadata(); if (config.getReduceTypeMetadata() && !isPreload) { Set fieldsToRetain = ReduceFields.getQueryFields(config.getQueryTree()); @@ -2453,12 +2494,13 @@ public static void configureTypeMappings(ShardQueryConfiguration config, Iterato serializedTypeMetadata = QueryOptions.compressOption(serializedTypeMetadata, QueryOptions.UTF8); } } + addOption(cfg, QueryOptions.NON_INDEXED_DATATYPES, nonIndexedTypes, false); addOption(cfg, QueryOptions.TYPE_METADATA, serializedTypeMetadata, false); addOption(cfg, QueryOptions.TYPE_METADATA_AUTHS, requiredAuthsString, false); addOption(cfg, QueryOptions.METADATA_TABLE_NAME, config.getMetadataTableName(), false); - } catch (TableNotFoundException | IOException e) { + } catch (IOException e) { QueryException qe = new QueryException(DatawaveErrorCode.TYPE_MAPPING_CONFIG_ERROR, e); throw new DatawaveQueryException(qe); } @@ -2484,10 +2526,8 @@ public static void addOption(IteratorSetting cfg, String option, String value, b * the config * @param cfg * the iterator configuration - * @throws DatawaveQueryException - * for issues with running the query */ - protected void setCommonIteratorOptions(ShardQueryConfiguration config, IteratorSetting cfg) throws DatawaveQueryException { + protected void setCommonIteratorOptions(ShardQueryConfiguration config, IteratorSetting cfg) { // Applying filtering options, including classnames, whether applied to // post-processing or field index if (config.getUseFilters()) { @@ -2881,14 +2921,6 
@@ protected ASTJexlScript timedSortQueryBeforeGlobalIndex(ShardQueryConfiguration }); } - private TypeMetadata getTypeMetadata() { - try { - return metadataHelper.getTypeMetadata(); - } catch (TableNotFoundException e) { - throw new DatawaveFatalQueryException("Could not get TypeMetadata"); - } - } - /** * Initializes the range stream, whether it is configured to be a different class than the Default Range stream or not. * @@ -3063,7 +3095,7 @@ protected Multimap> configureIndexedAndNormalizedFields(MetadataH Multimap> fieldToDatatypeMap = FetchDataTypesVisitor.fetchDataTypes(metadataHelper, config.getDatatypeFilter(), queryTree, false); try { - return configureIndexedAndNormalizedFields(fieldToDatatypeMap, metadataHelper.getIndexedFields(null), metadataHelper.getReverseIndexedFields(null), + return configureIndexedAndNormalizedFields(fieldToDatatypeMap, getIndexedFields(), metadataHelper.getReverseIndexedFields(null), metadataHelper.getAllNormalized(), config, queryTree); } catch (InstantiationException | IllegalAccessException | TableNotFoundException e) { throw new DatawaveFatalQueryException(e); @@ -3241,8 +3273,211 @@ public static Date getEndDateForIndexLookup(Date endDate) { @Override public void finalize() { - if (null != builderThread) { - builderThread.shutdown(); + if (null != executor) { + executor.shutdown(); + } + } + + protected CompositeMetadata getCompositeMetadata() { + if (compositeMetadata == null && compositeMetadataCallable != null) { + TraceStopwatch stopwatch = stageStopWatch.newStartedStopwatch(compositeMetadataCallable.stageName()); + try { + while (compositeMetadata == null) { + compositeMetadata = compositeMetadataFuture.get(concurrentTimeoutMillis, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Failed to fetch CompositeMetadata", e); + throw new DatawaveAsyncOperationException("Failed to fetch CompositeMetadata", e); + } finally { + stopwatch.stop(); + } + } + return compositeMetadata; + } + + protected TypeMetadata getTypeMetadata() { + if (typeMetadata == null && typeMetadataCallable != null) { + TraceStopwatch stopwatch = stageStopWatch.newStartedStopwatch(typeMetadataCallable.stageName()); + try { + while (typeMetadata == null) { + typeMetadata = typeMetadataFuture.get(concurrentTimeoutMillis, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Failed to fetch TypeMetadata", e); + throw new DatawaveAsyncOperationException("Failed to fetch TypeMetadata", e); + } finally { + stopwatch.stop(); + } + } + return typeMetadata; + } + + protected String getContentExpansionFields() { + if (contentExpansionFields == null && contentExpansionFieldsCallable != null) { + TraceStopwatch stopwatch = stageStopWatch.newStartedStopwatch(contentExpansionFieldsCallable.stageName()); + try { + while (contentExpansionFields == null) { + contentExpansionFields = contentExpansionFieldsFuture.get(concurrentTimeoutMillis, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Failed to fetch Content Expansion fields", e); + throw new DatawaveAsyncOperationException("Failed to fetch Content Expansion fields", e); + } finally { + stopwatch.stop(); + } + } + return contentExpansionFields; + } + + protected String getSerializedIvaratorDirs() { + if (serializedIvaratorDirs == null && ivaratorCacheDirCallable != null) { + TraceStopwatch stopwatch = 
stageStopWatch.newStartedStopwatch(ivaratorCacheDirCallable.stageName()); + try { + while (serializedIvaratorDirs == null) { + serializedIvaratorDirs = ivaratorCacheDirFuture.get(concurrentTimeoutMillis, TimeUnit.MILLISECONDS); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Failed to serialize ivarator cache dirs", e); + throw new DatawaveAsyncOperationException("Failed to serialize ivarator cache dirs", e); + } finally { + stopwatch.stop(); + } + } + return serializedIvaratorDirs; + } + + protected Set getIndexedFields() { + if (indexedFields == null && indexedFieldsCallable != null) { + indexedFields = getFieldSet(indexedFieldsCallable.stageName(), indexedFieldsFuture); + } + + return Objects.requireNonNullElse(indexedFields, Collections.emptySet()); + } + + protected Set getIndexOnlyFields() { + if (indexOnlyFields == null && indexOnlyFieldsCallable != null) { + indexOnlyFields = getFieldSet(indexOnlyFieldsCallable.stageName(), indexOnlyFieldsFuture); + } + + return Objects.requireNonNullElse(indexOnlyFields, Collections.emptySet()); + } + + protected Set getNonEventFields() { + if (nonEventFields == null && nonEventFieldsCallable != null) { + nonEventFields = getFieldSet(nonEventFieldsCallable.stageName(), nonEventFieldsFuture); + } + + return Objects.requireNonNullElse(nonEventFields, Collections.emptySet()); + } + + protected Set getTermFrequencyFields() { + if (termFrequencyFields == null && termFrequencyFieldsCallable != null) { + termFrequencyFields = getFieldSet(termFrequencyFieldsCallable.stageName(), termFrequencyFieldsFuture); + } + + return Objects.requireNonNullElse(termFrequencyFields, Collections.emptySet()); + } + + /** + * Common code to fetch a field set or throw an exception + * + * @param stageName + * the stage name associated with the callable + * @param future + * the future + * @return the field set + */ + protected Set getFieldSet(String stageName, Future> future) { + TraceStopwatch stopwatch = stageStopWatch.newStartedStopwatch(stageName); + try { + Set fields = null; + while (fields == null) { + fields = future.get(concurrentTimeoutMillis, TimeUnit.MILLISECONDS); + } + return fields; + } catch (ExecutionException | InterruptedException | TimeoutException e) { + log.error("Stage[" + stageName + "] failed", e); + throw new DatawaveAsyncOperationException("Stage[" + stageName + "] failed", e); + } finally { + stopwatch.stop(); + } + } + + /** + * Log the execution time for each stage and use the time spent waiting on each future to calculate the time saved via concurrent execution. + *

+ * If a particular stage spent a lot of time waiting on the result, consider refactoring query planning to avoid busy waiting. + */ + protected void logTimeSavedViaConcurrentExecution() { + // timers share stage names, so we can compare the time spent executing the task with + // the time spent waiting on the future to calculate time saved + + long totalExecution = 0L; + long totalGetFuture = 0L; + long totalTimeSaved = 0L; + + log.info("Execution\tGetFuture\tTimeSaved\t\tStage"); + for (String stageName : getStageNames()) { + TraceStopwatch future = futureStopWatch.get(stageName); + TraceStopwatch stage = stageStopWatch.get(stageName); + + if (future == null) { + continue; + } + + long execution = future.elapsed(TimeUnit.NANOSECONDS); + long get; + if (stage == null) { + get = 0; // handles the case where a task was submitted but the result was never used + } else { + get = stage.elapsed(TimeUnit.NANOSECONDS); + } + + long saved = execution - get; + + totalExecution += execution; + totalGetFuture += get; + totalTimeSaved += saved; + + log.info(execution + "\t\t" + get + "\t\t" + saved + "\t\t" + stageName); } + + log.info("Total concurrent execution time: " + TimeUnit.NANOSECONDS.toMillis(totalExecution) + " ms"); + log.info("Total get future time: " + TimeUnit.NANOSECONDS.toMillis(totalGetFuture) + " ms"); + log.info("Total time saved: " + TimeUnit.NANOSECONDS.toMillis(totalTimeSaved) + " ms"); + } + + /** + * Collect the stage names for all {@link AbstractQueryPlannerCallable}s. + * + * @return a list of stage names + */ + protected List getStageNames() { + List names = new ArrayList<>(); + names.add(compositeMetadataCallable.stageName()); + names.add(typeMetadataCallable.stageName()); + names.add(contentExpansionFieldsCallable.stageName()); + names.add(ivaratorCacheDirCallable.stageName()); + names.add(indexedFieldsCallable.stageName()); + names.add(indexOnlyFieldsCallable.stageName()); + names.add(nonEventFieldsCallable.stageName()); + names.add(termFrequencyFieldsCallable.stageName()); + return names; + } + + public void setLogConcurrentStageExecution(boolean logConcurrentStageExecution) { + this.logConcurrentStageExecution = logConcurrentStageExecution; + } + + public boolean getLogConcurrentStageExecution() { + return logConcurrentStageExecution; + } + + public int getConcurrentTimeoutMillis() { + return concurrentTimeoutMillis; + } + + public void setConcurrentTimeoutMillis(int concurrentTimeoutMillis) { + this.concurrentTimeoutMillis = concurrentTimeoutMillis; } } diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/FacetedQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/FacetedQueryPlanner.java index 008cfae747a..218cbaaa8d3 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/FacetedQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/FacetedQueryPlanner.java @@ -51,15 +51,15 @@ public FacetedQueryPlanner(final FacetedConfiguration config) { } @Override - public IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, Query settings, String queryString, - Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { + public IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, String queryString, Boolean isFullTable, + boolean isPreload) throws DatawaveQueryException { if (isFullTable) { QueryException qe = new QueryException(DatawaveErrorCode.FULL_TABLE_SCAN_DISALLOWED); throw new 
FullTableScansDisallowedException(qe); } - IteratorSetting cfg = super.getQueryIterator(metadataHelper, config, settings, queryString, isFullTable, isPreload); + IteratorSetting cfg = super.getQueryIterator(metadataHelper, config, queryString, isFullTable, isPreload); if (!usePrecomputedFacets) cfg.setIteratorClass(DynamicFacetIterator.class.getName()); else { diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java index 9ead8720514..2a29c0c8ebb 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java @@ -33,14 +33,14 @@ public IndexQueryPlanner() { } @Override - public IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, Query settings, String queryString, - Boolean isFullTable, boolean isPreload) throws DatawaveQueryException { + public IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQueryConfiguration config, String queryString, Boolean isFullTable, + boolean isPreload) throws DatawaveQueryException { if (isFullTable) { QueryException qe = new QueryException(DatawaveErrorCode.FULL_TABLE_SCAN_DISALLOWED); throw new FullTableScansDisallowedException(qe); } - IteratorSetting cfg = super.getQueryIterator(metadataHelper, config, settings, queryString, isFullTable, isPreload); + IteratorSetting cfg = super.getQueryIterator(metadataHelper, config, queryString, isFullTable, isPreload); if (null == cfg) { try { cfg = settingFuture.get(); diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/AbstractQueryPlannerCallable.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/AbstractQueryPlannerCallable.java new file mode 100644 index 00000000000..c446f3b41b3 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/AbstractQueryPlannerCallable.java @@ -0,0 +1,34 @@ +package datawave.query.planner.async; + +import java.util.concurrent.Callable; + +import datawave.query.util.QueryStopwatch; + +/** + * Generic interface that allows stage names to be associated with various tasks. Extending classes may pass in a {@link QueryStopwatch} to capture timing + * details of the operation. 
+ * + * @param + * the object type + */ +public abstract class AbstractQueryPlannerCallable implements Callable { + + protected QueryStopwatch timer; + + /** + * Constructor that supports timing operations + * + * @param timer + * a stop watch + */ + protected AbstractQueryPlannerCallable(QueryStopwatch timer) { + this.timer = timer; + } + + /** + * The stage name used for one or more timers + * + * @return the stage name + */ + public abstract String stageName(); +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchCompositeMetadata.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchCompositeMetadata.java new file mode 100644 index 00000000000..b56e3d0a6f0 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchCompositeMetadata.java @@ -0,0 +1,49 @@ +package datawave.query.planner.async; + +import java.util.Set; + +import datawave.query.composite.CompositeMetadata; +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +/** + * A wrapper around {@link MetadataHelper#getCompositeMetadata(Set)} that allows for concurrent execution of expensive planner steps. + */ +public class FetchCompositeMetadata extends AbstractQueryPlannerCallable { + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchCompositeMetadata(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchCompositeMetadata(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public CompositeMetadata call() throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + CompositeMetadata compositeMetadata = helper.getCompositeMetadata(datatypes); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return compositeMetadata; + } + + @Override + public String stageName() { + return "Fetch CompositeMetadata"; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchContentExpansionFields.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchContentExpansionFields.java new file mode 100644 index 00000000000..5f10a3747a8 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchContentExpansionFields.java @@ -0,0 +1,62 @@ +package datawave.query.planner.async; + +import java.util.Set; +import java.util.concurrent.Callable; + +import org.apache.accumulo.core.client.TableNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; + +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +/** + * A wrapper around the {@link MetadataHelper#getContentFields(Set)} method that allows for concurrent execution of expensive planner steps. 
+ */ +public class FetchContentExpansionFields extends AbstractQueryPlannerCallable { + + private static final Logger log = LoggerFactory.getLogger(FetchContentExpansionFields.class); + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchContentExpansionFields(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchContentExpansionFields(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public String call() { + try { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + String fields = Joiner.on(',').join(helper.getContentFields(datatypes)); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return fields; + } catch (TableNotFoundException e) { + log.error("Failed to fetch content expansion fields"); + throw new RuntimeException(e); + } + } + + @Override + public String stageName() { + return "Fetch ContentExpansionFields"; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchIndexOnlyFields.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchIndexOnlyFields.java new file mode 100644 index 00000000000..fbe3f2cc03b --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchIndexOnlyFields.java @@ -0,0 +1,46 @@ +package datawave.query.planner.async; + +import java.util.Set; + +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +public class FetchIndexOnlyFields extends AbstractQueryPlannerCallable> { + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchIndexOnlyFields(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchIndexOnlyFields(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public Set call() throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + Set indexOnlyFields = helper.getIndexOnlyFields(datatypes); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return indexOnlyFields; + } + + @Override + public String stageName() { + return "Fetch IndexOnly Fields"; + } + +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchIndexedFields.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchIndexedFields.java new file mode 100644 index 00000000000..520d99d0361 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchIndexedFields.java @@ -0,0 +1,46 @@ +package datawave.query.planner.async; + +import java.util.Set; + +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +public class FetchIndexedFields extends AbstractQueryPlannerCallable> { + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchIndexedFields(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchIndexedFields(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public Set call() 
throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + Set indexedFields = helper.getIndexedFields(datatypes); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return indexedFields; + } + + @Override + public String stageName() { + return "Fetch Indexed Fields"; + } + +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchNonEventFields.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchNonEventFields.java new file mode 100644 index 00000000000..7a2e8876829 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchNonEventFields.java @@ -0,0 +1,46 @@ +package datawave.query.planner.async; + +import java.util.Set; + +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +public class FetchNonEventFields extends AbstractQueryPlannerCallable> { + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchNonEventFields(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchNonEventFields(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public Set call() throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + Set nonEventFields = helper.getNonEventFields(datatypes); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return nonEventFields; + } + + @Override + public String stageName() { + return "Fetch NonEvent Fields"; + } + +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchTermFrequencyFields.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchTermFrequencyFields.java new file mode 100644 index 00000000000..942268f618e --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchTermFrequencyFields.java @@ -0,0 +1,45 @@ +package datawave.query.planner.async; + +import java.util.Set; + +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +public class FetchTermFrequencyFields extends AbstractQueryPlannerCallable> { + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchTermFrequencyFields(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchTermFrequencyFields(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public Set call() throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + Set termFrequencyFields = helper.getTermFrequencyFields(datatypes); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return termFrequencyFields; + } + + @Override + public String stageName() { + return "Fetch TermFrequency Fields"; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchTypeMetadata.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchTypeMetadata.java new file mode 100644 index 00000000000..2c884e7af4e --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/FetchTypeMetadata.java @@ -0,0 +1,49 @@ +package 
datawave.query.planner.async; + +import java.util.Set; + +import datawave.query.util.MetadataHelper; +import datawave.query.util.QueryStopwatch; +import datawave.query.util.TypeMetadata; +import datawave.util.time.TraceStopwatch; + +/** + * Wrapper around {@link MetadataHelper#getTypeMetadata(Set)} that allows for concurrent execution of expensive planner steps. + */ +public class FetchTypeMetadata extends AbstractQueryPlannerCallable { + + private final MetadataHelper helper; + private final Set datatypes; + + private TraceStopwatch stopwatch; + + public FetchTypeMetadata(MetadataHelper helper, Set datatypes) { + this(null, helper, datatypes); + } + + public FetchTypeMetadata(QueryStopwatch timer, MetadataHelper helper, Set datatypes) { + super(timer); + this.helper = helper; + this.datatypes = datatypes; + } + + @Override + public TypeMetadata call() throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + TypeMetadata typeMetadata = helper.getTypeMetadata(datatypes); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return typeMetadata; + } + + @Override + public String stageName() { + return "Fetch TypeMetadata"; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/async/SerializeIvaratorCacheDirs.java b/warehouse/query-core/src/main/java/datawave/query/planner/async/SerializeIvaratorCacheDirs.java new file mode 100644 index 00000000000..40041e35c7a --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/planner/async/SerializeIvaratorCacheDirs.java @@ -0,0 +1,48 @@ +package datawave.query.planner.async; + +import datawave.query.config.ShardQueryConfiguration; +import datawave.query.iterator.ivarator.IvaratorCacheDirConfig; +import datawave.query.planner.DefaultQueryPlanner; +import datawave.query.util.QueryStopwatch; +import datawave.util.time.TraceStopwatch; + +/** + * Wrapper around Ivarator json serialization that allows for asynchronous execution. 
+ */ +public class SerializeIvaratorCacheDirs extends AbstractQueryPlannerCallable { + + private final DefaultQueryPlanner planner; + private final ShardQueryConfiguration config; + + private TraceStopwatch stopwatch; + + public SerializeIvaratorCacheDirs(DefaultQueryPlanner planner, ShardQueryConfiguration config) { + this(null, planner, config); + } + + public SerializeIvaratorCacheDirs(QueryStopwatch timer, DefaultQueryPlanner planner, ShardQueryConfiguration config) { + super(timer); + this.planner = planner; + this.config = config; + } + + @Override + public String call() throws Exception { + if (timer != null) { + stopwatch = timer.newStartedStopwatch(stageName()); + } + + String serialized = IvaratorCacheDirConfig.toJson(planner.getShuffledIvaratoCacheDirConfigs(config)); + + if (stopwatch != null) { + stopwatch.stop(); + } + + return serialized; + } + + @Override + public String stageName() { + return "Serialize IvaratorCacheDirs"; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/util/QueryStopwatch.java b/warehouse/query-core/src/main/java/datawave/query/util/QueryStopwatch.java index f30db8b073b..2a64d8c9086 100644 --- a/warehouse/query-core/src/main/java/datawave/query/util/QueryStopwatch.java +++ b/warehouse/query-core/src/main/java/datawave/query/util/QueryStopwatch.java @@ -8,6 +8,7 @@ import java.util.ArrayDeque; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; @@ -28,8 +29,9 @@ * */ public class QueryStopwatch { - public static final String NEWLINE = "\n", INDENT = " "; - protected ArrayDeque> watches = Queues.newArrayDeque(); + public static final String NEWLINE = "\n"; + public static final String INDENT = " "; + protected final ArrayDeque> watches = Queues.newArrayDeque(); /** * Creates a new Stopwatch for use but does not start it @@ -43,7 +45,9 @@ private TraceStopwatch newStopwatch(String header) { TraceStopwatch sw = new TraceStopwatch(header); - watches.add(Maps.immutableEntry(header, sw)); + synchronized (watches) { + watches.add(Maps.immutableEntry(header, sw)); + } return sw; } @@ -55,7 +59,7 @@ public TraceStopwatch newStartedStopwatch(String header) { return sw; } - public TraceStopwatch peek() { + public synchronized TraceStopwatch peek() { Entry entry = watches.peekLast(); if (null == entry) { NotFoundQueryException qe = new NotFoundQueryException(DatawaveErrorCode.STOPWATCH_MISSING); @@ -65,13 +69,31 @@ public TraceStopwatch peek() { return entry.getValue(); } + /** + * Get the stopwatch associated with the stage name, or null if no such stopwatch exists + * + * @param stageName + * the stage name + * @return the stopwatch, or null if no such stopwatch exists + */ + public TraceStopwatch get(String stageName) { + synchronized (watches) { + for (Map.Entry entry : watches) { + if (entry.getKey().equals(stageName)) { + return entry.getValue(); + } + } + } + return null; + } + public String summarize() { List logLines = summarizeAsList(); return Joiner.on('\n').join(logLines); } - public List summarizeAsList() { + public synchronized List summarizeAsList() { if (this.watches.isEmpty()) { return Collections.emptyList(); } @@ -109,7 +131,9 @@ public List summarizeAsList() { } public void appendTimers(QueryStopwatch queryStopwatch) { - this.watches.addAll(queryStopwatch.watches); + synchronized (watches) { + this.watches.addAll(queryStopwatch.watches); + } } protected String formatMillis(long elapsedMillis) { 
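
The QueryStopwatch changes above synchronize access to the backing deque and add a get(String) lookup because stopwatches are now started from the planner's worker threads while the planner thread later reads them when logging time saved. The sketch below is illustrative only, not part of this patch: it assumes the datawave classes touched in this diff are on the classpath and that QueryStopwatch keeps its implicit no-arg constructor; the stage name, sleep, and class name are placeholders.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import datawave.query.util.QueryStopwatch;
import datawave.util.time.TraceStopwatch;

public class StopwatchSharingSketch {

    public static void main(String[] args) throws Exception {
        // shared timer, written by worker threads and read by the planner thread
        QueryStopwatch futureStopWatch = new QueryStopwatch();
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // a worker thread times its own stage, as the Fetch* callables do
        Future<String> task = executor.submit(() -> {
            TraceStopwatch sw = futureStopWatch.newStartedStopwatch("Fetch Indexed Fields");
            try {
                Thread.sleep(50); // stand-in for a MetadataHelper call
                return "FIELD_A,FIELD_B";
            } finally {
                sw.stop();
            }
        });

        task.get(10, TimeUnit.SECONDS);
        executor.shutdown();

        // the planner thread looks the stage up by name afterwards, which is why
        // QueryStopwatch now synchronizes access to its backing deque
        TraceStopwatch recorded = futureStopWatch.get("Fetch Indexed Fields");
        if (recorded != null) {
            System.out.println("elapsed ms: " + recorded.elapsed(TimeUnit.MILLISECONDS));
        }
    }
}
```
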
diff --git a/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java b/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java index 18d255f1808..0805be9d0b4 100644 --- a/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/QueryPlanTest.java @@ -196,7 +196,7 @@ public void planInMetricsAfterTableNotFoundExceptionDefaultQueryPlannerNE() thro runTestQuery(Collections.emptyList(), query, this.dataManager.getShardStartEndDate()[0], this.dataManager.getShardStartEndDate()[1], Collections.emptyMap()); fail("Expected DatawaveFatalQueryException."); - } catch (DatawaveFatalQueryException e) { + } catch (RuntimeException e) { assertEquals(expectedPlan, metric.getPlan()); } } @@ -212,7 +212,7 @@ public void planInMetricsAfterTableNotFoundExceptionDefaultQueryPlannerNotEq() t runTestQuery(Collections.emptyList(), query, this.dataManager.getShardStartEndDate()[0], this.dataManager.getShardStartEndDate()[1], Collections.emptyMap()); fail("Expected DatawaveFatalQueryException."); - } catch (DatawaveFatalQueryException e) { + } catch (RuntimeException e) { assertEquals(expectedPlan, metric.getPlan()); } } diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/IndexQueryLogicTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/IndexQueryLogicTest.java index 16887ff25c1..98f74f942f2 100644 --- a/warehouse/query-core/src/test/java/datawave/query/tables/IndexQueryLogicTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/tables/IndexQueryLogicTest.java @@ -24,7 +24,6 @@ import datawave.marking.MarkingFunctions; import datawave.query.Constants; import datawave.query.QueryTestTableHelper; -import datawave.query.planner.DefaultQueryPlanner; import datawave.query.planner.FederatedQueryPlanner; import datawave.query.testframework.AbstractFunctionalQuery; import datawave.query.testframework.AccumuloSetup;
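
Taken together, the DefaultQueryPlanner changes follow one pattern: submit the expensive metadata lookups to an executor as soon as the configuration is known, then resolve each Future only when its result is needed, bounding the wait with concurrentTimeoutMillis and surfacing failures as a DatawaveAsyncOperationException. The sketch below restates that pattern in isolation so it can be read without the rest of the diff; it is a simplified, dependency-free illustration (AsyncPlannerPatternSketch, resolve, and the field values are assumed names, and a plain RuntimeException stands in for DatawaveAsyncOperationException), not code from this patch.

```java
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncPlannerPatternSketch {

    // mirrors the 10 second concurrentTimeoutMillis default in the planner
    private static final int TIMEOUT_MILLIS = 10_000;

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // submit the "expensive" work up front, as startConcurrentExecution does
        Future<Set<String>> indexedFieldsFuture = executor.submit(() -> Set.of("FIELD_A", "FIELD_B"));

        // block only when the result is actually needed during planning
        Set<String> indexedFields = resolve("Fetch Indexed Fields", indexedFieldsFuture);
        System.out.println(indexedFields);

        executor.shutdown();
    }

    private static <T> T resolve(String stageName, Future<T> future) {
        try {
            return future.get(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // the planner wraps these in DatawaveAsyncOperationException;
            // a RuntimeException keeps this sketch self-contained
            throw new RuntimeException("Stage[" + stageName + "] failed", e);
        }
    }
}
```
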