Skip to content

Commit

Permalink
Add fields to QueueSpecification
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Jul 18, 2024
1 parent d65966b commit 6a17666
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 5 deletions.
2 changes: 1 addition & 1 deletion RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ public interface IStreamOptions
IConsumerBuilder Builder();
}


}
38 changes: 38 additions & 0 deletions RabbitMQ.AMQP.Client/IEntities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ public interface IEntityDeclaration
Task Declare();
}

public enum OverFlowStrategy
{
DropHead,
RejectPublish,
RejectPublishDlx
// DROP_HEAD("drop-head"),
// REJECT_PUBLISH("reject-publish"),
// REJECT_PUBLISH_DLX("reject-publish-dlx");
}




public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
{
IQueueSpecification Name(string name);
Expand All @@ -38,6 +51,31 @@ public interface IQueueSpecification : IEntityInfoDeclaration<IQueueInfo>
IQueueSpecification Type(QueueType type);
public QueueType Type();


IQueueSpecification DeadLetterExchange(string dlx);

IQueueSpecification DeadLetterRoutingKey(string dlrk);

IQueueSpecification OverflowStrategy(OverFlowStrategy overflow);

IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes);

IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer);


IQueueSpecification Expires(TimeSpan expiration);



IQueueSpecification MaxLength(long maxLength);


IQueueSpecification MessageTtl(TimeSpan ttl);





// IQuorumQueueSpecification Quorum();
}

Expand Down
61 changes: 61 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpQueueSpecification.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ public uint ConsumerCount()
/// <param name="management"></param>
public class AmqpQueueSpecification(AmqpManagement management) : IQueueSpecification
{
private static TimeSpan s_tenYears = TimeSpan.FromDays(365 * 10);

private string? _name;
private bool _exclusive = false;
private bool _autoDelete = false;
Expand Down Expand Up @@ -175,6 +177,65 @@ public QueueType Type()
return (QueueType)Enum.Parse(typeof(QueueType), type.ToUpperInvariant());
}

public IQueueSpecification DeadLetterExchange(string dlx)
{
_arguments["x-dead-letter-exchange"] = dlx;
return this;
}

public IQueueSpecification DeadLetterRoutingKey(string dlrk)
{
_arguments["x-dead-letter-routing-key"] = dlrk;
return this;
}

public IQueueSpecification OverflowStrategy(OverFlowStrategy overflow)
{
_arguments["x-overflow"] = overflow switch
{
OverFlowStrategy.DropHead => "drop-head",
OverFlowStrategy.RejectPublish => "reject-publish",
OverFlowStrategy.RejectPublishDlx => "reject-publish-dlx",
_ => throw new ArgumentOutOfRangeException(nameof(overflow), overflow, null)
};
return this;
}

public IQueueSpecification MaxLengthBytes(ByteCapacity maxLengthBytes)
{

Utils.ValidatePositive("Max length", maxLengthBytes.ToBytes());
_arguments["x-max-length-bytes"] = maxLengthBytes.ToBytes();
return this;
}

public IQueueSpecification SingleActiveConsumer(bool singleActiveConsumer)
{
_arguments["x-single-active-consumer"] = singleActiveConsumer;
return this;
}

public IQueueSpecification Expires(TimeSpan expiration)
{
Utils.ValidatePositive("Expiration", (long)expiration.TotalMilliseconds, (long)s_tenYears.TotalMilliseconds);
_arguments["x-expires"] = (long)expiration.TotalMilliseconds;
return this;
}

public IQueueSpecification MaxLength(long maxLength)
{
Utils.ValidatePositive("Max length", maxLength);
_arguments["x-max-length"] = maxLength;
return this;
}

public IQueueSpecification MessageTtl(TimeSpan ttl)
{
Utils.ValidateNonNegative("TTL", (long)ttl.TotalMilliseconds, (long)s_tenYears.TotalMilliseconds);
_arguments["x-message-ttl"] = (long)ttl.TotalMilliseconds;
return this;
}

public async Task<IQueueInfo> Declare()
{
if (Type() is QueueType.QUORUM or QueueType.STREAM)
Expand Down
47 changes: 46 additions & 1 deletion RabbitMQ.AMQP.Client/Impl/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,48 @@ private static string GenerateName(string prefix)
return resultError;
}

