Skip to content

Commit

Permalink
Reuse the same client rather than creating on each request
Browse files Browse the repository at this point in the history
  • Loading branch information
smfields committed Dec 17, 2023
1 parent e372d58 commit bc4f44d
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Orleans.Configuration;
using Orleans.EventSourcing.EventStorage;
using Orleans.EventSourcing.EventStorage.EventStore;
using Orleans.Runtime;

// ReSharper disable once CheckNamespace
namespace Orleans.Hosting;
Expand Down Expand Up @@ -93,6 +94,7 @@ public static ISiloBuilder AddEventStoreEventStorage(
}
services.AddKeyedSingleton(name, EventStoreStorageFactory.Create);
services.AddSingleton<ILifecycleParticipant<ISiloLifecycle>>(s => (ILifecycleParticipant<ISiloLifecycle>)s.GetRequiredKeyedService<IEventStorage>(name));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace Orleans.EventSourcing.EventStorage.EventStore.Options;

/// <summary>
/// Exception for throwing from Redis event storage.
/// </summary>
[GenerateSerializer]
public class EventStoreEventStorageException : Exception
{
/// <summary>
/// Initializes a new instance of <see cref="EventStoreEventStorageException"/>.
/// </summary>
public EventStoreEventStorageException()
{
}

/// <summary>
/// Initializes a new instance of <see cref="EventStoreEventStorageException"/>.
/// </summary>
/// <param name="message">The error message that explains the reason for the exception.</param>
public EventStoreEventStorageException(string message) : base(message)
{
}

/// <summary>
/// Initializes a new instance of <see cref="EventStoreEventStorageException"/>.
/// </summary>
/// <param name="message">The error message that explains the reason for the exception.</param>
/// <param name="inner">The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified.</param>
public EventStoreEventStorageException(string message, Exception inner) : base(message, inner)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,29 @@ namespace Orleans.Configuration;

public class EventStoreOptions : IStorageProviderSerializerOptions
{
/// <summary>
/// Settings used to create the <see cref="EventStoreClient"/>
/// </summary>
public EventStoreClientSettings ClientSettings { get; set; } = null!;

/// <summary>
/// Stage of silo lifecycle where storage should be initialized. Storage must be initialized prior to use.
/// </summary>
public int InitStage { get; set; } = ServiceLifecycleStage.ApplicationServices;

/// <inheritdoc/>
public IGrainStorageSerializer? GrainStorageSerializer { get; set; }

/// <summary>
/// The delegate used to create the <see cref="EventStoreClient"/>
/// </summary>
public Func<EventStoreClientSettings, Task<EventStoreClient>> CreateClient { get; set; } = DefaultCreateClient;

/// <summary>
/// Default delegate used to create the <see cref="EventStoreClient"/>
/// </summary>
/// <param name="settings">EventStoreClientSettings</param>
/// <returns>The <see cref="EventStoreClient"/></returns>
public static Task<EventStoreClient> DefaultCreateClient(EventStoreClientSettings settings) =>
Task.FromResult(new EventStoreClient(settings));
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ public async Task Events_can_be_stored_and_retrieved()
Assert.That(eventList.First().Data, Is.EqualTo(sampleEvent));
}

[Test]
public async Task Many_events_can_be_added_sequentially()
{
const int numEvents = 1_000;
var grainId = GenerateGrainId();
var sampleEvents = Enumerable
.Range(0, numEvents)
.Select(i => new SampleEvent(i))
.ToList();

var expectedVersion = 0;
foreach (var sampleEvent in sampleEvents)
{
await EventStoreEventStorage.AppendEventsToStorage(grainId, [sampleEvent], expectedVersion++);
}

var eventStream = EventStoreEventStorage.ReadEventsFromStorage<SampleEvent>(grainId);
var eventList = await eventStream.ToListAsync();
Assert.That(eventList, Has.Count.EqualTo(numEvents));
}

[Test]
public async Task Retrieved_events_have_same_type_as_stored_events()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System.Diagnostics;
using EventStore.Client;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.EventSourcing.EventStorage.EventStore.Options;
using Orleans.Runtime;
using Orleans.Storage;

Expand All @@ -11,24 +13,28 @@ namespace Orleans.EventSourcing.EventStorage.EventStore;
/// Event storage provider that stores events using an EventStoreDB event stream.
/// </summary>
[DebuggerDisplay("EventStore:{" + nameof(_name) + "}")]
public class EventStoreEventStorage : IEventStorage
public class EventStoreEventStorage : IEventStorage, ILifecycleParticipant<ISiloLifecycle>
{
private readonly string _name;
private readonly string _serviceId;
private readonly EventStoreOptions _options;
private readonly ILogger<EventStoreEventStorage> _logger;
private readonly IGrainStorageSerializer _storageSerializer;
private EventStoreClient? _eventStoreClient;

public EventStoreEventStorage(
string name,
EventStoreOptions options,
ILogger<EventStoreEventStorage> logger,
IGrainStorageSerializer defaultStorageSerializer
IGrainStorageSerializer defaultStorageSerializer,
IOptions<ClusterOptions> clusterOptions
)
{
_name = name;
_options = options;
_logger = logger;
_storageSerializer = _options.GrainStorageSerializer ?? defaultStorageSerializer;
_serviceId = clusterOptions.Value.ServiceId;
}

/// <inheritdoc />
Expand All @@ -48,9 +54,7 @@ public async IAsyncEnumerable<EventRecord<TEvent>> ReadEventsFromStorage<TEvent>
throw new ArgumentOutOfRangeException(nameof(maxCount), "Max Count cannot be less than 0");
}

var client = new EventStoreClient(_options.ClientSettings);

var results = client.ReadStreamAsync(
var results = _eventStoreClient!.ReadStreamAsync(
Direction.Forwards,
grainId.ToString(),
revision: (ulong) version,
Expand Down Expand Up @@ -80,8 +84,6 @@ int expectedVersion
throw new ArgumentOutOfRangeException(nameof(expectedVersion), "Expected version cannot be less than 0");
}

var client = new EventStoreClient(_options.ClientSettings);

var eventsToAppend = events.Select(
x => new EventData(Uuid.NewUuid(), x.GetType().FullName!, _storageSerializer.Serialize(x))
);
Expand All @@ -90,7 +92,7 @@ int expectedVersion

try
{
await client.AppendToStreamAsync(
await _eventStoreClient!.AppendToStreamAsync(
grainId.ToString(),
expectedRevision: expectedRevision,
eventsToAppend
Expand All @@ -103,4 +105,60 @@ await client.AppendToStreamAsync(

return true;
}

public void Participate(ISiloLifecycle lifecycle)
{
var name = OptionFormattingUtilities.Name<EventStoreEventStorage>(_name);
lifecycle.Subscribe(name, _options.InitStage, Init, Close);
}

private async Task Init(CancellationToken cancellationToken)
{
var timer = Stopwatch.StartNew();

try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(
"EventStoreEventStorage {Name} is initializing: ServiceId={ServiceId}",
_name,
_serviceId
);
}

_eventStoreClient = await _options.CreateClient(_options.ClientSettings);

if (_logger.IsEnabled(LogLevel.Debug))
{
timer.Stop();
_logger.LogDebug(
"Init: Name={Name} ServiceId={ServiceId}, initialized in {ElapsedMilliseconds} ms",
_name,
_serviceId,
timer.Elapsed.TotalMilliseconds.ToString("0.00")
);
}
}
catch (Exception ex)
{
timer.Stop();
_logger.LogError(
ex,
"Init: Name={Name} ServiceId={ServiceId}, errored in {ElapsedMilliseconds} ms.",
_name,
_serviceId,
timer.Elapsed.TotalMilliseconds.ToString("0.00")
);

throw new EventStoreEventStorageException($"{ex.GetType()}: {ex.Message}");
}
}

private async Task Close(CancellationToken cancellationToken)
{
if (_eventStoreClient is null) return;

await _eventStoreClient.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ public class EventStoreDbSetup
[OneTimeSetUp]
public async Task OneTimeSetUp()
{
var container = new EventStoreDbBuilder().Build();
var container = new EventStoreDbBuilder()
.WithImage("eventstore/eventstore:lts")
.Build();
await container.StartAsync();

ConnectionString = container.GetConnectionString();
Expand Down

0 comments on commit bc4f44d

Please sign in to comment.