Skip to content

Commit 141087b

Browse files
jack-gitdevhgklohr
authored and committed
Create LazyLoadingRangesIterator for use with SSDeep (#3203)
This adds a LazyLoadingRangesIterator, which calculates a certain number of ranges at a time. The number of ranges calculated at a time is configurable. This avoids calculating all ranges at the same time and storing them all in memory.
1 parent e51e9a6 commit 141087b

File tree

11 files changed

+264
-99
lines changed

11 files changed

+264
-99
lines changed

warehouse/query-core/src/main/java/datawave/query/config/SSDeepSimilarityQueryConfiguration.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ public class SSDeepSimilarityQueryConfiguration extends GenericQueryConfiguratio
1414

1515
int queryThreads = 100;
1616

17+
/** The number of ranges generated at a time by {@link datawave.query.tables.chained.iterators.LazyLoadingRangesIterator} */
18+
int numRangesPerScanner = 50000;
19+
1720
int ngramSize = NGramGenerator.DEFAULT_NGRAM_SIZE;
1821
int maxRepeatedCharacters = SSDeepHash.DEFAULT_MAX_REPEATED_CHARACTERS;
1922
int minHashSize = NGramGenerator.DEFAULT_MIN_HASH_SIZE;
@@ -73,6 +76,7 @@ public void copyFrom(SSDeepSimilarityQueryConfiguration other) {
7376
setMaxRepeatedCharacters(other.getMaxRepeatedCharacters());
7477
setMinHashSize(other.getMinHashSize());
7578
setNGramSize(other.getNGramSize());
79+
setNumRangesPerScanner(other.getNumRangesPerScanner());
7680
setQueryThreads(other.getQueryThreads());
7781
setDedupeSimilarityHashes(other.isDedupeSimilarityHashes());
7882
setMaxHashes(other.getMaxHashes());
@@ -96,6 +100,14 @@ public void setQueryThreads(int queryThreads) {
96100
this.queryThreads = queryThreads;
97101
}
98102

103+
public int getNumRangesPerScanner() {
104+
return numRangesPerScanner;
105+
}
106+
107+
public void setNumRangesPerScanner(int numRangesPerScanner) {
108+
this.numRangesPerScanner = numRangesPerScanner;
109+
}
110+
99111
public int getNGramSize() {
100112
return ngramSize;
101113
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package datawave.query.tables.chained.iterators;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import datawave.query.config.SSDeepSimilarityQueryConfiguration;
import datawave.query.exceptions.DatawaveFatalQueryException;
import datawave.query.tables.ScannerFactory;
import datawave.query.tables.ssdeep.SSDeepMaxHashPerNGramFilter;
import datawave.query.tables.ssdeep.SSDeepParsingFunction;
import datawave.query.tables.ssdeep.SSDeepScoringFunction;
import datawave.query.tables.ssdeep.SSDeepSeenFunction;
import datawave.query.tables.ssdeep.ScoredSSDeepPair;
import datawave.util.ssdeep.ChunkSizeEncoding;
import datawave.util.ssdeep.IntegerEncoding;
import datawave.util.ssdeep.NGramTuple;
import datawave.util.ssdeep.SSDeepHash;
31+
/**
32+
* Iterator that lazily calculates ranges. The amount of ranges to be calculated at a time is configurable. This approach avoids creating all the ranges at the
33+
* same time and storing them all in memory for the life of the query.
34+
*/
35+
public class LazyLoadingRangesIterator implements Iterator<ScoredSSDeepPair> {
36+
private static final Logger log = LoggerFactory.getLogger(LazyLoadingRangesIterator.class);
37+
38+
private final Iterator<NGramTuple> queryMapKeysIterator;
39+
private final ScannerFactory scannerFactory;
40+
private BatchScanner scanner;
41+
private Iterator<Map.Entry<Key,Value>> scannerIterator;
42+
private final int numRangesPerScanner;
43+
44+
private final ChunkSizeEncoding chunkSizeEncoder = new ChunkSizeEncoding();
45+
private final IntegerEncoding bucketEncoder;
46+
private final int indexBuckets;
47+
48+
private final SSDeepSimilarityQueryConfiguration config;
49+
50+
private final SSDeepParsingFunction parsingFunction;
51+
private final SSDeepSeenFunction ssDeepDedupeFunction;
52+
private final SSDeepMaxHashPerNGramFilter maxHashPerNGramLimiter;
53+
private final AtomicLong count = new AtomicLong(0);
54+
private final long maxResults;
55+
private final SSDeepScoringFunction scoringFunction;
56+
57+
private Iterator<ScoredSSDeepPair> currentScoredSSDeepPairsIterator;
58+
59+
private ScoredSSDeepPair next;
60+
61+
public LazyLoadingRangesIterator(SSDeepSimilarityQueryConfiguration ssDeepSimilarityQueryConfiguration, ScannerFactory scannerFactory, long maxResults)
62+
throws TableNotFoundException {
63+
config = ssDeepSimilarityQueryConfiguration;
64+
this.maxResults = maxResults;
65+
queryMapKeysIterator = config.getState().getQueryMap().keys().iterator();
66+
bucketEncoder = new IntegerEncoding(config.getBucketEncodingBase(), config.getBucketEncodingLength());
67+
indexBuckets = config.getIndexBuckets();
68+
numRangesPerScanner = config.getNumRangesPerScanner();
69+
70+
this.scannerFactory = scannerFactory;
71+
72+
parsingFunction = new SSDeepParsingFunction(config);
73+
ssDeepDedupeFunction = new SSDeepSeenFunction();
74+
maxHashPerNGramLimiter = new SSDeepMaxHashPerNGramFilter(config);
75+
scoringFunction = new SSDeepScoringFunction(config);
76+
}
77+
78+
/**
79+
* Continue creating new iterators using a new calculated set of ranges until a next result can be found
80+
*
81+
* @return true if a result has been found, false if not
82+
*/
83+
@Override
84+
public boolean hasNext() {
85+
if (next != null) {
86+
return true;
87+
}
88+
89+
boolean foundNext = false;
90+
while (!foundNext) {
91+
if (currentScoredSSDeepPairsIterator == null || !currentScoredSSDeepPairsIterator.hasNext()) {
92+
while (scannerIterator == null || !scannerIterator.hasNext()) {
93+
if (queryMapKeysIterator.hasNext()) {
94+
try {
95+
scannerIterator = createIteratorWithNewRanges();
96+
} catch (TableNotFoundException e) {
97+
throw new RuntimeException(e);
98+
}
99+
} else {
100+
return false;
101+
}
102+
}
103+
104+
Map.Entry<Key,Value> entry = scannerIterator.next();
105+
106+
Map.Entry<NGramTuple,SSDeepHash> simplifiedEntry = parsingFunction.apply(entry);
107+
108+
if (config.isDedupeSimilarityHashes() && !ssDeepDedupeFunction.test(simplifiedEntry)) {
109+
continue;
110+
}
111+
112+
if (config.getMaxHashesPerNGram() > -1 && !maxHashPerNGramLimiter.test(simplifiedEntry)) {
113+
continue;
114+
}
115+
116+
if (maxResults > -1 && count.incrementAndGet() > maxResults) {
117+
throw new DatawaveFatalQueryException("Exceeded max work");
118+
}
119+
120+
if (currentScoredSSDeepPairsIterator == null || !currentScoredSSDeepPairsIterator.hasNext()) {
121+
currentScoredSSDeepPairsIterator = scoringFunction.apply(simplifiedEntry).iterator();
122+
}
123+
124+
if (currentScoredSSDeepPairsIterator.hasNext()) {
125+
foundNext = true;
126+
next = currentScoredSSDeepPairsIterator.next();
127+
}
128+
} else {
129+
foundNext = true;
130+
next = currentScoredSSDeepPairsIterator.next();
131+
}
132+
}
133+
134+
return true;
135+
}
136+
137+
/**
138+
* Return next and reset
139+
*
140+
* @return the next item
141+
*/
142+
@Override
143+
public ScoredSSDeepPair next() {
144+
ScoredSSDeepPair scoredPair = next;
145+
next = null;
146+
return scoredPair;
147+
}
148+
149+
/**
150+
* Process the query map keys to create ranges to scan in Accumulo. The number of ranges generated at a time is determined by {@code numRangesPerScanner}.
151+
* The number of ranges may go over numRangesPerScanner slightly due to processing all index buckets per NGramTuple
152+
*
153+
* @return an iterator based on the next set of calculated ranges
154+
*/
155+
private Iterator<Map.Entry<Key,Value>> createIteratorWithNewRanges() throws TableNotFoundException {
156+
final Collection<Range> ranges = new HashSet<>();
157+
158+
while (queryMapKeysIterator.hasNext() && ranges.size() <= numRangesPerScanner) {
159+
NGramTuple ct = queryMapKeysIterator.next();
160+
final String sizeAndChunk = chunkSizeEncoder.encode(ct.getChunkSize()) + ct.getChunk();
161+
for (int i = 0; i < indexBuckets; i++) {
162+
final String bucketedSizeAndChunk = bucketEncoder.encode(i) + sizeAndChunk;
163+
ranges.add(Range.exact(new Text(bucketedSizeAndChunk)));
164+
}
165+
}
166+
167+
if (scanner != null) {
168+
scanner.close();
169+
}
170+
scanner = scannerFactory.newScanner(config.getTableName(), config.getAuthorizations(), config.getQueryThreads(), config.getQuery());
171+
172+
scanner.setRanges(ranges);
173+
174+
log.debug("Lazy loaded {} ranges.", ranges.size());
175+
log.trace("Ranges are: {}", ranges);
176+
177+
return scanner.stream().iterator();
178+
}
179+
}

warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepChainedDiscoveryQueryLogic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ public SSDeepChainedDiscoveryQueryLogic(SSDeepChainedDiscoveryQueryLogic other)
3434

3535
@Override
3636
public void close() {
37+
this.logic2.close();
38+
this.logic1.close();
3739
super.close();
40+
this.logic2 = null;
41+
this.logic1 = null;
3842
}
3943

4044
public GenericQueryConfiguration initialize(AccumuloClient client, Query settings, Set<Authorizations> auths) throws Exception {

warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepDiscoveryQueryLogic.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,12 @@ public Object clone() throws CloneNotSupportedException {
228228
return new SSDeepDiscoveryQueryLogic(this);
229229
}
230230

231+
@Override
232+
public void close() {
233+
super.close();
234+
discoveryDelegate.close();
235+
}
236+
231237
@Override
232238
public AccumuloConnectionFactory.Priority getConnectionPriority() {
233239
return discoveryDelegate.getConnectionPriority();

warehouse/query-core/src/main/java/datawave/query/tables/ssdeep/SSDeepScoringFunction.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package datawave.query.tables.ssdeep;
22

33
import java.util.Collection;
4+
import java.util.HashSet;
45
import java.util.Map;
56
import java.util.Set;
67
import java.util.function.Function;
7-
import java.util.stream.Stream;
88

99
import org.apache.log4j.Logger;
1010

@@ -18,7 +18,7 @@
1818
import datawave.util.ssdeep.SSDeepNGramOverlapScorer;
1919

2020
/** A function that transforms entries retrieved from Accumulo into Scored SSDeep hash matches */
21-
public class SSDeepScoringFunction implements Function<Map.Entry<NGramTuple,SSDeepHash>,Stream<ScoredSSDeepPair>> {
21+
public class SSDeepScoringFunction implements Function<Map.Entry<NGramTuple,SSDeepHash>,Collection<ScoredSSDeepPair>> {
2222

2323
public static final String MIN_SSDEEP_SCORE_PARAMETER = "minScore";
2424
private static final Logger log = Logger.getLogger(SSDeepScoringFunction.class);
@@ -55,7 +55,7 @@ public SSDeepScoringFunction(SSDeepSimilarityQueryConfiguration config) {
5555
*/
5656
private int readOptionalMinScoreThreshold(Query query) {
5757
QueryImpl.Parameter minScoreParameter = query.findParameter(MIN_SSDEEP_SCORE_PARAMETER);
58-
if (minScoreParameter != null) {
58+
if (minScoreParameter != null && !minScoreParameter.getParameterValue().isBlank()) {
5959
String minScoreString = minScoreParameter.getParameterValue();
6060
try {
6161
int minScore = Integer.parseInt(minScoreString);
@@ -79,27 +79,29 @@ private int readOptionalMinScoreThreshold(Query query) {
7979
*
8080
* @param entry
8181
* the function argument
82-
* @return A Stream of scored SSDeep pairs related to the row returned by Accumulo.
82+
* @return A Collection of scored SSDeep pairs related to the row returned by Accumulo.
8383
*/
8484
@Override
85-
public Stream<ScoredSSDeepPair> apply(Map.Entry<NGramTuple,SSDeepHash> entry) {
85+
public Collection<ScoredSSDeepPair> apply(Map.Entry<NGramTuple,SSDeepHash> entry) {
8686
NGramTuple ngram = entry.getKey();
8787
SSDeepHash matchingHash = entry.getValue();
8888

8989
// extract the query ssdeeps that contained this ngram from the query map.
9090
Collection<SSDeepHash> queryHashes = queryState.getQueryMap().get(ngram);
9191

92+
Collection<ScoredSSDeepPair> scoredSSDeepPairs = new HashSet<>();
93+
9294
// score the match between each query ssdeep and matching hash, keep those that exceed the match
9395
// threshold.
94-
return queryHashes.stream().flatMap(queryHash -> {
96+
for (SSDeepHash queryHash : queryHashes) {
9597
Set<NGramTuple> overlappingNGrams = ngramOverlapScorer.apply(queryHash, matchingHash);
9698
int weightedScore = editDistanceScorer.apply(queryHash, matchingHash);
9799
if (minScoreThreshold <= 0 || weightedScore > minScoreThreshold) {
98-
return Stream.of(new ScoredSSDeepPair(queryHash, matchingHash, overlappingNGrams, weightedScore));
99-
} else {
100-
return Stream.empty();
100+
scoredSSDeepPairs.add(new ScoredSSDeepPair(queryHash, matchingHash, overlappingNGrams, weightedScore));
101101
}
102-
});
102+
}
103+
104+
return scoredSSDeepPairs;
103105
}
104106

105107
}

0 commit comments

Comments
 (0)