public static void ValidateNonNegative(string label, long value, long max)
{
if (value < 0)
{
throw new ArgumentException($"'{label}' must be greater than or equal to 0");
}

if (max > 0)
{
if (value > max)
{
throw new ArgumentException($"'{label}' must be lesser than {max}");
}
}
}


public static void ValidatePositive(string label, long value, long max)
{
if (value <= 0)
{
throw new ArgumentException($"'{label}' must be greater than or equal to 0");
}

if (max >= 0)
{
if (value > max)
{
throw new ArgumentException($"'{label}' must be lesser than {max}");
}
}
}

public static void ValidatePositive(string label, long value)
{
if (value <= 0)
{
throw new ArgumentException($"'{label}' must be greater than or equal to 0");
}
}


// switch (options.deliveryMode()) {
// case AT_MOST_ONCE:
// protonSender.setSenderSettleMode(SenderSettleMode.SETTLED);
Expand All @@ -69,7 +111,10 @@ public static Attach CreateAttach(string address,
// Role = true,
Target = new Target()
{
Address = address, ExpiryPolicy = new Symbol("SESSION_END"), Dynamic = false, Durable = 0,
Address = address,
ExpiryPolicy = new Symbol("SESSION_END"),
Dynamic = false,
Durable = 0,
},
Source = new Source()
{
Expand Down
2 changes: 1 addition & 1 deletion Tests/ConsumerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public async Task ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages(str
await connection.CloseAsync();
}


/// <summary>
/// Test the offset value for the stream queue
/// </summary>
Expand Down
57 changes: 55 additions & 2 deletions Tests/ManagementTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,61 @@ public async Task DeclareAndDeleteTwoTimesShouldNotRaiseErrors()
await connection.CloseAsync();
}


[Fact]
public async Task DeclareQueueWithDifferentArguments()
{
var connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
var management = connection.Management();

IQueueInfo q = await management.Queue().Name("DeclareQueueWithDifferentArguments")
.DeadLetterExchange("my_exchange")
.DeadLetterRoutingKey("my_key").OverflowStrategy(OverFlowStrategy.DropHead)
.MaxLengthBytes(ByteCapacity.Gb(1)).MaxLength(50000).MessageTtl(TimeSpan.FromSeconds(10))
.Expires(TimeSpan.FromSeconds(2)).SingleActiveConsumer(true).Declare();

Assert.Equal("DeclareQueueWithDifferentArguments", q.Name());
Assert.Equal("my_exchange", q.Arguments()["x-dead-letter-exchange"]);
Assert.Equal("my_key", q.Arguments()["x-dead-letter-routing-key"]);
Assert.Equal("drop-head", q.Arguments()["x-overflow"]);
Assert.Equal(50000L, q.Arguments()["x-max-length"]);
Assert.Equal(1000000000L, q.Arguments()["x-max-length-bytes"]);
Assert.Equal(10000L, q.Arguments()["x-message-ttl"]);
Assert.Equal(2000L, q.Arguments()["x-expires"]);
Assert.Equal(true, q.Arguments()["x-single-active-consumer"]);

await management.QueueDeletion().Delete("DeclareQueueWithDifferentArguments");
await connection.CloseAsync();
}


[Fact]
public async Task ValidateDeclareQueueArguments()
{
IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
IManagement management = connection.Management();

await Assert.ThrowsAsync<ArgumentException>(() =>
management.Queue().Name("ValidateDeclareQueueWithDifferentArguments").MessageTtl(TimeSpan.FromSeconds(-1))
.Declare());


await Assert.ThrowsAsync<ArgumentException>(() =>
management.Queue().Name("ValidateDeclareQueueWithDifferentArguments").Expires(TimeSpan.FromSeconds(0))
.Declare());

await Assert.ThrowsAsync<ArgumentException>(() =>
management.Queue().Name("ValidateDeclareQueueWithDifferentArguments").MaxLengthBytes(ByteCapacity.Gb(-1))
.Declare());


await Assert.ThrowsAsync<ArgumentException>(() =>
management.Queue().Name("ValidateDeclareQueueWithDifferentArguments").MaxLength(-1).Declare());

await connection.CloseAsync();
}


////////////// ----------------- Exchanges TESTS ----------------- //////////////


Expand Down Expand Up @@ -258,8 +313,6 @@ await management.Exchange("my_exchange_raise_precondition_fail").AutoDelete(fals
}




////////////// ----------------- Topology TESTS ----------------- //////////////

/// <summary>
Expand Down

0 comments on commit 6a17666

Please sign in to comment.