From 31dfa62b650cdcc7d0b665cb7b2318597f2a9ad9 Mon Sep 17 00:00:00 2001 From: AsafMah Date: Mon, 4 Sep 2023 13:25:41 +0300 Subject: [PATCH] Added ranked buckets --- .../resources/RankedStorageAccount.java | 72 ++++++++++- .../resources/RankedStorageAccountSet.java | 55 +++++++- .../ingest/utils/DefaultRandomProvider.java | 22 ++++ .../kusto/ingest/utils/RandomProvider.java | 7 + .../ingest/utils/SystemTimeProvider.java | 8 ++ .../kusto/ingest/utils/TimeProvider.java | 5 + .../azure/kusto/ingest/MockTimeProvider.java | 21 +++ .../RankedStorageAccountSetTest.java | 120 ++++++++++++++++++ .../resources/RankedStorageAccountTest.java | 117 +++++++++++++++++ 9 files changed, 419 insertions(+), 8 deletions(-) create mode 100644 ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/DefaultRandomProvider.java create mode 100644 ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/RandomProvider.java create mode 100644 ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/SystemTimeProvider.java create mode 100644 ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/TimeProvider.java create mode 100644 ingest/src/test/java/com/microsoft/azure/kusto/ingest/MockTimeProvider.java create mode 100644 ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSetTest.java create mode 100644 ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountTest.java diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccount.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccount.java index 9a5962cf..0016aec2 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccount.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccount.java @@ -1,18 +1,86 @@ package com.microsoft.azure.kusto.ingest.resources; +import com.microsoft.azure.kusto.ingest.utils.TimeProvider; + +import java.util.ArrayDeque; + public class RankedStorageAccount { + class Bucket { + public int successCount; + public int failureCount; + + public Bucket() { + this.successCount = 0; + this.failureCount = 0; + } + } + + private final ArrayDeque buckets = new ArrayDeque<>(); private final String accountName; + private final int bucketCount; + private final int bucketDurationInSec; + private final TimeProvider timeProvider; - public RankedStorageAccount(String accountName) { + private long lastActionTimestamp; + + public RankedStorageAccount(String accountName, int bucketCount, int bucketDurationInSec, TimeProvider timeProvider) { this.accountName = accountName; + this.bucketCount = bucketCount; + this.bucketDurationInSec = bucketDurationInSec; + this.timeProvider = timeProvider; + lastActionTimestamp = timeProvider.currentTimeMillis(); } public void addResult(boolean success) { + adjustForTimePassed(); + Bucket lastBucket = buckets.peek(); + assert lastBucket != null; + if (success) { + lastBucket.successCount++; + } else { + lastBucket.failureCount++; + } + } + + private void adjustForTimePassed() { + if (buckets.isEmpty()) { + buckets.push(new Bucket()); + return; + } + + long timePassed = timeProvider.currentTimeMillis() - lastActionTimestamp; + long bucketsToCreate = timePassed / (bucketDurationInSec * 1000L); + if (bucketsToCreate >= bucketCount) { + buckets.clear(); + buckets.push(new Bucket()); + return; + } + for (int i = 0; i < bucketsToCreate; i++) { + buckets.push(new Bucket()); + if (buckets.size() > bucketCount) { + buckets.poll(); + } + } + lastActionTimestamp = timeProvider.currentTimeMillis(); } public double getRank() { - return 1; + int penalty = buckets.size() + 1; + double rank = 0; + double totalPenalty = 0; + for (Bucket bucket : buckets) { + int total = bucket.successCount + bucket.failureCount; + if (total == 0) { + penalty--; + continue; + } + double successRate = (double) bucket.successCount / total; + rank += successRate * penalty; + totalPenalty += penalty; + penalty--; + } + return rank / totalPenalty; } public String getAccountName() { diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSet.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSet.java index 1308d92e..a4dffac3 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSet.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSet.java @@ -1,17 +1,42 @@ package com.microsoft.azure.kusto.ingest.resources; +import com.microsoft.azure.kusto.ingest.utils.DefaultRandomProvider; +import com.microsoft.azure.kusto.ingest.utils.RandomProvider; +import com.microsoft.azure.kusto.ingest.utils.SystemTimeProvider; +import com.microsoft.azure.kusto.ingest.utils.TimeProvider; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.util.*; +import java.util.stream.Collectors; public class RankedStorageAccountSet { + private static final int DEFAULT_BUCKET_COUNT = 6; + private static final int DEFAULT_BUCKET_DURATION_IN_SEC = 10; + private static final int[] DEFAULT_TIERS = new int[]{90, 70, 30, 0}; + + public static final TimeProvider DEFAULT_TIME_PROVIDER = new SystemTimeProvider(); + public static final RandomProvider DEFAULT_RANDOM_PROVIDER = new DefaultRandomProvider(); private final Map accounts; + private final int bucketCount; + private final int bucketDurationInSec; + private final int[] tiers; + private final TimeProvider timeProvider; + private final RandomProvider randomProvider; - public RankedStorageAccountSet() { + public RankedStorageAccountSet(int bucketCount, int bucketDurationInSec, int[] tiers, TimeProvider timeProvider, RandomProvider randomProvider) { + this.bucketCount = bucketCount; + this.bucketDurationInSec = bucketDurationInSec; + this.tiers = tiers; + this.timeProvider = timeProvider; + this.randomProvider = randomProvider; this.accounts = new HashMap<>(); } + public RankedStorageAccountSet() { + this(DEFAULT_BUCKET_COUNT, DEFAULT_BUCKET_DURATION_IN_SEC, DEFAULT_TIERS, DEFAULT_TIME_PROVIDER, DEFAULT_RANDOM_PROVIDER); + } + public void addResultToAccount(String accountName, boolean success) { RankedStorageAccount account = accounts.get(accountName); if (account != null) { @@ -23,7 +48,7 @@ public void addResultToAccount(String accountName, boolean success) { public void addAccount(String accountName) { if (!accounts.containsKey(accountName)) { - accounts.put(accountName, new RankedStorageAccount(accountName)); + accounts.put(accountName, new RankedStorageAccount(accountName, bucketCount, bucketDurationInSec, timeProvider)); } else { throw new IllegalArgumentException("Account " + accountName + " already exists"); } @@ -44,10 +69,28 @@ public RankedStorageAccount getAccount(String accountName) { @NotNull public List getRankedShuffledAccounts() { - List accounts = new ArrayList<>(this.accounts.values()); - // shuffle accounts - Collections.shuffle(accounts); - return accounts; + List> tiersList = new ArrayList<>(); + + for (int i = 0; i < tiers.length; i++) { + tiersList.add(new ArrayList<>()); + } + + for (RankedStorageAccount account : this.accounts.values()) { + double rankPercentage = account.getRank() * 100.0; + for (int i = 0; i < tiers.length; i++) { + if (rankPercentage >= tiers[i]) { + tiersList.get(i).add(account); + break; + } + } + } + + for (List tier : tiersList) { + randomProvider.shuffle(tier); + } + + // flatten tiers + return tiersList.stream().flatMap(Collection::stream).collect(Collectors.toList()); } } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/DefaultRandomProvider.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/DefaultRandomProvider.java new file mode 100644 index 00000000..a05f1218 --- /dev/null +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/DefaultRandomProvider.java @@ -0,0 +1,22 @@ +package com.microsoft.azure.kusto.ingest.utils; + +import java.util.Collections; +import java.util.List; +import java.util.Random; + +public class DefaultRandomProvider implements RandomProvider { + public Random random; + + public DefaultRandomProvider(Random random) { + this.random = random; + } + + public DefaultRandomProvider() { + this.random = new Random(); + } + + @Override + public void shuffle(List list) { + Collections.shuffle(list, random); + } +} diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/RandomProvider.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/RandomProvider.java new file mode 100644 index 00000000..c7e47489 --- /dev/null +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/RandomProvider.java @@ -0,0 +1,7 @@ +package com.microsoft.azure.kusto.ingest.utils; + +import java.util.List; + +public interface RandomProvider { + void shuffle(List list); +} diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/SystemTimeProvider.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/SystemTimeProvider.java new file mode 100644 index 00000000..1a6f28b4 --- /dev/null +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/SystemTimeProvider.java @@ -0,0 +1,8 @@ +package com.microsoft.azure.kusto.ingest.utils; + +public class SystemTimeProvider implements TimeProvider { + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } +} diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/TimeProvider.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/TimeProvider.java new file mode 100644 index 00000000..ffd376a4 --- /dev/null +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/utils/TimeProvider.java @@ -0,0 +1,5 @@ +package com.microsoft.azure.kusto.ingest.utils; + +public interface TimeProvider { + long currentTimeMillis(); +} diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/MockTimeProvider.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/MockTimeProvider.java new file mode 100644 index 00000000..256ce3ab --- /dev/null +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/MockTimeProvider.java @@ -0,0 +1,21 @@ +package com.microsoft.azure.kusto.ingest; + +import com.microsoft.azure.kusto.ingest.utils.TimeProvider; + +public class MockTimeProvider implements TimeProvider { + private long currentTimeMillis; + + public MockTimeProvider(long currentTimeMillis) { + this.currentTimeMillis = currentTimeMillis; + } + + @Override + public long currentTimeMillis() { + return currentTimeMillis; + } + + public void setCurrentTimeMillis(long currentTimeMillis) { + this.currentTimeMillis = currentTimeMillis; + } + +} diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSetTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSetTest.java new file mode 100644 index 00000000..532e8d95 --- /dev/null +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountSetTest.java @@ -0,0 +1,120 @@ +package com.microsoft.azure.kusto.ingest.resources; + + +import com.microsoft.azure.kusto.ingest.MockTimeProvider; +import com.microsoft.azure.kusto.ingest.utils.RandomProvider; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class RankedStorageAccountSetTest { + + class ByNameReverseOrderRandomProvider implements RandomProvider { + @Override + public void shuffle(List list) { + list.sort((o1, o2) -> ((RankedStorageAccount) o2).getAccountName().compareTo(((RankedStorageAccount) o1).getAccountName())); + } + } + + @Test + public void testShuffledAccountsNoTiers() { + int[] ints = new int[]{0}; + + RankedStorageAccountSet rankedStorageAccountSet = new RankedStorageAccountSet( + 6, + 10, + ints, + new MockTimeProvider(System.currentTimeMillis()), + new ByNameReverseOrderRandomProvider()); + + rankedStorageAccountSet.addAccount("aSuccessful"); + rankedStorageAccountSet.addAccount("bFailed"); + rankedStorageAccountSet.addAccount("cHalf"); + + rankedStorageAccountSet.addResultToAccount("aSuccessful", true); + rankedStorageAccountSet.addResultToAccount("bFailed", false); + rankedStorageAccountSet.addResultToAccount("cHalf", true); + rankedStorageAccountSet.addResultToAccount("cHalf", false); + + List accounts = rankedStorageAccountSet.getRankedShuffledAccounts(); + assertEquals("cHalf", accounts.get(0).getAccountName()); + assertEquals("bFailed", accounts.get(1).getAccountName()); + assertEquals("aSuccessful", accounts.get(2).getAccountName()); + } + + @Test + public void testShuffledAccounts() { + int[] tiers = new int[]{90, 70, 30, 0}; + + RankedStorageAccountSet rankedStorageAccountSet = new RankedStorageAccountSet( + 6, + 10, + tiers, + new MockTimeProvider(System.currentTimeMillis()), + new ByNameReverseOrderRandomProvider()); + + int[] values = new int[] {95, 40, 80, 20, 97, 10, 50, 75, 29, 0}; + for (int i = 0; i < values.length; i++) { + String name = String.format("%s%d", (char)('a' + i), values[i]); + rankedStorageAccountSet.addAccount(name); + for (int j = 0; j < 100; j++) { + rankedStorageAccountSet.addResultToAccount(name, j < values[i]); + } + } + + String[][] expected = new String[][] { + {"e97", "a95"}, + {"h75", "c80"}, + {"g50", "b40"}, + {"j0", "i29", "f10", "d20"} + }; + + List accounts = rankedStorageAccountSet.getRankedShuffledAccounts(); + int total = 0; + for (String[] strings : expected) { + for (int j = 0; j < strings.length; j++) { + assertEquals(strings[j], accounts.get(total + j).getAccountName()); + } + total += strings.length; + } + } + + @Test + public void testShuffledAccountsEmptyTier() { + int[] tiers = new int[]{90, 70, 30, 0}; + + RankedStorageAccountSet rankedStorageAccountSet = new RankedStorageAccountSet( + 6, + 10, + tiers, + new MockTimeProvider(System.currentTimeMillis()), + new ByNameReverseOrderRandomProvider()); + + int[] values = new int[] {95, 40, 20, 97, 10, 50}; + for (int i = 0; i < values.length; i++) { + String name = String.format("%s%d", (char)('a' + i), values[i]); + rankedStorageAccountSet.addAccount(name); + for (int j = 0; j < 100; j++) { + rankedStorageAccountSet.addResultToAccount(name, j < values[i]); + } + } + + String[][] expected = new String[][] { + {"d97", "a95"}, + {"f50", "b40"}, + {"e10", "c20"} + }; + + List accounts = rankedStorageAccountSet.getRankedShuffledAccounts(); + int total = 0; + for (String[] strings : expected) { + for (int j = 0; j < strings.length; j++) { + assertEquals(strings[j], accounts.get(total + j).getAccountName()); + } + total += strings.length; + } + } + +} diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountTest.java new file mode 100644 index 00000000..cc13be55 --- /dev/null +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/resources/RankedStorageAccountTest.java @@ -0,0 +1,117 @@ +package com.microsoft.azure.kusto.ingest.resources; + +import com.microsoft.azure.kusto.ingest.MockTimeProvider; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +class RankedStorageAccountTest { + @Test + public void testGetRankWithAllSuccesses() { + // Rationale: To ensure that the getRank method correctly calculates the rank with all successes + MockTimeProvider timeProvider = new MockTimeProvider(System.currentTimeMillis()); + RankedStorageAccount account = new RankedStorageAccount("testAccount", 5, 10, timeProvider); + + for(int i = 0; i < 10; i++){ + account.addResult(true); + } + + double rank = account.getRank(); + System.out.println(rank); + assertEquals(1.0, rank, 0.001); + } + + @Test + public void testGetRankWithAllFailures() { + // Rationale: To ensure that the getRank method correctly calculates the rank with all failures + MockTimeProvider timeProvider = new MockTimeProvider(System.currentTimeMillis()); + RankedStorageAccount account = new RankedStorageAccount("testAccount", 5, 10, timeProvider); + + for(int i = 0; i < 10; i++){ + account.addResult(false); + } + + double rank = account.getRank(); + System.out.println(rank); + assertEquals(0.0, rank, 0.001); + } + + // Test getRank method with mixed results + @Test + public void testGetRankWithMixedResults() { + // Rationale: To ensure that the getRank method correctly calculates the rank with mixed results + MockTimeProvider timeProvider = new MockTimeProvider(System.currentTimeMillis()); + RankedStorageAccount account = new RankedStorageAccount("testAccount", 5, 10, timeProvider); + + for(int i = 0; i < 5; i++){ + account.addResult(true); + } + for(int i = 0; i < 5; i++){ + account.addResult(false); + } + + double rank = account.getRank(); + System.out.println(rank); + assertEquals(0.5, rank, 0.001); + } + + @Test + public void testNewBucketOverride() { + // Rationale: To ensure that the new bucket override works as expected + MockTimeProvider timeProvider = new MockTimeProvider(System.currentTimeMillis()); + RankedStorageAccount account = new RankedStorageAccount("testAccount", 5, 10, timeProvider); + + account.addResult(false); + account.addResult(false); + + double rank = account.getRank(); + System.out.println(rank); + assertEquals(0.0, rank, 0.001); + + timeProvider.setCurrentTimeMillis(timeProvider.currentTimeMillis() + 11000); + account.addResult(true); + rank = account.getRank(); + System.out.println(rank); + assertEquals(0.6, rank, 0.001); // it would be 0.333 without the override + } + + @Test + public void testSkipBucket() { + // Rationale: To ensure that the skip bucket works as expected + MockTimeProvider timeProvider = new MockTimeProvider(System.currentTimeMillis()); + RankedStorageAccount account = new RankedStorageAccount("testAccount", 5, 10, timeProvider); + + account.addResult(false); + account.addResult(false); + + double rank = account.getRank(); + System.out.println(rank); + assertEquals(0.0, rank, 0.001); + + timeProvider.setCurrentTimeMillis(timeProvider.currentTimeMillis() + 21000); + account.addResult(true); + rank = account.getRank(); + System.out.println(rank); + assertEquals(0.666, rank, 0.001); // it would be 0.5 without the override + } + + @Test + public void testClearBuckets() { + // Rationale: To ensure that the clear buckets works as expected + MockTimeProvider timeProvider = new MockTimeProvider(System.currentTimeMillis()); + RankedStorageAccount account = new RankedStorageAccount("testAccount", 5, 10, timeProvider); + + account.addResult(false); + account.addResult(false); + + double rank = account.getRank(); + System.out.println(rank); + assertEquals(0.0, rank, 0.001); + + timeProvider.setCurrentTimeMillis(timeProvider.currentTimeMillis() + 51000); + account.addResult(true); + rank = account.getRank(); + System.out.println(rank); + assertEquals(1.0, rank, 0.001); // it would be 0.5 without the override + } +}