Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a sparse outstanding index for the dynamo db outbox [v9] #3295

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using Amazon;
using Amazon.Runtime;

Expand Down
19 changes: 9 additions & 10 deletions src/Paramore.Brighter.Outbox.DynamoDB/DynamoDbOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ await _context.SaveAsync(
message,
_dynamoOverwriteTableConfig,
cancellationToken);
}
}

public async Task MarkDispatchedAsync(IEnumerable<Guid> ids, DateTime? dispatchedAt = null, Dictionary<string, object> args = null,
CancellationToken cancellationToken = default)
Expand All @@ -296,13 +296,16 @@ public void MarkDispatched(Guid id, DateTime? dispatchedAt = null, Dictionary<st
message,
_dynamoOverwriteTableConfig)
.Wait(_configuration.Timeout);

}

private static void MarkMessageDispatched(DateTime? dispatchedAt, MessageItem message)
{
message.DeliveryTime = dispatchedAt.Value.Ticks;
message.DeliveredAt = $"{dispatchedAt:yyyy-MM-dd}";

// Set the outstanding created time to null to remove the attribute
// from the item in dynamo
message.OutstandingCreatedTime = null;
}

/// <summary>
Expand Down Expand Up @@ -386,7 +389,7 @@ private Task<TransactWriteItemsRequest> AddToTransactionWrite(MessageItem messag

private async Task<Message> GetMessage(Guid id, CancellationToken cancellationToken = default)
{
MessageItem messageItem = await _context.LoadAsync<MessageItem>(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken);
var messageItem = await _context.LoadAsync<MessageItem>(id.ToString(), _dynamoOverwriteTableConfig, cancellationToken);
return messageItem?.ConvertToMessage() ?? new Message();
}

Expand Down Expand Up @@ -460,9 +463,9 @@ private async Task<IEnumerable<Message>> OutstandingMessagesForAllTopicsAsync(do
private async Task<IEnumerable<Message>> OutstandingMessagesForTopicAsync(double millisecondsDispatchedSince,
string topicName, CancellationToken cancellationToken)
{
var olrderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));
var olderThan = DateTime.UtcNow.Subtract(TimeSpan.FromMilliseconds(millisecondsDispatchedSince));

var messages = (await QueryAllOutstandingShardsAsync(topicName, olrderThan, cancellationToken)).ToList();
var messages = (await QueryAllOutstandingShardsAsync(topicName, olderThan, cancellationToken)).ToList();
return messages.Select(msg => msg.ConvertToMessage());
}

