Skip to content

Commit 31dfa62

Browse files
committed
Added ranked buckets
1 parent b62ccd4 commit 31dfa62

File tree

9 files changed

+419
-8
lines changed

9 files changed

+419
-8
lines changed

ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccount.java

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,86 @@
11
package com.microsoft.azure.kusto.ingest.resources;
22

3+
import com.microsoft.azure.kusto.ingest.utils.TimeProvider;
4+
5+
import java.util.ArrayDeque;
6+
37
public class RankedStorageAccount {
8+
class Bucket {
9+
public int successCount;
10+
public int failureCount;
11+
12+
public Bucket() {
13+
this.successCount = 0;
14+
this.failureCount = 0;
15+
}
16+
}
17+
18+
private final ArrayDeque<Bucket> buckets = new ArrayDeque<>();
419
private final String accountName;
20+
private final int bucketCount;
21+
private final int bucketDurationInSec;
22+
private final TimeProvider timeProvider;
523

6-
public RankedStorageAccount(String accountName) {
24+
private long lastActionTimestamp;
25+
26+
public RankedStorageAccount(String accountName, int bucketCount, int bucketDurationInSec, TimeProvider timeProvider) {
727
this.accountName = accountName;
28+
this.bucketCount = bucketCount;
29+
this.bucketDurationInSec = bucketDurationInSec;
30+
this.timeProvider = timeProvider;
31+
lastActionTimestamp = timeProvider.currentTimeMillis();
832
}
933

1034
public void addResult(boolean success) {
35+
adjustForTimePassed();
36+
Bucket lastBucket = buckets.peek();
37+
assert lastBucket != null;
38+
if (success) {
39+
lastBucket.successCount++;
40+
} else {
41+
lastBucket.failureCount++;
42+
}
43+
}
44+
45+
private void adjustForTimePassed() {
46+
if (buckets.isEmpty()) {
47+
buckets.push(new Bucket());
48+
return;
49+
}
50+
51+
long timePassed = timeProvider.currentTimeMillis() - lastActionTimestamp;
52+
long bucketsToCreate = timePassed / (bucketDurationInSec * 1000L);
53+
if (bucketsToCreate >= bucketCount) {
54+
buckets.clear();
55+
buckets.push(new Bucket());
56+
return;
57+
}
1158

59+
for (int i = 0; i < bucketsToCreate; i++) {
60+
buckets.push(new Bucket());
61+
if (buckets.size() > bucketCount) {
62+
buckets.poll();
63+
}
64+
}
65+
lastActionTimestamp = timeProvider.currentTimeMillis();
1266
}
1367

1468
public double getRank() {
15-
return 1;
69+
int penalty = buckets.size() + 1;
70+
double rank = 0;
71+
double totalPenalty = 0;
72+
for (Bucket bucket : buckets) {
73+
int total = bucket.successCount + bucket.failureCount;
74+
if (total == 0) {
75+
penalty--;
76+
continue;
77+
}
78+
double successRate = (double) bucket.successCount / total;
79+
rank += successRate * penalty;
80+
totalPenalty += penalty;
81+
penalty--;
82+
}
83+
return rank / totalPenalty;
1684
}
1785

1886
public String getAccountName() {

ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSet.java

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,42 @@
11
package com.microsoft.azure.kusto.ingest.resources;
22

3+
import com.microsoft.azure.kusto.ingest.utils.DefaultRandomProvider;
4+
import com.microsoft.azure.kusto.ingest.utils.RandomProvider;
5+
import com.microsoft.azure.kusto.ingest.utils.SystemTimeProvider;
6+
import com.microsoft.azure.kusto.ingest.utils.TimeProvider;
37
import org.jetbrains.annotations.NotNull;
48
import org.jetbrains.annotations.Nullable;
59

610
import java.util.*;
11+
import java.util.stream.Collectors;
712

813
public class RankedStorageAccountSet {
14+
private static final int DEFAULT_BUCKET_COUNT = 6;
15+
private static final int DEFAULT_BUCKET_DURATION_IN_SEC = 10;
16+
private static final int[] DEFAULT_TIERS = new int[]{90, 70, 30, 0};
17+
18+
public static final TimeProvider DEFAULT_TIME_PROVIDER = new SystemTimeProvider();
19+
public static final RandomProvider DEFAULT_RANDOM_PROVIDER = new DefaultRandomProvider();
920
private final Map<String, RankedStorageAccount> accounts;
21+
private final int bucketCount;
22+
private final int bucketDurationInSec;
23+
private final int[] tiers;
24+
private final TimeProvider timeProvider;
25+
private final RandomProvider randomProvider;
1026

11-
public RankedStorageAccountSet() {
27+
public RankedStorageAccountSet(int bucketCount, int bucketDurationInSec, int[] tiers, TimeProvider timeProvider, RandomProvider randomProvider) {
28+
this.bucketCount = bucketCount;
29+
this.bucketDurationInSec = bucketDurationInSec;
30+
this.tiers = tiers;
31+
this.timeProvider = timeProvider;
32+
this.randomProvider = randomProvider;
1233
this.accounts = new HashMap<>();
1334
}
1435

36+
public RankedStorageAccountSet() {
37+
this(DEFAULT_BUCKET_COUNT, DEFAULT_BUCKET_DURATION_IN_SEC, DEFAULT_TIERS, DEFAULT_TIME_PROVIDER, DEFAULT_RANDOM_PROVIDER);
38+
}
39+
1540
public void addResultToAccount(String accountName, boolean success) {
1641
RankedStorageAccount account = accounts.get(accountName);
1742
if (account != null) {
@@ -23,7 +48,7 @@ public void addResultToAccount(String accountName, boolean success) {
2348

2449
public void addAccount(String accountName) {
2550
if (!accounts.containsKey(accountName)) {
26-
accounts.put(accountName, new RankedStorageAccount(accountName));
51+
accounts.put(accountName, new RankedStorageAccount(accountName, bucketCount, bucketDurationInSec, timeProvider));
2752
} else {
2853
throw new IllegalArgumentException("Account " + accountName + " already exists");
2954
}
@@ -44,10 +69,28 @@ public RankedStorageAccount getAccount(String accountName) {
4469

4570
@NotNull
4671
public List<RankedStorageAccount> getRankedShuffledAccounts() {
47-
List<RankedStorageAccount> accounts = new ArrayList<>(this.accounts.values());
48-
// shuffle accounts
49-
Collections.shuffle(accounts);
50-
return accounts;
72+
List<List<RankedStorageAccount>> tiersList = new ArrayList<>();
73+
74+
for (int i = 0; i < tiers.length; i++) {
75+
tiersList.add(new ArrayList<>());
76+
}
77+
78+
for (RankedStorageAccount account : this.accounts.values()) {
79+
double rankPercentage = account.getRank() * 100.0;
80+
for (int i = 0; i < tiers.length; i++) {
81+
if (rankPercentage >= tiers[i]) {
82+
tiersList.get(i).add(account);
83+
break;
84+
}
85+
}
86+
}
87+
88+
for (List<RankedStorageAccount> tier : tiersList) {
89+
randomProvider.shuffle(tier);
90+
}
91+
92+
// flatten tiers
93+
return tiersList.stream().flatMap(Collection::stream).collect(Collectors.toList());
5194
}
5295

5396
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.microsoft.azure.kusto.ingest.utils;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
import java.util.Random;
6+
7+
public class DefaultRandomProvider implements RandomProvider {
8+
public Random random;
9+
10+
public DefaultRandomProvider(Random random) {
11+
this.random = random;
12+
}
13+
14+
public DefaultRandomProvider() {
15+
this.random = new Random();
16+
}
17+
18+
@Override
19+
public void shuffle(List<?> list) {
20+
Collections.shuffle(list, random);
21+
}
22+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.microsoft.azure.kusto.ingest.utils;
2+
3+
import java.util.List;
4+
5+
public interface RandomProvider {
6+
void shuffle(List<?> list);
7+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.microsoft.azure.kusto.ingest.utils;
2+
3+
public class SystemTimeProvider implements TimeProvider {
4+
@Override
5+
public long currentTimeMillis() {
6+
return System.currentTimeMillis();
7+
}
8+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package com.microsoft.azure.kusto.ingest.utils;
2+
3+
public interface TimeProvider {
4+
long currentTimeMillis();
5+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.microsoft.azure.kusto.ingest;
2+
3+
import com.microsoft.azure.kusto.ingest.utils.TimeProvider;
4+
5+
public class MockTimeProvider implements TimeProvider {
6+
private long currentTimeMillis;
7+
8+
public MockTimeProvider(long currentTimeMillis) {
9+
this.currentTimeMillis = currentTimeMillis;
10+
}
11+
12+
@Override
13+
public long currentTimeMillis() {
14+
return currentTimeMillis;
15+
}
16+
17+
public void setCurrentTimeMillis(long currentTimeMillis) {
18+
this.currentTimeMillis = currentTimeMillis;
19+
}
20+
21+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package com.microsoft.azure.kusto.ingest.resources;
2+
3+
4+
import com.microsoft.azure.kusto.ingest.MockTimeProvider;
5+
import com.microsoft.azure.kusto.ingest.utils.RandomProvider;
6+
import org.junit.jupiter.api.Test;
7+
8+
import java.util.List;
9+
10+
import static org.junit.jupiter.api.Assertions.assertEquals;
11+
12+
class RankedStorageAccountSetTest {
13+
14+
class ByNameReverseOrderRandomProvider implements RandomProvider {
15+
@Override
16+
public void shuffle(List<?> list) {
17+
list.sort((o1, o2) -> ((RankedStorageAccount) o2).getAccountName().compareTo(((RankedStorageAccount) o1).getAccountName()));
18+
}
19+
}
20+
21+
@Test
22+
public void testShuffledAccountsNoTiers() {
23+
int[] ints = new int[]{0};
24+
25+
RankedStorageAccountSet rankedStorageAccountSet = new RankedStorageAccountSet(
26+
6,
27+
10,
28+
ints,
29+
new MockTimeProvider(System.currentTimeMillis()),
30+
new ByNameReverseOrderRandomProvider());
31+
32+
rankedStorageAccountSet.addAccount("aSuccessful");
33+
rankedStorageAccountSet.addAccount("bFailed");
34+
rankedStorageAccountSet.addAccount("cHalf");
35+
36+
rankedStorageAccountSet.addResultToAccount("aSuccessful", true);
37+
rankedStorageAccountSet.addResultToAccount("bFailed", false);
38+
rankedStorageAccountSet.addResultToAccount("cHalf", true);
39+
rankedStorageAccountSet.addResultToAccount("cHalf", false);
40+
41+
List<RankedStorageAccount> accounts = rankedStorageAccountSet.getRankedShuffledAccounts();
42+
assertEquals("cHalf", accounts.get(0).getAccountName());
43+
assertEquals("bFailed", accounts.get(1).getAccountName());
44+
assertEquals("aSuccessful", accounts.get(2).getAccountName());
45+
}
46+
47+
@Test
48+
public void testShuffledAccounts() {
49+
int[] tiers = new int[]{90, 70, 30, 0};
50+
51+
RankedStorageAccountSet rankedStorageAccountSet = new RankedStorageAccountSet(
52+
6,
53+
10,
54+
tiers,
55+
new MockTimeProvider(System.currentTimeMillis()),
56+
new ByNameReverseOrderRandomProvider());
57+
58+
int[] values = new int[] {95, 40, 80, 20, 97, 10, 50, 75, 29, 0};
59+
for (int i = 0; i < values.length; i++) {
60+
String name = String.format("%s%d", (char)('a' + i), values[i]);
61+
rankedStorageAccountSet.addAccount(name);
62+
for (int j = 0; j < 100; j++) {
63+
rankedStorageAccountSet.addResultToAccount(name, j < values[i]);
64+
}
65+
}
66+
67+
String[][] expected = new String[][] {
68+
{"e97", "a95"},
69+
{"h75", "c80"},
70+
{"g50", "b40"},
71+
{"j0", "i29", "f10", "d20"}
72+
};
73+
74+
List<RankedStorageAccount> accounts = rankedStorageAccountSet.getRankedShuffledAccounts();
75+
int total = 0;
76+
for (String[] strings : expected) {
77+
for (int j = 0; j < strings.length; j++) {
78+
assertEquals(strings[j], accounts.get(total + j).getAccountName());
79+
}
80+
total += strings.length;
81+
}
82+
}
83+
84+
@Test
85+
public void testShuffledAccountsEmptyTier() {
86+
int[] tiers = new int[]{90, 70, 30, 0};
87+
88+
RankedStorageAccountSet rankedStorageAccountSet = new RankedStorageAccountSet(
89+
6,
90+
10,
91+
tiers,
92+
new MockTimeProvider(System.currentTimeMillis()),
93+
new ByNameReverseOrderRandomProvider());
94+
95+
int[] values = new int[] {95, 40, 20, 97, 10, 50};
96+
for (int i = 0; i < values.length; i++) {
97+
String name = String.format("%s%d", (char)('a' + i), values[i]);
98+
rankedStorageAccountSet.addAccount(name);
99+
for (int j = 0; j < 100; j++) {
100+
rankedStorageAccountSet.addResultToAccount(name, j < values[i]);
101+
}
102+
}
103+
104+
String[][] expected = new String[][] {
105+
{"d97", "a95"},
106+
{"f50", "b40"},
107+
{"e10", "c20"}
108+
};
109+
110+
List<RankedStorageAccount> accounts = rankedStorageAccountSet.getRankedShuffledAccounts();
111+
int total = 0;
112+
for (String[] strings : expected) {
113+
for (int j = 0; j < strings.length; j++) {
114+
assertEquals(strings[j], accounts.get(total + j).getAccountName());
115+
}
116+
total += strings.length;
117+
}
118+
}
119+
120+
}

0 commit comments

Comments
 (0)