Skip to content

Commit

Permalink
Benchmark to measure ShardingCoordinator.CoordinatorState serializa…
Browse files Browse the repository at this point in the history
…tion over DData (#7428)

* created benchmark to measure `ShardingCoordinator.CoordinatorState` serialization

Over DData

* scale regions with shards

* added benchmarks for more of the high-level replicator messages

* added API approvals

* fixed coordinator state calculation

* fix index calculation
  • Loading branch information
Aaronontheweb authored Dec 20, 2024
1 parent 0cf567a commit f2ec912
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// -----------------------------------------------------------------------
// <copyright file="DDataShardCoordinatorStateSerializationBenchmarks.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Akka.Actor;
using Akka.Benchmarks.Configurations;
using Akka.Cluster.Sharding;
using Akka.Actor.Dsl;
using Akka.Cluster.Sharding.Serialization.Proto.Msg;
using Akka.Cluster.Tests;
using Akka.Configuration;
using Akka.DistributedData;
using Akka.DistributedData.Internal;
using Akka.DistributedData.Serialization;
using Akka.Util;
using BenchmarkDotNet.Attributes;

namespace Akka.Cluster.Benchmarks.Serialization;

[Config(typeof(MicroBenchmarkConfig))]
public class DDataShardCoordinatorStateSerializationBenchmarks
{
private static readonly Config BaseConfig = ConfigurationFactory.ParseString("""
akka.actor.provider=cluster
akka.remote.dot-netty.tcp.port = 0
""").WithFallback(ClusterSharding.DefaultConfig());
private ExtendedActorSystem _system;
private ReplicatedDataSerializer _replicatedDataSerializer;
private ReplicatorMessageSerializer _replicatorMessageSerializer;

private static readonly Member FakeNode = TestMember.Create(new Address("akka.tcp", "sys", "b", 2552), MemberStatus.Up,
ImmutableHashSet.Create("r1"), appVersion: AppVersion.Create("1.1.0"));

[Params(1, 20, 100, 1000)]
public int ShardCount { get; set; }


public int RegionCount => Math.Max(ShardCount / 10, 1);

// Used to represent shards and regions
private IActorRef[] _placeHolders;

private readonly LWWRegisterKey<ShardCoordinator.CoordinatorState> _coordinatorStateKey = new("CoordinatorState");

private LWWRegister<ShardCoordinator.CoordinatorState> _coordinatorState;
private byte[] _serializedCoordinatorState;
private string _lwwRegisterManifest = string.Empty;

private DataEnvelope _dataEnvelope;
private byte[] _serializedDataEnvelope;
private string _dataEnvelopeManifest = string.Empty;

private Write _dataWrite;
private byte[] _serializedDataWrite;
private string _dataWriteManifest = string.Empty;


private static ShardCoordinator.CoordinatorState ComputedState(IActorRef[] placeholders, int shardCount,
int regionCount)
{
var shards = Enumerable.Range(0, shardCount)
.Select(i => new KeyValuePair<string, IActorRef>($"shard-{i}", placeholders[0]))
.ToImmutableDictionary( );

// evenly allocate shards to regions
var shardsPerRegionCount = shardCount / regionCount; // region count can't be zero
var shardsPerRegionBuilder = ImmutableDictionary.CreateBuilder<IActorRef, IImmutableList<string>>();
var i = 0;
foreach (var chunk in shards
.Chunk(shardsPerRegionCount))
{
var region = placeholders[i];
shardsPerRegionBuilder.Add(region, chunk.Select(kv => kv.Key).ToImmutableList());
i++;
}

var regionProxies = ImmutableHashSet<IActorRef>.Empty;
var unallocatedShards = ImmutableHashSet<string>.Empty;

return new ShardCoordinator.CoordinatorState(shards, shardsPerRegionBuilder.ToImmutable(), regionProxies, unallocatedShards);
}

[GlobalSetup]
public void Setup()
{
_system ??= (ExtendedActorSystem)ActorSystem.Create("system", BaseConfig);
_placeHolders ??= Enumerable.Range(0, RegionCount).Select(c => _system.ActorOf(ctx => { })).ToArray();
_replicatedDataSerializer = new ReplicatedDataSerializer(_system);
_replicatorMessageSerializer = new ReplicatorMessageSerializer(_system);
_coordinatorState = new LWWRegister<ShardCoordinator.CoordinatorState>(FakeNode.UniqueAddress,
ComputedState(_placeHolders, ShardCount, RegionCount));

// LWWRegister

_serializedCoordinatorState = _replicatedDataSerializer.ToBinary(_coordinatorState);
_lwwRegisterManifest = _replicatedDataSerializer.Manifest(_coordinatorState);

// DataEnvelope
_dataEnvelope = new DataEnvelope(_coordinatorState);
_serializedDataEnvelope = _replicatorMessageSerializer.ToBinary(_dataEnvelope);
_dataEnvelopeManifest = _replicatorMessageSerializer.Manifest(_dataEnvelope);

// Write
_dataWrite = new Write(_coordinatorStateKey.Id, _dataEnvelope, FakeNode.UniqueAddress);
_serializedDataWrite = _replicatorMessageSerializer.ToBinary(_dataWrite);
_dataWriteManifest = _replicatorMessageSerializer.Manifest(_dataWrite);
}

/// <summary>
/// Serialize the raw LWWRegister payload
/// </summary>
[Benchmark]
public byte[] SerializeCoordinatorState()
{
return _replicatedDataSerializer.ToBinary(_coordinatorState);
}

[Benchmark]
public object DeserializeCoordinatorState()
{
return _replicatedDataSerializer.FromBinary(_serializedCoordinatorState, _lwwRegisterManifest);
}

[Benchmark]
public byte[] SerializeDataEnvelope()
{
return _replicatorMessageSerializer.ToBinary(_dataEnvelope);
}

[Benchmark]
public object DeserializeDataEnvelope()
{
return _replicatorMessageSerializer.FromBinary(_serializedDataEnvelope, _dataEnvelopeManifest);
}

[Benchmark]
public byte[] SerializeDataWrite()
{
return _replicatorMessageSerializer.ToBinary(_dataWrite);
}

[Benchmark]
public object DeserializeDataWrite()
{
return _replicatorMessageSerializer.FromBinary(_serializedDataWrite, _dataWriteManifest);
}

[GlobalCleanup]
public void Cleanup()
{
_system.Dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests")]
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: InternalsVisibleTo("Akka.DistributedData.Tests")]
[assembly: InternalsVisibleTo("Akka.Cluster.Benchmarks")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding")]
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: InternalsVisibleTo("Akka.Cluster.Sharding.Tests")]
[assembly: InternalsVisibleTo("Akka.Cluster.Benchmarks")]
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.DistributedData.Tests")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[assembly: System.Reflection.AssemblyMetadataAttribute("RepositoryUrl", "https://github.com/akkadotnet/akka.net")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Benchmarks")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests")]
[assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Cluster.Sharding.Tests.MultiNode")]
Expand Down

0 comments on commit f2ec912

Please sign in to comment.