Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@
import com.google.common.collect.Iterators;

import datawave.query.jexl.visitors.JexlStringBuildingVisitor;
import datawave.query.tables.RangeStreamScanner;
import datawave.query.tables.ScannerSession;
import datawave.query.util.Tuple2;

/**
* Provides a core set of variables for the ScannerStream, Union, and Intersection.
*
* <p>
* A reference to the underlying {@link datawave.query.tables.RangeStreamScanner} is required for seeking.
*
* <p>
* Note that the BaseIndexStream does not implement the {@link IndexStream#seek(String)} method. Inheriting classes are responsible for determining the correct
* implementation.
*/
public abstract class BaseIndexStream implements IndexStream {

protected RangeStreamScanner rangeStreamScanner;
protected ScannerSession scannerSession;

protected EntryParser entryParser;

Expand All @@ -39,8 +39,8 @@ public abstract class BaseIndexStream implements IndexStream {
/**
* This constructor is used by BaseIndexStreams that have a backing range stream scanner. I.e., this will actually scan the global index
*
* @param rangeStreamScanner
* a range stream scanner
* @param scannerSession
* a scanner session, likely a RangeStreamScanner
* @param entryParser
* an entry parser
* @param node
Expand All @@ -50,18 +50,18 @@ public abstract class BaseIndexStream implements IndexStream {
* @param debugDelegate
* a delegate used for debugging (not in use)
*/
public BaseIndexStream(RangeStreamScanner rangeStreamScanner, EntryParser entryParser, JexlNode node, StreamContext context, IndexStream debugDelegate) {
this.rangeStreamScanner = Preconditions.checkNotNull(rangeStreamScanner);
public BaseIndexStream(ScannerSession scannerSession, EntryParser entryParser, JexlNode node, StreamContext context, IndexStream debugDelegate) {
this.scannerSession = Preconditions.checkNotNull(scannerSession);
this.entryParser = Preconditions.checkNotNull(entryParser);
this.node = node;
this.backingIter = Iterators.transform(this.rangeStreamScanner, this.entryParser);
this.backingIter = Iterators.transform(this.scannerSession, this.entryParser);
this.context = context;
this.debugDelegate = debugDelegate;
}

/**
* This constructor is for terms that do not have a range stream scanner.
*
* <p>
* This is used by the SHARDS_AND_DAYS hint and terms that do not hit anything in the global index (delayed terms)
*
* @param iterator
Expand All @@ -74,7 +74,7 @@ public BaseIndexStream(RangeStreamScanner rangeStreamScanner, EntryParser entryP
* delegate used for debugging (not in use)
*/
public BaseIndexStream(Iterator<Tuple2<String,IndexInfo>> iterator, JexlNode node, StreamContext context, IndexStream debugDelegate) {
this.rangeStreamScanner = null;
this.scannerSession = null;
this.entryParser = null;
this.node = node;
this.backingIter = Preconditions.checkNotNull(iterator);
Expand All @@ -91,10 +91,10 @@ public BaseIndexStream() {
* Reset the backing iterator after a seek. State must stay in sync with changes to the RangeStreamScanner.
*/
public void resetBackingIterator() {
if (rangeStreamScanner != null && entryParser != null) {
if (scannerSession != null && entryParser != null) {
this.peekedElement = null;
this.hasPeeked = false;
this.backingIter = Iterators.transform(this.rangeStreamScanner, this.entryParser);
this.backingIter = Iterators.transform(this.scannerSession, this.entryParser);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ public class RangeStream extends BaseVisitor implements QueryPlanStream {

protected NumShardFinder numShardFinder;

private int maxLinesToPrint = -1;
private int linesPrinted = 0;
protected int maxLinesToPrint = -1;
protected int linesPrinted = 0;

public RangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper metadataHelper) {
this.config = config;
Expand Down Expand Up @@ -718,7 +718,7 @@ public Object visit(ASTFalseNode node, Object data) {
return ScannerStream.noData(node);
}

private boolean isUnOrNotFielded(JexlNode node) {
protected boolean isUnOrNotFielded(JexlNode node) {
List<ASTIdentifier> identifiers = JexlASTHelper.getIdentifiers(node);
for (ASTIdentifier identifier : identifiers) {
if (identifier.getName().equals(Constants.ANY_FIELD) || identifier.getName().equals(Constants.NO_FIELD)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import com.google.common.collect.PeekingIterator;

import datawave.query.tables.RangeStreamScanner;
import datawave.query.tables.ScannerSession;
import datawave.query.util.Tuple2;

/**
Expand All @@ -17,12 +17,12 @@
*/
public class ScannerStream extends BaseIndexStream {

private ScannerStream(RangeStreamScanner scanSession, EntryParser entryParser, StreamContext ctx, JexlNode currNode, IndexStream debugDelegate) {
private ScannerStream(ScannerSession scanSession, EntryParser entryParser, StreamContext ctx, JexlNode currNode, IndexStream debugDelegate) {
super(scanSession, entryParser, currNode, ctx, debugDelegate);
}

private ScannerStream(BaseIndexStream itr, StreamContext ctx, JexlNode currNode) {
this(itr.rangeStreamScanner, itr.entryParser, ctx, currNode, null);
this(itr.scannerSession, itr.entryParser, ctx, currNode, null);
}

private ScannerStream(Iterator<Tuple2<String,IndexInfo>> iterator, StreamContext context, JexlNode node, IndexStream debugDelegate) {
Expand Down Expand Up @@ -74,24 +74,24 @@ public static ScannerStream initialized(Iterator<Tuple2<String,IndexInfo>> itr,
return new ScannerStream(itr, StreamContext.INITIALIZED, currNode);
}

public static ScannerStream initialized(RangeStreamScanner scannerStream, EntryParser entryParser, JexlNode currNode) {
public static ScannerStream initialized(ScannerSession scannerStream, EntryParser entryParser, JexlNode currNode) {
return new ScannerStream(scannerStream, entryParser, StreamContext.INITIALIZED, currNode, null);
}

/**
* Seek this ScannerStream to the specified shard.
*
* <p>
* If no underlying RangeStreamScanner exists then the seek operation is delegated to {@link #seekByNext(String)}.
*
* @param seekShard
* the shard to seek to.
* @return the next element great than or equal to the seek shard, or null if all elements were exhausted.
* @return the next element greater than or equal to the seek shard, or null if all elements were exhausted.
*/
@Override
public String seek(String seekShard) {
if (rangeStreamScanner != null) {
if (scannerSession != null) {

String seekedShard = rangeStreamScanner.seek(seekShard);
String seekedShard = scannerSession.seek(seekShard);
if (seekedShard == null) {
// If the underlying RangeStreamScanner returns null we are done.
this.peekedElement = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package datawave.query.index.lookup;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import java.util.Map;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An iterator that can scan a global index of truncated keys in the form:
*
* <pre>
* value FIELD:yyyyMMdd0x00datatype VIZ (bitset value)
* </pre>
*
* This iterator will aggregate bitsets for a given day across all datatypes and visibilities, returning the combined bitset as a single value
*/
public class TruncatedIndexIterator implements SortedKeyValueIterator<Key,Value> {

private static final Logger log = LoggerFactory.getLogger(TruncatedIndexIterator.class);

protected SortedKeyValueIterator<Key,Value> source;
protected Key tk;
protected Value tv = null;
protected BitSet bitset = null;

@Override
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
this.source = source;
}

@Override
public boolean hasTop() {
return tk != null;
}

@Override
public void next() throws IOException {
this.tk = null;
this.tv = null;
this.bitset = null;
if (source.hasTop()) {
tk = source.getTopKey();

String cq = tk.getColumnQualifier().toString();
String date = cq.substring(0, cq.indexOf('\u0000'));

bitset = BitSet.valueOf(source.getTopValue().get());
source.next();

while (source.hasTop() && sameDay(date, source.getTopKey())) {
BitSet candidate = BitSet.valueOf(source.getTopValue().get());
bitset.or(candidate);
source.next();
}

tv = new Value(bitset.toByteArray());
}
}

protected boolean sameDay(String date, Key next) {
String nextCQ = next.getColumnQualifier().toString();
return nextCQ.startsWith(date);
}

@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
if (!range.isStartKeyInclusive()) {
Key start = range.getStartKey();
Key next = new Key(start.getRow(), start.getColumnFamily(), new Text(start.getColumnQualifier() + "\u0000\uFFFF"));
range = new Range(next, true, range.getEndKey(), range.isEndKeyInclusive());
}

source.seek(range, columnFamilies, inclusive);
next();
}

@Override
public Key getTopKey() {
return tk;
}

@Override
public Value getTopValue() {
return tv;
}

@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
TruncatedIndexIterator copy = new TruncatedIndexIterator();
copy.source = source.deepCopy(env);
return copy;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package datawave.query.index.lookup;

import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.jexl3.parser.ASTEQNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import datawave.query.config.ShardQueryConfiguration;
import datawave.query.iterator.QueryOptions;
import datawave.query.jexl.JexlASTHelper;
import datawave.query.tables.ScannerFactory;
import datawave.query.tables.TruncatedIndexScanner;
import datawave.query.util.MetadataHelper;
import datawave.security.util.AuthorizationsMinimizer;

/**
* A {@link QueryPlanStream} configured to run against a truncated shard index
*/
public class TruncatedRangeStream extends RangeStream {

private static final Logger log = LoggerFactory.getLogger(TruncatedRangeStream.class);

public TruncatedRangeStream(ShardQueryConfiguration config, ScannerFactory scanners, MetadataHelper metadataHelper) {
super(config, scanners, metadataHelper);
}

@Override
public ScannerStream visit(ASTEQNode node, Object data) {

if (isUnOrNotFielded(node)) {
return ScannerStream.noData(node);
}

// We are looking for identifier = literal
JexlASTHelper.IdentifierOpLiteral op = JexlASTHelper.getIdentifierOpLiteral(node);
if (op == null) {
return ScannerStream.delayed(node);
}

final String fieldName = op.deconstructIdentifier();

// Null literals cannot be resolved against the index.
if (op.getLiteralValue() == null) {
return ScannerStream.delayed(node);
}

// toString of String returns the String
String literal = op.getLiteralValue().toString();

if (QueryOptions.DEFAULT_DATATYPE_FIELDNAME.equals(fieldName)) {
return ScannerStream.delayed(node);
}

// Check if field is not indexed
if (!isIndexed(fieldName, config.getIndexedFields())) {
try {
if (this.getAllFieldsFromHelper().contains(fieldName)) {
if (maxLinesToPrint > 0 && linesPrinted < maxLinesToPrint) {
linesPrinted++;
log.debug("{'{}':'{}'} is not an observed field.", fieldName, literal);
}
return ScannerStream.delayed(node);
}
} catch (TableNotFoundException e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
if (maxLinesToPrint > 0 && ++linesPrinted < maxLinesToPrint) {
linesPrinted++;
log.debug("{'{}':'{}'} is not indexed.", fieldName, literal);
}
// even though the field is not indexed it may still be valuable when evaluating an event. mark this scanner stream as delayed, so it is correctly
// propagated
return ScannerStream.delayed(node);
}

// Final case, field is indexed
if (maxLinesToPrint > 0 && linesPrinted < maxLinesToPrint) {
linesPrinted++;
log.debug("{'{}':'{}'} is indexed.", fieldName, literal);
}

try {
TruncatedIndexScanner scanner = new TruncatedIndexScanner(config);
if (config.getDatatypeFilter() != null && !config.getDatatypeFilter().isEmpty()) {
scanner.setDatatypes(config.getDatatypeFilter());
}
scanner.setTableName(config.getTruncatedIndexTableName());
scanner.setFieldValue(fieldName, literal);
scanner.setBasePriority(config.getBaseIteratorPriority());

Authorizations auths = AuthorizationsMinimizer.minimize(config.getAuthorizations()).iterator().next();
scanner.setAuths(auths);

return ScannerStream.initialized(scanner, node);
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}

@Override
public void close() {
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
import datawave.query.index.lookup.IndexStream;
import datawave.query.index.lookup.QueryPlanStream;
import datawave.query.index.lookup.RangeStream;
import datawave.query.index.lookup.TruncatedIndexIterator;
import datawave.query.index.lookup.TruncatedRangeStream;
import datawave.query.iterator.CloseableListIterable;
import datawave.query.iterator.QueryIterator;
import datawave.query.iterator.QueryOptions;
Expand Down Expand Up @@ -3101,6 +3103,10 @@ private QueryPlanStream getQueryPlanStream(ShardQueryConfiguration config, Scann

if (config.isUseShardedIndex()) {
return getDayIndexStream(config);
} else if (config.isUseTruncatedIndex()) {
this.rangeStreamClass = TruncatedRangeStream.class.getCanonicalName();
this.createUidsIteratorClass = TruncatedIndexIterator.class;
return initializeRangeStream(config, scannerFactory, metadataHelper);
} else {
return initializeRangeStream(config, scannerFactory, metadataHelper);
}
Expand Down
Loading
Loading