diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/BaseIndexStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/BaseIndexStream.java index 1d2b223a52d..d425155df69 100644 --- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/BaseIndexStream.java +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/BaseIndexStream.java @@ -8,20 +8,20 @@ import com.google.common.collect.Iterators; import datawave.query.jexl.visitors.JexlStringBuildingVisitor; -import datawave.query.tables.RangeStreamScanner; +import datawave.query.tables.ScannerSession; import datawave.query.util.Tuple2; /** * Provides a core set of variables for the ScannerStream, Union, and Intersection. - * + *

* A reference to the underlying {@link datawave.query.tables.RangeStreamScanner} is required for seeking. - * + *

* Note that the BaseIndexStream does not implement the {@link IndexStream#seek(String)} method. Inheriting classes are responsible for determining the correct * implementation. */ public abstract class BaseIndexStream implements IndexStream { - protected RangeStreamScanner rangeStreamScanner; + protected ScannerSession scannerSession; protected EntryParser entryParser; @@ -39,8 +39,8 @@ public abstract class BaseIndexStream implements IndexStream { /** * This constructor is used by BaseIndexStreams that have a backing range stream scanner. I.e., this will actually scan the global index * - * @param rangeStreamScanner - * a range stream scanner + * @param scannerSession + * a scanner session, likely a RangeStreamScanner * @param entryParser * an entry parser * @param node @@ -50,18 +50,18 @@ public abstract class BaseIndexStream implements IndexStream { * @param debugDelegate * a delegate used for debugging (not in use) */ - public BaseIndexStream(RangeStreamScanner rangeStreamScanner, EntryParser entryParser, JexlNode node, StreamContext context, IndexStream debugDelegate) { - this.rangeStreamScanner = Preconditions.checkNotNull(rangeStreamScanner); + public BaseIndexStream(ScannerSession scannerSession, EntryParser entryParser, JexlNode node, StreamContext context, IndexStream debugDelegate) { + this.scannerSession = Preconditions.checkNotNull(scannerSession); this.entryParser = Preconditions.checkNotNull(entryParser); this.node = node; - this.backingIter = Iterators.transform(this.rangeStreamScanner, this.entryParser); + this.backingIter = Iterators.transform(this.scannerSession, this.entryParser); this.context = context; this.debugDelegate = debugDelegate; } /** * This constructor is for terms that do not have a range stream scanner. - * + *

* This is used by the SHARDS_AND_DAYS hint and terms that do not hit anything in the global index (delayed terms) * * @param iterator @@ -74,7 +74,7 @@ public BaseIndexStream(RangeStreamScanner rangeStreamScanner, EntryParser entryP * delegate used for debugging (not in use) */ public BaseIndexStream(Iterator> iterator, JexlNode node, StreamContext context, IndexStream debugDelegate) { - this.rangeStreamScanner = null; + this.scannerSession = null; this.entryParser = null; this.node = node; this.backingIter = Preconditions.checkNotNull(iterator); @@ -91,10 +91,10 @@ public BaseIndexStream() { * Reset the backing iterator after a seek. State must stay in sync with changes to the RangeStreamScanner. */ public void resetBackingIterator() { - if (rangeStreamScanner != null && entryParser != null) { + if (scannerSession != null && entryParser != null) { this.peekedElement = null; this.hasPeeked = false; - this.backingIter = Iterators.transform(this.rangeStreamScanner, this.entryParser); + this.backingIter = Iterators.transform(this.scannerSession, this.entryParser); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java index 4fd46d05dac..6c811456f5a 100644 --- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/RangeStream.java @@ -152,8 +152,8 @@ public class RangeStream extends BaseVisitor implements QueryPlanStream { protected NumShardFinder numShardFinder; - private int maxLinesToPrint = -1; - private int linesPrinted = 0; + protected int maxLinesToPrint = -1; + protected int linesPrinted = 0; public RangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper metadataHelper) { this.config = config; @@ -718,7 +718,7 @@ public Object visit(ASTFalseNode node, Object data) { return ScannerStream.noData(node); } - private 
boolean isUnOrNotFielded(JexlNode node) { + protected boolean isUnOrNotFielded(JexlNode node) { List identifiers = JexlASTHelper.getIdentifiers(node); for (ASTIdentifier identifier : identifiers) { if (identifier.getName().equals(Constants.ANY_FIELD) || identifier.getName().equals(Constants.NO_FIELD)) { diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ScannerStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ScannerStream.java index 2440c7dc387..e789ecd0138 100644 --- a/warehouse/query-core/src/main/java/datawave/query/index/lookup/ScannerStream.java +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/ScannerStream.java @@ -7,7 +7,7 @@ import com.google.common.collect.PeekingIterator; -import datawave.query.tables.RangeStreamScanner; +import datawave.query.tables.ScannerSession; import datawave.query.util.Tuple2; /** @@ -17,12 +17,12 @@ */ public class ScannerStream extends BaseIndexStream { - private ScannerStream(RangeStreamScanner scanSession, EntryParser entryParser, StreamContext ctx, JexlNode currNode, IndexStream debugDelegate) { + private ScannerStream(ScannerSession scanSession, EntryParser entryParser, StreamContext ctx, JexlNode currNode, IndexStream debugDelegate) { super(scanSession, entryParser, currNode, ctx, debugDelegate); } private ScannerStream(BaseIndexStream itr, StreamContext ctx, JexlNode currNode) { - this(itr.rangeStreamScanner, itr.entryParser, ctx, currNode, null); + this(itr.scannerSession, itr.entryParser, ctx, currNode, null); } private ScannerStream(Iterator> iterator, StreamContext context, JexlNode node, IndexStream debugDelegate) { @@ -74,24 +74,24 @@ public static ScannerStream initialized(Iterator> itr, return new ScannerStream(itr, StreamContext.INITIALIZED, currNode); } - public static ScannerStream initialized(RangeStreamScanner scannerStream, EntryParser entryParser, JexlNode currNode) { + public static ScannerStream initialized(ScannerSession 
scannerStream, EntryParser entryParser, JexlNode currNode) { return new ScannerStream(scannerStream, entryParser, StreamContext.INITIALIZED, currNode, null); } /** * Seek this ScannerStream to the specified shard. - * + *

* If no underlying RangeStreamScanner exists then the seek operation is delegated to {@link #seekByNext(String)}. * * @param seekShard * the shard to seek to. - * @return the next element great than or equal to the seek shard, or null if all elements were exhausted. + * @return the next element greater than or equal to the seek shard, or null if all elements were exhausted. */ @Override public String seek(String seekShard) { - if (rangeStreamScanner != null) { + if (scannerSession != null) { - String seekedShard = rangeStreamScanner.seek(seekShard); + String seekedShard = scannerSession.seek(seekShard); if (seekedShard == null) { // If the underlying RangeStreamScanner returns null we are done. this.peekedElement = null; diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/TruncatedIndexIterator.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/TruncatedIndexIterator.java new file mode 100644 index 00000000000..a7a5aab49ab --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/TruncatedIndexIterator.java @@ -0,0 +1,103 @@ +package datawave.query.index.lookup; + +import java.io.IOException; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An iterator that can scan a global index of truncated keys in the form: + * + *

+ *     value FIELD:yyyyMMdd0x00datatype VIZ (bitset value)
+ * 
+ * + * This iterator will aggregate bitsets for a given day across all datatypes and visibilities, returning the combined bitset as a single value + */ +public class TruncatedIndexIterator implements SortedKeyValueIterator { + + private static final Logger log = LoggerFactory.getLogger(TruncatedIndexIterator.class); + + protected SortedKeyValueIterator source; + protected Key tk; + protected Value tv = null; + protected BitSet bitset = null; + + @Override + public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { + this.source = source; + } + + @Override + public boolean hasTop() { + return tk != null; + } + + @Override + public void next() throws IOException { + this.tk = null; + this.tv = null; + this.bitset = null; + if (source.hasTop()) { + tk = new Key(source.getTopKey()); // defensive copy: the source may reuse its top key object once next() is called + + String cq = tk.getColumnQualifier().toString(); + String date = cq.substring(0, cq.indexOf('\u0000')); + + bitset = BitSet.valueOf(source.getTopValue().get()); + source.next(); + + while (source.hasTop() && sameDay(date, source.getTopKey())) { + BitSet candidate = BitSet.valueOf(source.getTopValue().get()); + bitset.or(candidate); + source.next(); + } + + tv = new Value(bitset.toByteArray()); + } + } + + protected boolean sameDay(String date, Key next) { + String nextCQ = next.getColumnQualifier().toString(); + return nextCQ.startsWith(date); + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) throws IOException { + if (!range.isStartKeyInclusive()) { + Key start = range.getStartKey(); + Key next = new Key(start.getRow(), start.getColumnFamily(), new Text(start.getColumnQualifier() + "\u0000\uFFFF")); + range = new Range(next, true, range.getEndKey(), range.isEndKeyInclusive()); + } + + source.seek(range, columnFamilies, inclusive); + next(); + } + + @Override + public Key getTopKey() { + return tk; + } + + @Override + public Value getTopValue() { + return tv; + } + + @Override + public 
SortedKeyValueIterator deepCopy(IteratorEnvironment env) { + TruncatedIndexIterator copy = new TruncatedIndexIterator(); + copy.source = source.deepCopy(env); + return copy; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/index/lookup/TruncatedRangeStream.java b/warehouse/query-core/src/main/java/datawave/query/index/lookup/TruncatedRangeStream.java new file mode 100644 index 00000000000..1a60fbcb3cb --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/index/lookup/TruncatedRangeStream.java @@ -0,0 +1,107 @@ +package datawave.query.index.lookup; + +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.commons.jexl3.parser.ASTEQNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import datawave.query.config.ShardQueryConfiguration; +import datawave.query.iterator.QueryOptions; +import datawave.query.jexl.JexlASTHelper; +import datawave.query.tables.ScannerFactory; +import datawave.query.tables.TruncatedIndexScanner; +import datawave.query.util.MetadataHelper; +import datawave.security.util.AuthorizationsMinimizer; + +/** + * A {@link QueryPlanStream} configured to run against a truncated shard index + */ +public class TruncatedRangeStream extends RangeStream { + + private static final Logger log = LoggerFactory.getLogger(TruncatedRangeStream.class); + + public TruncatedRangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper metadataHelper) { + super(config, scanners, metadataHelper); + } + + @Override + public ScannerStream visit(ASTEQNode node, Object data) { + + if (isUnOrNotFielded(node)) { + return ScannerStream.noData(node); + } + + // We are looking for identifier = literal + JexlASTHelper.IdentifierOpLiteral op = JexlASTHelper.getIdentifierOpLiteral(node); + if (op == null) { + return ScannerStream.delayed(node); + } + + final String fieldName = op.deconstructIdentifier(); + + // Null literals 
cannot be resolved against the index. + if (op.getLiteralValue() == null) { + return ScannerStream.delayed(node); + } + + // toString of String returns the String + String literal = op.getLiteralValue().toString(); + + if (QueryOptions.DEFAULT_DATATYPE_FIELDNAME.equals(fieldName)) { + return ScannerStream.delayed(node); + } + + // Check if field is not indexed + if (!isIndexed(fieldName, config.getIndexedFields())) { + try { + if (this.getAllFieldsFromHelper().contains(fieldName)) { + if (maxLinesToPrint > 0 && linesPrinted < maxLinesToPrint) { + linesPrinted++; + log.debug("{'{}':'{}'} is not an observed field.", fieldName, literal); + } + return ScannerStream.delayed(node); + } + } catch (TableNotFoundException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + if (maxLinesToPrint > 0 && linesPrinted < maxLinesToPrint) { + linesPrinted++; + log.debug("{'{}':'{}'} is not indexed.", fieldName, literal); + } + // even though the field is not indexed it may still be valuable when evaluating an event. 
mark this scanner stream as delayed, so it is correctly + // propagated + return ScannerStream.delayed(node); + } + + // Final case, field is indexed + if (maxLinesToPrint > 0 && linesPrinted < maxLinesToPrint) { + linesPrinted++; + log.debug("{'{}':'{}'} is indexed.", fieldName, literal); + } + + try { + TruncatedIndexScanner scanner = new TruncatedIndexScanner(config); + if (config.getDatatypeFilter() != null && !config.getDatatypeFilter().isEmpty()) { + scanner.setDatatypes(config.getDatatypeFilter()); + } + scanner.setTableName(config.getTruncatedIndexTableName()); + scanner.setFieldValue(fieldName, literal); + scanner.setBasePriority(config.getBaseIteratorPriority()); + + Authorizations auths = AuthorizationsMinimizer.minimize(config.getAuthorizations()).iterator().next(); + scanner.setAuths(auths); + + return ScannerStream.initialized(scanner, node); + } catch (Exception e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Override + public void close() { + super.close(); + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java index 2478fdb3e9f..ae914b52c78 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java @@ -100,6 +100,8 @@ import datawave.query.index.lookup.IndexStream; import datawave.query.index.lookup.QueryPlanStream; import datawave.query.index.lookup.RangeStream; +import datawave.query.index.lookup.TruncatedIndexIterator; +import datawave.query.index.lookup.TruncatedRangeStream; import datawave.query.iterator.CloseableListIterable; import datawave.query.iterator.QueryIterator; import datawave.query.iterator.QueryOptions; @@ -3101,6 +3103,10 @@ private QueryPlanStream getQueryPlanStream(ShardQueryConfiguration config, Scann if (config.isUseShardedIndex()) { 
return getDayIndexStream(config); + } else if (config.isUseTruncatedIndex()) { + this.rangeStreamClass = TruncatedRangeStream.class.getCanonicalName(); + this.createUidsIteratorClass = TruncatedIndexIterator.class; + return initializeRangeStream(config, scannerFactory, metadataHelper); } else { return initializeRangeStream(config, scannerFactory, metadataHelper); } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BitSetIterator.java b/warehouse/query-core/src/main/java/datawave/query/tables/BitSetIterator.java new file mode 100644 index 00000000000..a0fda3dabd4 --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BitSetIterator.java @@ -0,0 +1,32 @@ +package datawave.query.tables; + +import java.util.BitSet; +import java.util.Iterator; + +public class BitSetIterator implements Iterator { + + private int index; + private BitSet bitset; + + public BitSetIterator(BitSet bitset) { + this.bitset = bitset; + this.index = 0; + } + + public void reset(BitSet bitset) { + this.bitset = bitset; + this.index = 0; + } + + @Override + public boolean hasNext() { + return bitset != null && bitset.nextSetBit(index) >= 0; + } + + @Override + public Integer next() { + int next = bitset.nextSetBit(index); + index = next < 0 ? bitset.length() : next + 1; // when exhausted (-1) stay past the last bit instead of rewinding to bit 0 + return next; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/DateIterator.java b/warehouse/query-core/src/main/java/datawave/query/tables/DateIterator.java new file mode 100644 index 00000000000..42e58c3362a --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/DateIterator.java @@ -0,0 +1,52 @@ +package datawave.query.tables; + +import java.util.Calendar; +import java.util.Date; +import java.util.Iterator; + +import datawave.util.time.DateHelper; + +/** + * An iterator that can present each day given a start and end date in yyyyMMdd format + */ +public class DateIterator implements Iterator { + + private final Calendar start; + private final Calendar end; + + 
private String top; + + public DateIterator(String startDate, String endDate) { + start = getCalendarStartOfDay(DateHelper.parse(startDate)); + end = getCalendarStartOfDay(DateHelper.parse(endDate)); + top = DateHelper.format(start.getTime()); + } + + @Override + public boolean hasNext() { + return top != null; + } + + @Override + public String next() { + String next = top; + top = null; + + start.add(Calendar.DAY_OF_YEAR, 1); + if (start.compareTo(end) <= 0) { + top = DateHelper.format(start.getTime()); + } + + return next; + } + + private Calendar getCalendarStartOfDay(Date date) { + Calendar calendar = Calendar.getInstance(); + calendar.setTime(date); + calendar.set(Calendar.HOUR_OF_DAY, 0); + calendar.set(Calendar.MINUTE, 0); + calendar.set(Calendar.SECOND, 0); + calendar.set(Calendar.MILLISECOND, 0); + return calendar; + } +} diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java index 8792721cce8..bfa79068884 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java @@ -394,6 +394,17 @@ public Result next() { } } + /** + * Some implementations of {@link ScannerSession} support seeking + * + * @param seekShard + * the minimum shard to advance to + * @return the result of advancing the underlying iterator + */ + public String seek(String seekShard) { + return seekShard; + } + /** * Override this for your specific implementation. 
* diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/TruncatedIndexScanner.java b/warehouse/query-core/src/main/java/datawave/query/tables/TruncatedIndexScanner.java new file mode 100644 index 00000000000..61f3f53f05b --- /dev/null +++ b/warehouse/query-core/src/main/java/datawave/query/tables/TruncatedIndexScanner.java @@ -0,0 +1,192 @@ +package datawave.query.tables; + +import java.util.BitSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; + +import datawave.query.config.ShardQueryConfiguration; +import datawave.query.index.lookup.DataTypeFilter; +import datawave.query.index.lookup.IndexInfo; +import datawave.query.index.lookup.TruncatedIndexIterator; +import datawave.query.jexl.JexlNodeFactory; +import datawave.query.util.Tuple2; +import datawave.util.time.DateHelper; + +/** + * Wraps an {@link TruncatedIndexIterator} with the context of a query's date range. Transforms the aggregated bitset that denotes shard offsets to present a + * shard-specific view of the world to the caller. 
+ */ +public class TruncatedIndexScanner implements Iterator> { + + private static final Logger log = LoggerFactory.getLogger(TruncatedIndexScanner.class); + + private String value; + private String field; + private Set datatypes = null; + + private final AccumuloClient client; + private Authorizations auths; + private String tableName; + + private Text columnFamily; + private String currentDay; + private int basePriority = 30; + + private Tuple2 top = null; + + private final DateIterator dateIterator; + private final BitSetIterator bitsetIterator; + + public TruncatedIndexScanner(ShardQueryConfiguration config) { + this(config.getClient(), DateHelper.format(config.getBeginDate()), DateHelper.format(config.getEndDate())); + } + + public TruncatedIndexScanner(AccumuloClient client, String startDate, String endDate) { + this.client = client; + this.dateIterator = new DateIterator(startDate, endDate); + this.bitsetIterator = new BitSetIterator(null); + } + + public void setFieldValue(String field, String value) { + this.field = field; + this.value = value; + this.columnFamily = new Text(field); + } + + public void setAuths(Authorizations auths) { + this.auths = auths; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setDatatypes(Set datatypes) { + this.datatypes = datatypes; + } + + public void setBasePriority(int basePriority) { + this.basePriority = basePriority; + } + + @Override + public boolean hasNext() { + while (top == null) { + + if (currentDay == null && dateIterator.hasNext()) { // only pull (and scan) a day while the date range still has one + currentDay = dateIterator.next(); + + BitSet bitset = scanNextDay(currentDay); + if (bitset == null || bitset.cardinality() == 0) { + currentDay = null; // no data found, advance to next day + } else { + bitsetIterator.reset(bitset); + } + } + + if (bitsetIterator.hasNext()) { + int index = bitsetIterator.next(); + if (currentDay != null && index != -1) { + String shard = currentDay + "_" + index; + top = createIndexInfo(shard); + } + } else { + 
currentDay = null; + } + + if (currentDay == null && !dateIterator.hasNext() && !bitsetIterator.hasNext()) { + break; + } + } + return top != null; + } + + /** + * Create a new Tuple using the provided shard. A new object must be created for each top value. + * + * @param shard + * the current shard + * @return a new Tuple of shard and index info + */ + private Tuple2 createIndexInfo(String shard) { + IndexInfo info = new IndexInfo(-1); + info.applyNode(JexlNodeFactory.buildEQNode(field, value)); + return new Tuple2<>(shard, info); + } + + @Override + public Tuple2 next() { + Tuple2 next = top; + top = null; + return next; + } + + private BitSet scanNextDay(String date) { + Objects.requireNonNull(tableName, "must set the index table name"); + Objects.requireNonNull(auths, "authorizations must be set"); + try (var scanner = client.createScanner(tableName, auths)) { + + Range range = createScanRange(value, field, date); + scanner.setRange(range); + scanner.fetchColumnFamily(columnFamily); + + if (datatypes != null && !datatypes.isEmpty()) { + scanner.addScanIterator(createDatatypeFilter(basePriority + 1)); + } + scanner.addScanIterator(createScanIterator(basePriority + 2)); + + BitSet bitset = null; + int count = 0; + for (Map.Entry entry : scanner) { + count++; + if (bitset == null) { + bitset = BitSet.valueOf(entry.getValue().get()); + } else { + BitSet result = BitSet.valueOf(entry.getValue().get()); + bitset.or(result); + } + } + + if (count > 1) { + // the TruncatedIndexIterator should never return more than one entry returned + log.error("found more than one entry: {}", count); + } + + return bitset; + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + } + + private Range createScanRange(String value, String field, String date) { + Key start = new Key(value, field, date + "\u0000"); + Key stop = new Key(value, field, date + "\u0000\uFFFF"); + return new Range(start, true, stop, true); + } + + private IteratorSetting 
createDatatypeFilter(int priority) { + IteratorSetting setting = new IteratorSetting(priority, DataTypeFilter.class); + setting.addOption(DataTypeFilter.TYPES, Joiner.on(',').join(datatypes)); + return setting; + } + + private IteratorSetting createScanIterator(int priority) { + return new IteratorSetting(priority, TruncatedIndexIterator.class.getSimpleName(), TruncatedIndexIterator.class); + } + +} diff --git a/warehouse/query-core/src/test/java/datawave/query/ColorsTest.java b/warehouse/query-core/src/test/java/datawave/query/ColorsTest.java index caa927cfc3d..db16aa0f40d 100644 --- a/warehouse/query-core/src/test/java/datawave/query/ColorsTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/ColorsTest.java @@ -312,11 +312,12 @@ public ColorsTest planAndExecuteQueryAgainstMultipleIndices() throws Exception { for (String indexTableName : TestIndexTableNames.names()) { logic.setIndexTableName(indexTableName); switch (indexTableName) { + case TestIndexTableNames.SHARD_INDEX: case TestIndexTableNames.NO_UID_INDEX: break; case TestIndexTableNames.TRUNCATED_INDEX: - // TODO: not yet implemented - continue; + logic.setUseTruncatedIndex(true); + break; case TestIndexTableNames.SHARDED_DAY_INDEX: case TestIndexTableNames.SHARDED_YEAR_INDEX: logic.setUseShardedIndex(true); @@ -329,6 +330,7 @@ public ColorsTest planAndExecuteQueryAgainstMultipleIndices() throws Exception { planAndExecuteQuery(); switch (indexTableName) { + case TestIndexTableNames.SHARD_INDEX: case TestIndexTableNames.NO_UID_INDEX: break; case TestIndexTableNames.TRUNCATED_INDEX: @@ -507,7 +509,7 @@ public void testColorRed() throws Exception { withQuery("COLOR == 'red'"); withRequiredAllOf("COLOR:red"); withFullExpectedCount(); - planAndExecuteQuery(); + planAndExecuteQueryAgainstMultipleIndices(); } @Test @@ -515,7 +517,7 @@ public void testColorYellow() throws Exception { withQuery("COLOR == 'yellow'"); withRequiredAllOf("COLOR:yellow"); withFullExpectedCount(); - planAndExecuteQuery(); + 
planAndExecuteQueryAgainstMultipleIndices(); } @Test @@ -523,7 +525,7 @@ public void testColorBlue() throws Exception { withQuery("COLOR == 'blue'"); withRequiredAllOf("COLOR:blue"); withFullExpectedCount(); - planAndExecuteQuery(); + planAndExecuteQueryAgainstMultipleIndices(); } @Test @@ -531,19 +533,19 @@ public void testAllColors() throws Exception { withQuery("COLOR == 'red' || COLOR == 'yellow' || COLOR == 'blue'"); withOptionalAnyOf("COLOR:red", "COLOR:yellow", "COLOR:blue"); withExpectedCount(3 * getTotalEventCount()); - planAndExecuteQuery(); + planAndExecuteQueryAgainstMultipleIndices(); } @Test public void testSearchAllShardsDefeatedAtFieldIndex() throws Exception { - withQuery("COLOR == 'red' && !COLOR == 'red'"); - planAndExecuteQuery(); + withQuery("COLOR == 'red' && !(COLOR == 'red')"); + planAndExecuteQueryAgainstMultipleIndices(); } @Test public void testSearchAllShardsDefeatedAtEvaluation() throws Exception { withQuery("COLOR == 'red' && filter:includeRegex(COLOR, 'yellow')"); - planAndExecuteQuery(); + planAndExecuteQueryAgainstMultipleIndices(); } @Test @@ -554,7 +556,7 @@ public void testReturnedShardsForEarlierDate() throws Exception { withExpectedCount(ColorsIngest.getNumShards()); withExpectedDays("20250301"); withExpectedShards("20250301", ColorsIngest.getNumShards()); - planAndExecuteQuery(); + planAndExecuteQueryAgainstMultipleIndices(); } @Test @@ -565,7 +567,7 @@ public void testReturnedShardsForLaterDate() throws Exception { withExpectedCount(ColorsIngest.getNewShards()); withExpectedDays("20250331"); withExpectedShards("20250331", ColorsIngest.getNewShards()); - planAndExecuteQuery(); + planAndExecuteQueryAgainstMultipleIndices(); } @Test @@ -577,7 +579,7 @@ public void testReturnedShardsForQueryThatCrossesBoundary() throws Exception { withExpectedDays("20250326", "20250327"); withExpectedShards("20250326", ColorsIngest.getNumShards()); withExpectedShards("20250327", ColorsIngest.getNewShards()); - planAndExecuteQuery(); + 
planAndExecuteQueryAgainstMultipleIndices(); } // TODO: unique diff --git a/warehouse/query-core/src/test/java/datawave/query/SizesTest.java b/warehouse/query-core/src/test/java/datawave/query/SizesTest.java index ff38e144f36..7e93e56a983 100644 --- a/warehouse/query-core/src/test/java/datawave/query/SizesTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/SizesTest.java @@ -1,12 +1,8 @@ package datawave.query; -import static datawave.query.util.AbstractQueryTest.RangeType.DOCUMENT; -import static datawave.query.util.AbstractQueryTest.RangeType.SHARD; - import javax.inject.Inject; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.security.Authorizations; import org.jboss.arquillian.container.test.api.Deployment; import org.jboss.arquillian.junit.Arquillian; import org.jboss.shrinkwrap.api.ShrinkWrap; @@ -23,22 +19,27 @@ import datawave.accumulo.inmemory.InMemoryAccumuloClient; import datawave.accumulo.inmemory.InMemoryInstance; import datawave.configuration.spring.SpringBean; -import datawave.helpers.PrintUtility; import datawave.ingest.data.TypeRegistry; +import datawave.query.index.day.IndexIngestUtil; import datawave.query.tables.ShardQueryLogic; import datawave.query.tables.edge.DefaultEdgeEventQueryLogic; import datawave.query.util.AbstractQueryTest; import datawave.query.util.SizesIngest; -import datawave.util.TableName; +import datawave.query.util.TestIndexTableNames; import datawave.webservice.edgedictionary.RemoteEdgeDictionary; /** * This suite of tests exercises many random events over a small number of shards */ -public abstract class SizesTest extends AbstractQueryTest { +@RunWith(Arquillian.class) +public class SizesTest extends AbstractQueryTest { private static final Logger log = LoggerFactory.getLogger(SizesTest.class); + private static final IndexIngestUtil ingestUtil = new IndexIngestUtil(); + + private static AccumuloClient clientForSetup; + @Inject @SpringBean(name = "EventQuery") protected 
ShardQueryLogic logic; @@ -48,56 +49,6 @@ public ShardQueryLogic getLogic() { return logic; } - @RunWith(Arquillian.class) - public static class ShardRangeTest extends SizesTest { - - protected static AccumuloClient client = null; - - @BeforeClass - public static void setUp() throws Exception { - InMemoryInstance i = new InMemoryInstance(ShardRangeTest.class.getName()); - client = new InMemoryAccumuloClient("", i); - - SizesIngest ingest = new SizesIngest(client); - ingest.write(SHARD); - - Authorizations auths = new Authorizations("ALL"); - PrintUtility.printTable(client, auths, TableName.SHARD); - PrintUtility.printTable(client, auths, TableName.SHARD_INDEX); - PrintUtility.printTable(client, auths, TableName.METADATA); - } - - @Before - public void beforeEach() { - setClientForTest(client); - } - } - - @RunWith(Arquillian.class) - public static class DocumentRangeTest extends SizesTest { - - protected static AccumuloClient client = null; - - @BeforeClass - public static void setUp() throws Exception { - InMemoryInstance i = new InMemoryInstance(DocumentRangeTest.class.getName()); - client = new InMemoryAccumuloClient("", i); - - SizesIngest ingest = new SizesIngest(client); - ingest.write(DOCUMENT); - - Authorizations auths = new Authorizations("ALL"); - PrintUtility.printTable(client, auths, TableName.SHARD); - PrintUtility.printTable(client, auths, TableName.SHARD_INDEX); - PrintUtility.printTable(client, auths, TableName.METADATA); - } - - @Before - public void beforeEach() { - setClientForTest(client); - } - } - @Deployment public static JavaArchive createDeployment() throws Exception { // @formatter:off @@ -113,15 +64,22 @@ public static JavaArchive createDeployment() throws Exception { // @formatter:on } - public void planAndExecuteQuery() throws Exception { - planQuery(); - executeQuery(); - // TODO: assert based on test metadata + @BeforeClass + public static void beforeClass() throws Exception { + InMemoryInstance instance = new 
InMemoryInstance(SizesTest.class.getName()); + clientForSetup = new InMemoryAccumuloClient("", instance); + + SizesIngest ingest = new SizesIngest(clientForSetup); + ingest.write(); + + ingestUtil.write(clientForSetup, auths); } @Before public void setup() throws Exception { withDate("20250606", "20250606"); + clientForTest = clientForSetup; + setClientForTest(clientForTest); } @AfterClass @@ -129,6 +87,53 @@ public static void teardown() { TypeRegistry.reset(); } + /** + * Plans and executes a query against multiple versions of a shard index + * + * @throws Exception + * if something goes wrong + */ + @Override + public void planAndExecuteQuery() throws Exception { + for (String indexTableName : TestIndexTableNames.names()) { + logic.setIndexTableName(indexTableName); + switch (indexTableName) { + case TestIndexTableNames.SHARD_INDEX: + case TestIndexTableNames.NO_UID_INDEX: + break; + case TestIndexTableNames.TRUNCATED_INDEX: + logic.setUseTruncatedIndex(true); + break; + case TestIndexTableNames.SHARDED_DAY_INDEX: + case TestIndexTableNames.SHARDED_YEAR_INDEX: + logic.setUseShardedIndex(true); + break; + default: + throw new IllegalStateException("Unknown index table name " + indexTableName); + } + + log.debug("=== using index: {} ===", indexTableName); + planQuery(); + executeQuery(); + // TODO: assert based on test metadata + + switch (indexTableName) { + case TestIndexTableNames.SHARD_INDEX: + case TestIndexTableNames.NO_UID_INDEX: + break; + case TestIndexTableNames.TRUNCATED_INDEX: + logic.setUseTruncatedIndex(false); + break; + case TestIndexTableNames.SHARDED_DAY_INDEX: + case TestIndexTableNames.SHARDED_YEAR_INDEX: + logic.setUseShardedIndex(false); + break; + default: + throw new IllegalStateException("Unknown index table name " + indexTableName); + } + } + } + @Test public void testSizeSmall() throws Exception { withQuery("SIZE == 'small'"); diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/TruncatedIndexIteratorTest.java 
b/warehouse/query-core/src/test/java/datawave/query/index/lookup/TruncatedIndexIteratorTest.java new file mode 100644 index 00000000000..10e78afaa34 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/TruncatedIndexIteratorTest.java @@ -0,0 +1,191 @@ +package datawave.query.index.lookup; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import datawave.query.iterator.SourceManagerTest.MockIteratorEnvironment; + +/** + * Note: because this class doesn't use combiners each key must be distinct + */ +public class TruncatedIndexIteratorTest { + + private static final Logger log = LoggerFactory.getLogger(TruncatedIndexIteratorTest.class); + + private final String DEFAULT_DATE = "20251010"; + private final String DEFAULT_DATATYPE = "datatype-a"; + private final String DEFAULT_VISIBILITY = "VIZ-A"; + + private final List> expected = new ArrayList<>(); + private final List> results = new ArrayList<>(); + private final SortedMap data = new TreeMap<>(); + + private String field; + private String value; + private String startDate; + private String endDate; + + @BeforeEach + public void beforeEach() { + expected.clear(); + results.clear(); + data.clear(); + + field = 
null; + value = null; + startDate = null; + endDate = null; + } + + @Test + public void testSingleKeySingleBit() throws IOException { + write(createKey("value-a", "FIELD_A"), createValue(0)); + expect(createKey("value-a", "FIELD_A"), createValue(0)); + withFieldValue("FIELD_A", "value-a"); + withDateRange("20251010", "20251010"); + iterate(); + } + + @Test + public void testSingleKeyMultiBit() throws IOException { + write(createKey("value-a", "FIELD_A"), createValue(0, 17)); + expect(createKey("value-a", "FIELD_A"), createValue(0, 17)); + withFieldValue("FIELD_A", "value-a"); + withDateRange("20251010", "20251010"); + iterate(); + } + + @Test + public void testAggregateDatatypes() throws IOException { + write(createKey("value-a", "FIELD_A", DEFAULT_DATE, "datatype-a"), createValue(0)); + write(createKey("value-a", "FIELD_A", DEFAULT_DATE, "datatype-b"), createValue(17)); + expect(createKey("value-a", "FIELD_A"), createValue(0, 17)); + withFieldValue("FIELD_A", "value-a"); + withDateRange("20251010", "20251010"); + iterate(); + } + + @Test + public void testAggregateVisibilities() throws IOException { + write(createKey("value-a", "FIELD_A", DEFAULT_DATE, DEFAULT_DATATYPE, "VIZ-A"), createValue(0)); + write(createKey("value-a", "FIELD_A", DEFAULT_DATE, DEFAULT_DATATYPE, "VIZ-B"), createValue(17)); + expect(createKey("value-a", "FIELD_A"), createValue(0, 17)); + withFieldValue("FIELD_A", "value-a"); + withDateRange("20251010", "20251010"); + iterate(); + } + + // each call to this iterator should be restricted to a single day, but maybe not + @Test + public void testMultipleDays() throws IOException { + write(createKey("value-a", "FIELD_A", "20251010"), createValue(0)); + write(createKey("value-a", "FIELD_A", "20251011"), createValue(1)); + write(createKey("value-a", "FIELD_A", "20251012"), createValue(2)); + expect(createKey("value-a", "FIELD_A", "20251010"), createValue(0)); + expect(createKey("value-a", "FIELD_A", "20251011"), createValue(1)); + 
expect(createKey("value-a", "FIELD_A", "20251012"), createValue(2)); + withFieldValue("FIELD_A", "value-a"); + withDateRange("20251010", "20251012"); + iterate(); + } + + private void iterate() throws IOException { + Key start = new Key(value, field, startDate); + Key end = new Key(value, field, endDate + "\u0000\uFFFF"); + Range range = new Range(start, end); + + Collection cfs = Set.of(new ArrayByteSequence(field)); + + TruncatedIndexIterator iter = new TruncatedIndexIterator(); + iter.init(new SortedMapIterator(data), Collections.emptyMap(), new MockIteratorEnvironment()); + iter.seek(range, cfs, true); + + while (iter.hasTop()) { + Key key = iter.getTopKey(); + Value value = iter.getTopValue(); + results.add(new SimpleEntry<>(key, value)); + iter.next(); + } + + if (expected.size() != results.size()) { + log.warn("expected {} results but got {}", expected.size(), results.size()); + } + for (int i = 0; i < expected.size(); i++) { + assertEquals(expected.get(i).getKey(), results.get(i).getKey()); + BitSet expectedBitSet = BitSet.valueOf(expected.get(i).getValue().get()); + BitSet resultBitSet = BitSet.valueOf(results.get(i).getValue().get()); + assertEquals(expectedBitSet, resultBitSet); + } + assertEquals(expected.size(), results.size()); + } + + private void withFieldValue(String field, String value) { + this.field = field; + this.value = value; + } + + private void withDateRange(String start, String end) { + this.startDate = start; + this.endDate = end; + } + + private void write(Key key, Value value) { + data.put(key, value); + } + + private void expect(Key key, Value value) { + expected.add(new SimpleEntry<>(key, value)); + } + + private Key createKey(String value, String field) { + return createKey(value, field, DEFAULT_DATE, DEFAULT_DATATYPE, DEFAULT_VISIBILITY); + } + + private Key createKey(String value, String field, String date) { + return createKey(value, field, date, DEFAULT_DATATYPE, DEFAULT_VISIBILITY); + } + + private Key createKey(String value, 
String field, String date, String datatype) { + return createKey(value, field, date, datatype, DEFAULT_VISIBILITY); + } + + private Key createKey(String value, String field, String date, String datatype, String visibility) { + return new Key(value, field, date + "\u0000" + datatype, visibility); + } + + private Value createValue(int... offsets) { + BitSet bits = createBitSet(offsets); + return new Value(bits.toByteArray()); + } + + private BitSet createBitSet(int... offsets) { + BitSet bitSet = new BitSet(); + for (int offset : offsets) { + bitSet.set(offset); + } + return bitSet; + } + +} diff --git a/warehouse/query-core/src/test/java/datawave/query/index/lookup/TruncatedRangeStreamTest.java b/warehouse/query-core/src/test/java/datawave/query/index/lookup/TruncatedRangeStreamTest.java new file mode 100644 index 00000000000..2de4b8ffe8d --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/index/lookup/TruncatedRangeStreamTest.java @@ -0,0 +1,246 @@ +package datawave.query.index.lookup; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import 
org.apache.commons.jexl3.parser.ASTJexlScript; +import org.apache.commons.jexl3.parser.ParseException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.query.config.ShardQueryConfiguration; +import datawave.query.jexl.JexlASTHelper; +import datawave.query.planner.QueryPlan; +import datawave.query.tables.ScannerFactory; +import datawave.query.util.MockMetadataHelper; +import datawave.util.TableName; +import datawave.util.time.DateHelper; + +/** + * A suite of tests for a truncated index. Keys take the form: + * + *
+ * <pre>value FIELD:yyyyMMdd0x00datatype VIZ (bitset)</pre>
+ *
+ * + * The test assertions are simpler than other RangeStream-style tests due to the fact that only shard ranges are generated. + */ +public class TruncatedRangeStreamTest { + + private static final Logger log = LoggerFactory.getLogger(TruncatedRangeStreamTest.class); + + private String query; + + private MockMetadataHelper helper; + private ShardQueryConfiguration config; + + private final List expected = new ArrayList<>(); + private final List results = new ArrayList<>(); + + private final InMemoryInstance instance = new InMemoryInstance(TruncatedRangeStreamTest.class.getName()); + private final AccumuloClient client = new InMemoryAccumuloClient("", instance); + private final Authorizations auths = new Authorizations("VIZ-A", "VIZ-B", "VIZ-C"); + private final String indexTableName = TableName.TRUNCATED_SHARD_INDEX; + private final Set indexedFields = Set.of("FIELD_A", "FIELD_B", "FIELD_C", "FIELD_D", "FIELD_E"); + + private final String DEFAULT_FIELD = "FIELD_A"; + private final String DEFAULT_VALUE = "value-a"; + private final String DEFAULT_DATE = "20251010"; + private final String DEFAULT_DATATYPE = "datatype-a"; + private final String DEFAULT_VISIBILITY = "VIZ-A"; + + private TruncatedRangeStreamTest() throws AccumuloSecurityException { + // required for creating instance/client + } + + @BeforeEach + public void beforeEach() { + expected.clear(); + results.clear(); + + TableOperations tops = client.tableOperations(); + try { + if (tops.exists(indexTableName)) { + tops.delete(indexTableName); + } + tops.create(indexTableName); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + + helper = new MockMetadataHelper(); + helper.setIndexedFields(indexedFields); + + config = new ShardQueryConfiguration(); + config.setIndexedFields(indexedFields); + config.setClient(client); + config.setTruncatedIndexTableName(indexTableName); + 
config.setAuthorizations(Collections.singleton(auths)); + config.setBeginDate(DateHelper.parse(DEFAULT_DATE)); + config.setEndDate(DateHelper.parse(DEFAULT_DATE)); + } + + @Test + public void testSingleTermSingleDay() { + write("value-a", "FIELD_A", createValue(0, 1, 2)); + expect("20251010_0", "20251010_1", "20251010_2"); + withQuery("FIELD_A == 'value-a'"); + withDateRange("20251010", "20251010"); + scan(); + } + + @Test + public void testSingleTermMultiDay() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(0, 1, 2)); + write("value-a", "FIELD_A", "20251017\0datatype-a", createValue(11, 22)); + expect("20251010_0", "20251010_1", "20251010_2", "20251017_11", "20251017_22"); + withQuery("FIELD_A == 'value-a'"); + withDateRange("20251010", "20251020"); + scan(); + } + + @Test + public void testFieldsSkipped() { + write("value-a", "FIELD_A", createValue(1)); + write("value-a", "FIELD_B", createValue(2)); + write("value-a", "FIELD_C", createValue(3)); + expect("20251010_2"); + withQuery("FIELD_B == 'value-a'"); + scan(); + } + + @Test + public void testDateRangeCorrectlyApplied() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(1)); + write("value-a", "FIELD_A", "20251011\0datatype-a", createValue(2)); + write("value-a", "FIELD_A", "20251012\0datatype-a", createValue(3)); + write("value-a", "FIELD_A", "20251013\0datatype-a", createValue(4)); + write("value-a", "FIELD_A", "20251014\0datatype-a", createValue(5)); + expect("20251011_2", "20251012_3", "20251013_4"); + withQuery("FIELD_A == 'value-a'"); + withDateRange("20251011", "20251013"); + scan(); + } + + @Test + public void testDateRangeWithGapCorrectlyApplied() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(1)); + write("value-a", "FIELD_A", "20251014\0datatype-a", createValue(5)); + expect("20251010_1", "20251014_5"); + withQuery("FIELD_A == 'value-a'"); + withDateRange("20251010", "20251020"); + scan(); + } + + @Test + public void 
testDatatypeFilterCorrectlyApplied() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(1)); + write("value-a", "FIELD_A", "20251010\0datatype-b", createValue(2)); + write("value-a", "FIELD_A", "20251010\0datatype-c", createValue(3)); + write("value-a", "FIELD_A", "20251010\0datatype-d", createValue(4)); + write("value-a", "FIELD_A", "20251010\0datatype-e", createValue(5)); + expect("20251010_1", "20251010_3", "20251010_5"); + withQuery("FIELD_A == 'value-a'"); + withDatatypes(Set.of("datatype-a", "datatype-c", "datatype-e")); + scan(); + } + + private void withQuery(String query) { + this.query = query; + } + + private void withDateRange(String startDate, String endDate) { + config.setBeginDate(DateHelper.parse(startDate)); + config.setEndDate(DateHelper.parse(endDate)); + } + + private void withDatatypes(Set datatypes) { + config.setDatatypeFilter(datatypes); + } + + private void write(String row, String cf, Value value) { + String cq = DEFAULT_DATE + "\0" + DEFAULT_DATATYPE; + write(row, cf, cq, DEFAULT_VISIBILITY, value); + } + + private void write(String row, String cf, String cq, Value value) { + write(row, cf, cq, DEFAULT_VISIBILITY, value); + } + + private void write(String row, String cf, String cq, String viz, Value value) { + try (var bw = client.createBatchWriter(TableName.TRUNCATED_SHARD_INDEX)) { + Mutation m = new Mutation(row); + ColumnVisibility cv = new ColumnVisibility(viz); + m.put(cf, cq, cv, value); + bw.addMutation(m); + } catch (TableNotFoundException | MutationsRejectedException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + private Value createValue(int... 
offsets) { + BitSet bitset = new BitSet(); + for (int offset : offsets) { + bitset.set(offset); + } + return new Value(bitset.toByteArray()); + } + + private void scan() { + ScannerFactory scannerFactory = new ScannerFactory(client); + try (var stream = new TruncatedRangeStream(config, scannerFactory, helper); var planIter = stream.streamPlans(parse(query))) { + + for (QueryPlan plan : planIter) { + Collection ranges = plan.getRanges(); + assertEquals(1, ranges.size()); + Range range = ranges.iterator().next(); + assertEquals(0, range.getStartKey().getColumnFamily().getLength()); + String row = range.getStartKey().getRow().toString(); + results.add(row); + } + } catch (IOException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + + assertEquals(expected, results); + } + + private ASTJexlScript parse(String query) { + try { + return JexlASTHelper.parseAndFlattenJexlQuery(query); + } catch (ParseException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + private void expect(String... 
shards) { + this.expected.addAll(List.of(shards)); + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/BitSetIteratorTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/BitSetIteratorTest.java new file mode 100644 index 00000000000..3bf4c8675d9 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/BitSetIteratorTest.java @@ -0,0 +1,80 @@ +package datawave.query.tables; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class BitSetIteratorTest { + + private final List data = new ArrayList<>(); + private final List expected = new ArrayList<>(); + private final List results = new ArrayList<>(); + + @BeforeEach + public void beforeEach() { + data.clear(); + expected.clear(); + results.clear(); + } + + @Test + public void testZeroIteration() { + data(0); + expect(0); + iterate(); + } + + @Test + public void testEvenIteration() { + data(0, 2, 4, 6, 8); + expect(0, 2, 4, 6, 8); + iterate(); + } + + @Test + public void testOddIteration() { + data(1, 3, 5, 7, 9); + expect(1, 3, 5, 7, 9); + iterate(); + } + + private void iterate() { + BitSet bitset = new BitSet(); + for (int n : data) { + bitset.set(n); + } + + BitSetIterator iter = new BitSetIterator(bitset); + while (iter.hasNext()) { + int result = iter.next(); + results.add(result); + } + + assertEquals(expected, results); + } + + private void data(int... numbers) { + for (int number : numbers) { + data.add(number); + } + } + + private void expect(int... numbers) { + for (int number : numbers) { + expected.add(number); + } + } + + private BitSet create(int... 
offsets) { + BitSet bitset = new BitSet(); + for (int offset : offsets) { + bitset.set(offset); + } + return bitset; + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/DateIteratorTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/DateIteratorTest.java new file mode 100644 index 00000000000..08b6a7add67 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/DateIteratorTest.java @@ -0,0 +1,83 @@ +package datawave.query.tables; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.format.DateTimeParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DateIteratorTest { + + private final List expected = new ArrayList<>(); + private final List results = new ArrayList<>(); + + private String start; + private String end; + + @BeforeEach + public void beforeEach() { + expected.clear(); + results.clear(); + } + + @Test + public void testSingleDay() { + expect("20251010"); + withDateRange("20251010"); + iterate(); + } + + @Test + public void testSingleDayThatDoesNotExist() { + expect("20251099"); + withDateRange("20251099"); + assertThrows(DateTimeParseException.class, this::iterate); + } + + @Test + public void testMultiDay() { + expect("20251010", "20251011", "20251012", "20251013"); + withDateRange("20251010", "20251013"); + iterate(); + } + + @Test + public void testMonthBoundary() { + expect("20251031", "20251101"); + withDateRange("20251031", "20251101"); + iterate(); + } + + @Test + public void testYearBoundary() { + expect("20251231", "20260101"); + withDateRange("20251231", "20260101"); + iterate(); + } + + private void expect(String... 
dates) { + expected.addAll(Arrays.asList(dates)); + } + + private void withDateRange(String date) { + withDateRange(date, date); + } + + private void withDateRange(String start, String end) { + this.start = start; + this.end = end; + } + + private void iterate() { + DateIterator iter = new DateIterator(start, end); + while (iter.hasNext()) { + results.add(iter.next()); + } + assertEquals(expected, results); + } +} diff --git a/warehouse/query-core/src/test/java/datawave/query/tables/TruncatedIndexScannerTest.java b/warehouse/query-core/src/test/java/datawave/query/tables/TruncatedIndexScannerTest.java new file mode 100644 index 00000000000..79047ed2134 --- /dev/null +++ b/warehouse/query-core/src/test/java/datawave/query/tables/TruncatedIndexScannerTest.java @@ -0,0 +1,208 @@ +package datawave.query.tables; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import datawave.accumulo.inmemory.InMemoryAccumuloClient; +import datawave.accumulo.inmemory.InMemoryInstance; +import datawave.query.index.lookup.IndexInfo; +import datawave.query.util.Tuple2; 
+import datawave.util.TableName; + +public class TruncatedIndexScannerTest { + + private static final Logger log = LoggerFactory.getLogger(TruncatedIndexScannerTest.class); + + private final InMemoryInstance instance = new InMemoryInstance(TruncatedIndexScannerTest.class.getName()); + private final AccumuloClient client = new InMemoryAccumuloClient("", instance); + private final Authorizations auths = new Authorizations("VIZ-A", "VIZ-B", "VIZ-C"); + private final String indexTableName = TableName.TRUNCATED_SHARD_INDEX; + + private final String DEFAULT_FIELD = "FIELD_A"; + private final String DEFAULT_VALUE = "value-a"; + private final String DEFAULT_DATE = "20251010"; + private final String DEFAULT_DATATYPE = "datatype-a"; + private final String DEFAULT_VISIBILITY = "VIZ-A"; + + private String field; + private String value; + private String startDate; + private String endDate; + private final Set datatypes = new HashSet<>(); + + private final List expected = new ArrayList<>(); + private final List results = new ArrayList<>(); + + private TruncatedIndexScannerTest() throws AccumuloSecurityException { + // required for setting up instance and client + } + + @BeforeEach + public void beforeEach() { + expected.clear(); + results.clear(); + + field = DEFAULT_FIELD; + value = DEFAULT_VALUE; + startDate = DEFAULT_DATE; + endDate = DEFAULT_DATE; + datatypes.clear(); + + TableOperations tops = client.tableOperations(); + try { + if (tops.exists(indexTableName)) { + tops.delete(indexTableName); + } + tops.create(indexTableName); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + @Test + public void testSingleDayScan() { + write("value-a", "FIELD_A", createValue(1, 2, 3)); + expect("20251010_1", "20251010_2", "20251010_3"); + scan(); + } + + @Test + public void testMultiDayScan() { + write("value-a", "FIELD_A", "20251010\0datatype-a", 
createValue(2, 4)); + write("value-a", "FIELD_A", "20251011\0datatype-a", createValue(7, 9)); + expect("20251010_2", "20251010_4", "20251011_7", "20251011_9"); + withDateRange("20251010", "20251011"); + scan(); + } + + @Test + public void testFieldsSkipped() { + write("value-a", "FIELD_A", createValue(1)); + write("value-a", "FIELD_B", createValue(2)); + write("value-a", "FIELD_C", createValue(3)); + expect("20251010_2"); + withFieldValue("FIELD_B", "value-a"); + scan(); + } + + @Test + public void testDateRangeCorrectlyApplied() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(1)); + write("value-a", "FIELD_A", "20251011\0datatype-a", createValue(2)); + write("value-a", "FIELD_A", "20251012\0datatype-a", createValue(3)); + write("value-a", "FIELD_A", "20251013\0datatype-a", createValue(4)); + write("value-a", "FIELD_A", "20251014\0datatype-a", createValue(5)); + expect("20251011_2", "20251012_3", "20251013_4"); + withDateRange("20251011", "20251013"); + scan(); + } + + @Test + public void testDateRangeWithGapCorrectlyApplied() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(1)); + write("value-a", "FIELD_A", "20251014\0datatype-a", createValue(5)); + expect("20251010_1", "20251014_5"); + withDateRange("20251010", "20251020"); + scan(); + } + + @Test + public void testDatatypeFilterCorrectlyApplied() { + write("value-a", "FIELD_A", "20251010\0datatype-a", createValue(1)); + write("value-a", "FIELD_A", "20251010\0datatype-b", createValue(2)); + write("value-a", "FIELD_A", "20251010\0datatype-c", createValue(3)); + write("value-a", "FIELD_A", "20251010\0datatype-d", createValue(4)); + write("value-a", "FIELD_A", "20251010\0datatype-e", createValue(5)); + expect("20251010_1", "20251010_3", "20251010_5"); + withDatatypes(Set.of("datatype-a", "datatype-c", "datatype-e")); + scan(); + } + + private void withFieldValue(String field, String value) { + this.field = field; + this.value = value; + } + + private void withDateRange(String 
startDate, String endDate) { + this.startDate = startDate; + this.endDate = endDate; + } + + private void withDatatypes(Set datatypes) { + this.datatypes.addAll(datatypes); + } + + private void write(String row, String cf, Value value) { + String cq = DEFAULT_DATE + "\0" + DEFAULT_DATATYPE; + write(row, cf, cq, DEFAULT_VISIBILITY, value); + } + + private void write(String row, String cf, String cq, Value value) { + write(row, cf, cq, DEFAULT_VISIBILITY, value); + } + + private void write(String row, String cf, String cq, String viz, Value value) { + try (var bw = client.createBatchWriter(TableName.TRUNCATED_SHARD_INDEX)) { + Mutation m = new Mutation(row); + ColumnVisibility cv = new ColumnVisibility(viz); + m.put(cf, cq, cv, value); + bw.addMutation(m); + } catch (TableNotFoundException | MutationsRejectedException e) { + log.error(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + private Value createValue(int... offsets) { + BitSet bitset = new BitSet(); + for (int offset : offsets) { + bitset.set(offset); + } + return new Value(bitset.toByteArray()); + } + + private void expect(String... 
shards) { + expected.addAll(List.of(shards)); + } + + private void scan() { + TruncatedIndexScanner scanner = new TruncatedIndexScanner(client, startDate, endDate); + scanner.setFieldValue(field, value); + scanner.setAuths(auths); + scanner.setTableName(indexTableName); + if (!datatypes.isEmpty()) { + scanner.setDatatypes(datatypes); + } + scanner.setBasePriority(30); + + while (scanner.hasNext()) { + // only extract the shard, the IndexInfo is always the same + Tuple2 entry = scanner.next(); + results.add(entry.first()); + } + assertEquals(expected, results); + } + +} diff --git a/warehouse/query-core/src/test/java/datawave/query/util/AbstractQueryTest.java b/warehouse/query-core/src/test/java/datawave/query/util/AbstractQueryTest.java index 76e53228dbb..20df634e87a 100644 --- a/warehouse/query-core/src/test/java/datawave/query/util/AbstractQueryTest.java +++ b/warehouse/query-core/src/test/java/datawave/query/util/AbstractQueryTest.java @@ -58,8 +58,8 @@ public enum RangeType { DOCUMENT, SHARD } - protected Authorizations auths = new Authorizations("ALL"); - protected Set authSet = Collections.singleton(auths); + protected static Authorizations auths = new Authorizations("ALL"); + protected static Set authSet = Collections.singleton(auths); protected final DateFormat format = new SimpleDateFormat("yyyyMMdd"); protected final KryoDocumentDeserializer deserializer = new KryoDocumentDeserializer(); diff --git a/warehouse/query-core/src/test/java/datawave/query/util/SizesIngest.java b/warehouse/query-core/src/test/java/datawave/query/util/SizesIngest.java index 90bb49195d8..6cffff3af01 100644 --- a/warehouse/query-core/src/test/java/datawave/query/util/SizesIngest.java +++ b/warehouse/query-core/src/test/java/datawave/query/util/SizesIngest.java @@ -35,7 +35,6 @@ import datawave.data.type.NumberType; import datawave.data.type.PointType; import datawave.ingest.protobuf.Uid; -import datawave.query.util.AbstractQueryTest.RangeType; import datawave.util.TableName; import 
datawave.util.time.DateHelper; @@ -95,10 +94,10 @@ public SizesIngest(AccumuloClient client) { this.client = client; } - public void write(RangeType type) throws Exception { + public void write() throws Exception { createTables(); loadMetadata(); - writeEvents(type); + writeEvents(); log.info("wrote {} events", numberOfEvents); } @@ -130,22 +129,22 @@ private void loadMetadata() throws Exception { } } - private void writeEvents(RangeType type) throws Exception { + private void writeEvents() throws Exception { for (int i = 0; i < NUM_SHARDS; i++) { String shard = ROW + "_" + i; - writeEventsForShard(shard, type); + writeEventsForShard(shard); } } - private void writeEventsForShard(String shard, RangeType type) throws Exception { + private void writeEventsForShard(String shard) throws Exception { // each shard gets its own random events createRandomEvents(); - writeShardIndex(shard, type); + writeShardIndex(shard); writeFieldIndex(shard); writeEvent(shard); } - private void writeShardIndex(String shard, RangeType type) throws Exception { + private void writeShardIndex(String shard) throws Exception { long ts = DateHelper.parse(ROW).getTime(); try (BatchWriter bw = client.createBatchWriter(TableName.SHARD_INDEX)) { for (Multimap event : events) { @@ -154,7 +153,7 @@ private void writeShardIndex(String shard, RangeType type) throws Exception { Mutation m = new Mutation(value); for (String field : inverted.get(value)) { String uid = uidForEvent(shard, event.get("COUNTER").iterator().next()); - m.put(field, shard + "\0" + datatype, cv, ts, getValue(type, uid)); + m.put(field, shard + "\0" + datatype, cv, ts, getValue(uid)); bw.addMutation(m); } } @@ -389,16 +388,11 @@ private Multimap invert(Multimap event) { return inverted; } - private static Value getValue(RangeType type, String uid) { + private static Value getValue(String uid) { Uid.List.Builder builder = Uid.List.newBuilder(); - if (type.equals(RangeType.DOCUMENT)) { - builder.setIGNORE(false); - 
builder.setCOUNT(1L); - builder.addUID(uid); - } else { - builder.setIGNORE(true); - builder.setCOUNT(17L); // arbitrary prime number below the 20 max uid limit - } + builder.setIGNORE(false); + builder.setCOUNT(1L); + builder.addUID(uid); return new Value(builder.build().toByteArray()); }