Skip to content

Commit

Permalink
Offload potentially expensive query planning steps to async tasks. (#…
Browse files Browse the repository at this point in the history
…2575)

* Offload potentially expensive query planning steps to async tasks.

* fix doc build

* combine similar try-catch blocks

* log exception with error message, throw dedicated exception for async operations

* Extract common code for getting field sets to method

---------

Co-authored-by: Moon Moon <[email protected]>
  • Loading branch information
apmoriarty and mineralntl authored Nov 13, 2024
1 parent 7329cff commit 8b10852
Show file tree
Hide file tree
Showing 17 changed files with 868 additions and 125 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package datawave.query.exceptions;

import datawave.query.planner.DefaultQueryPlanner;

/**
* An exception thrown when the {@link DefaultQueryPlanner} encounters a problem during an async operation like fetching field sets or serializing iterator
* options in another thread
*/
public class DatawaveAsyncOperationException extends RuntimeException {

private static final long serialVersionUID = -5455973957749708049L;

public DatawaveAsyncOperationException() {
super();
}

public DatawaveAsyncOperationException(String message, Throwable cause) {
super(message, cause);
}

public DatawaveAsyncOperationException(String message) {
super(message);
}

public DatawaveAsyncOperationException(Throwable cause) {
super(cause);
}

protected DatawaveAsyncOperationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datawave.query.index.lookup;

import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
Expand All @@ -16,11 +17,13 @@
import org.apache.commons.jexl3.parser.JexlNode;

import com.google.common.base.Function;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;

import datawave.data.type.Type;
import datawave.query.CloseableIterable;
import datawave.query.config.ShardQueryConfiguration;
import datawave.query.exceptions.DatawaveQueryException;
import datawave.query.index.lookup.IndexStream.StreamContext;
import datawave.query.iterator.FieldIndexOnlyQueryIterator;
import datawave.query.iterator.QueryOptions;
Expand Down Expand Up @@ -65,7 +68,7 @@ public CloseableIterable<QueryPlan> streamPlans(JexlNode node) {
DefaultQueryPlanner.addOption(cfg, QueryOptions.DATATYPE_FILTER, config.getDatatypeFilterAsString(), false);
DefaultQueryPlanner.addOption(cfg, QueryOptions.END_TIME, Long.toString(config.getEndDate().getTime()), false);

DefaultQueryPlanner.configureTypeMappings(config, cfg, metadataHelper, true);
configureTypeMappings(config, cfg, metadataHelper);

scanner.setRanges(Collections.singleton(rangeForTerm(null, null, config)));

Expand Down Expand Up @@ -97,7 +100,7 @@ public CloseableIterable<QueryPlan> streamPlans(JexlNode node) {

}

} catch (TableNotFoundException | DatawaveQueryException e) {
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
} finally {
// shut down the executor as all threads have completed
Expand Down Expand Up @@ -134,4 +137,29 @@ public QueryPlan apply(Entry<Key,Value> entry) {
// @formatter:on
}
}

/**
* Lift and shift from DefaultQueryPlanner to avoid reliance on static methods
*/
private void configureTypeMappings(ShardQueryConfiguration config, IteratorSetting cfg, MetadataHelper metadataHelper) {
DefaultQueryPlanner.addOption(cfg, QueryOptions.QUERY_MAPPING_COMPRESS, Boolean.toString(true), false);

Multimap<String,Type<?>> nonIndexedQueryFieldsDatatypes = HashMultimap.create(config.getQueryFieldsDatatypes());
nonIndexedQueryFieldsDatatypes.keySet().removeAll(config.getIndexedFields());
String nonIndexedTypes = QueryOptions.buildFieldNormalizerString(nonIndexedQueryFieldsDatatypes);
DefaultQueryPlanner.addOption(cfg, QueryOptions.NON_INDEXED_DATATYPES, nonIndexedTypes, false);

try {
String serializedTypeMetadata = metadataHelper.getTypeMetadata(config.getDatatypeFilter()).toString();
DefaultQueryPlanner.addOption(cfg, QueryOptions.TYPE_METADATA, serializedTypeMetadata, false);

String requiredAuthsString = metadataHelper.getUsersMetadataAuthorizationSubset();
requiredAuthsString = QueryOptions.compressOption(requiredAuthsString, QueryOptions.UTF8);
DefaultQueryPlanner.addOption(cfg, QueryOptions.TYPE_METADATA_AUTHS, requiredAuthsString, false);
} catch (TableNotFoundException | IOException e) {
throw new RuntimeException(e);
}

DefaultQueryPlanner.addOption(cfg, QueryOptions.METADATA_TABLE_NAME, config.getMetadataTableName(), false);
}
}
Loading

0 comments on commit 8b10852

Please sign in to comment.