Skip to content

Commit 0bb1573

Browse files
authored
atlas: handle OR with multiple matches (#1184)
Update the query index to automatically deduplicate results, handling the case of OR clauses where both sides match the same data point. Previously, the same match would be returned once for each OR clause that matched a given data point. If the consumer did not handle that case, it could aggregate the same value multiple times, leading to incorrect results. This adds some overhead because a set of already-seen values must be maintained, but it should improve correctness for general usage.
1 parent cdd5bbe commit 0bb1573

File tree

2 files changed

+35
-1
lines changed

2 files changed

+35
-1
lines changed

spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/QueryIndex.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.ArrayList;
2424
import java.util.Collections;
2525
import java.util.Deque;
26+
import java.util.HashSet;
2627
import java.util.List;
2728
import java.util.Set;
2829
import java.util.concurrent.ConcurrentHashMap;
@@ -63,6 +64,27 @@ public Cache<String, List<QueryIndex<V>>> get() {
6364
}
6465
}
6566

67+
/**
68+
* Dedup values as they are consumed. This is used to avoid processing the same result
69+
* multiple times in the case of OR clauses where more than one clause matches the same
* data point. Values that were already passed on are tracked in a {@link HashSet}, so two
* values count as duplicates when they are equal according to {@code equals} and
* {@code hashCode}. The set lives as long as the instance and is not synchronized here.
70+
*/
71+
private static class DedupConsumer<T> implements Consumer<T> {
72+
73+
// Wrapped consumer that receives each distinct value.
private final Consumer<T> consumer;
74+
// Values that have already been passed to the wrapped consumer.
private final Set<T> alreadySeen;
75+
76+
DedupConsumer(Consumer<T> consumer) {
77+
this.consumer = consumer;
78+
this.alreadySeen = new HashSet<>();
79+
}
80+
81+
@Override public void accept(T t) {
82+
// Set.add returns false when the value is already present, so repeats are dropped.
if (alreadySeen.add(t)) {
83+
consumer.accept(t);
84+
}
85+
}
86+
}
87+
6688
/**
6789
* Return a new instance of an index that is empty. The default caching behavior will be
6890
* used.
@@ -365,7 +387,7 @@ public List<T> findMatches(Id id) {
365387
* Function to invoke for values associated with a query that matches the id.
366388
*/
367389
public void forEachMatch(Id id, Consumer<T> consumer) {
368-
forEachMatch(id, 0, consumer);
390+
// Wrap the caller's consumer in a new DedupConsumer for this call so that a value
// reached more than once during the traversal is passed on only the first time.
forEachMatch(id, 0, new DedupConsumer<>(consumer));
369391
}
370392

371393
@SuppressWarnings("PMD.NPathComplexity")
@@ -473,6 +495,10 @@ public List<T> findMatches(Function<String, String> tags) {
473495
* Function to invoke for values associated with a query that matches the id.
474496
*/
475497
public void forEachMatch(Function<String, String> tags, Consumer<T> consumer) {
498+
// Public entry point: create one DedupConsumer per call and delegate to
// forEachMatchImpl, so repeated values are passed to the caller only once.
forEachMatchImpl(tags, new DedupConsumer<>(consumer));
499+
}
500+
501+
private void forEachMatchImpl(Function<String, String> tags, Consumer<T> consumer) {
476502
// Matches for this level
477503
matches.forEach(consumer);
478504

spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/impl/QueryIndexTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,14 @@ public void inClauseExpansion() {
316316
assertEquals(list(query), idx, id1);
317317
}
318318

319+
@Test
320+
public void orMultiMatch() {
321+
// The query ORs an :eq clause and an :re clause that both use key "name" and value "a".
Query q = Parser.parseQuery("name,a,:eq,name,a,:re,:or");
322+
QueryIndex<Query> idx = QueryIndex.newInstance(cacheSupplier);
323+
idx.add(q, q);
324+
// The expected list holds q exactly once even though both OR clauses can match.
// NOTE(review): presumably id("a") builds an Id whose name is "a" — confirm against the helper.
assertEquals(list(q), idx, id("a"));
325+
}
326+
319327
@Test
320328
public void manyQueries() {
321329
// CpuUsage for all instances

0 commit comments

Comments
 (0)