Expand All @@ -485,14 +488,10 @@ private async Task<IEnumerable<MessageItem>> QueryAllOutstandingShardsAsync(stri

for (int shard = 0; shard < _configuration.NumberOfShards; shard++)
{
// We get all the messages for topic, added within a time range
// There should be few enough of those that we can efficiently filter for those
// that don't have a delivery date.
var queryConfig = new QueryOperationConfig
{
IndexName = _configuration.OutstandingIndexName,
KeyExpression = new KeyTopicCreatedTimeExpression().Generate(topic, minimumAge, shard),
FilterExpression = new NoDispatchTimeExpression().Generate(),
KeyExpression = new KeyTopicOutstandingCreatedTimeExpression().Generate(topic, minimumAge, shard),
ConsistentRead = false
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using System;
using System;
using System.Collections.Generic;
using Amazon.DynamoDBv2.DocumentModel;

namespace Paramore.Brighter.Outbox.DynamoDB
{
internal class KeyTopicCreatedTimeExpression
internal class KeyTopicOutstandingCreatedTimeExpression
{
private readonly Expression _expression;

public KeyTopicCreatedTimeExpression()
public KeyTopicOutstandingCreatedTimeExpression()
{
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and CreatedTime < :v_CreatedTime" };
_expression = new Expression { ExpressionStatement = "TopicShard = :v_TopicShard and OutstandingCreatedTime < :v_OutstandingCreatedTime" };
}

public override string ToString()
Expand All @@ -22,7 +22,7 @@ public Expression Generate(string topicName, DateTime createdTime, int shard)
{
var values = new Dictionary<string, DynamoDBEntry>();
values.Add(":v_TopicShard", $"{topicName}_{shard}");
values.Add(":v_CreatedTime", createdTime.Ticks);
values.Add(":v_OutstandingCreatedTime", createdTime.Ticks);

_expression.ExpressionAttributeValues = values;

Expand Down
26 changes: 14 additions & 12 deletions src/Paramore.Brighter.Outbox.DynamoDB/MessageItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,16 @@ public class MessageItem
/// <summary>
/// The time at which the message was created, in ticks
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long CreatedTime { get; set; }

/// <summary>
/// The time at which the message was created, in ticks. Null if the message has been dispatched.
/// </summary>
[DynamoDBGlobalSecondaryIndexRangeKey(indexName: "Outstanding")]
[DynamoDBProperty]
public long? OutstandingCreatedTime { get; set; }

/// <summary>
/// The time at which the message was delivered, formatted as a string yyyy-MM-dd
/// </summary>
Expand Down Expand Up @@ -122,6 +128,7 @@ public MessageItem(Message message, int shard = 0, long? expiresAt = null)
CharacterEncoding = message.Body.CharacterEncoding.ToString();
CreatedAt = $"{date}";
CreatedTime = date.Ticks;
OutstandingCreatedTime = date.Ticks;
DeliveryTime = 0;
HeaderBag = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
MessageId = message.Id.ToString();
Expand Down Expand Up @@ -163,42 +170,37 @@ public Message ConvertToMessage()

return new Message(header, body);
}

public void MarkMessageDelivered(DateTime deliveredAt)
{
DeliveryTime = deliveredAt.Ticks;
DeliveredAt = $"{deliveredAt:yyyy-MM-dd}";
}
}

public class MessageItemBodyConverter : IPropertyConverter
{
public DynamoDBEntry ToEntry(object value)
{
byte[] body = value as byte[];
if (body == null) throw new ArgumentOutOfRangeException("Expected the body to be a byte array");
if (body == null)
throw new ArgumentOutOfRangeException("Expected the body to be a byte array");

DynamoDBEntry entry = new Primitive
{
Value = body,
Type = DynamoDBEntryType.Binary

};

return entry;
}

public object FromEntry(DynamoDBEntry entry)
{
byte[] data = Array.Empty<byte>();
Primitive primitive = entry as Primitive;
Primitive primitive = entry as Primitive;
if (primitive?.Value is byte[] bytes)
data = bytes;
if (primitive?.Value is string text) //for historical data that used UTF-8 strings
data = Encoding.UTF8.GetBytes(text);
if (primitive == null || !(primitive.Value is string || primitive.Value is byte[]))
throw new ArgumentOutOfRangeException("Expected Dynamo to have stored a byte array");

return data;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,54 @@ public async Task When_there_are_outstanding_messages_for_multiple_topics()
}
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched_async()
{
await _dynamoDbOutbox.AddAsync(_message);

await Task.Delay(1000);

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

await _dynamoDbOutbox.MarkDispatchedAsync(_message.Id);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = await _dynamoDbOutbox.OutstandingMessagesAsync(0, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

[Fact]
public async Task When_an_outstanding_message_is_dispatched()
{
_dynamoDbOutbox.Add(_message);

await Task.Delay(1000);

var args = new Dictionary<string, object> { { "Topic", "test_topic" } };

var messages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1, args);

//Other tests may leave messages, so make sure that we grab ours
var message = messages.Single(m => m.Id == _message.Id);
message.Should().NotBeNull();

_dynamoDbOutbox.MarkDispatched(_message.Id);

// Give the GSI a second to catch up
await Task.Delay(1000);

messages = _dynamoDbOutbox.OutstandingMessages(0, 100, 1, args);
messages.All(m => m.Id != _message.Id);
}

private Message CreateMessage(string topic)
{
return new Message(
Expand Down
Loading