Skip to content

Commit d3f650a

Browse files
committed
Add test demonstrating that #1573 is fixed
Fixes #1573
1 parent f8087b6 commit d3f650a

File tree

1 file changed

+114
-0
lines changed

1 file changed

+114
-0
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
using System.Threading.Tasks;
35+
using RabbitMQ.Client;
36+
using RabbitMQ.Client.Events;
37+
using Xunit;
38+
using Xunit.Abstractions;
39+
40+
namespace Test.SequentialIntegration
41+
{
42+
public class TestConnectionBlockedChannelLeak : SequentialIntegrationFixture
43+
{
44+
public TestConnectionBlockedChannelLeak(ITestOutputHelper output) : base(output)
45+
{
46+
}
47+
48+
public override async Task InitializeAsync()
49+
{
50+
await UnblockAsync();
51+
_connFactory = new ConnectionFactory
52+
{
53+
AutomaticRecoveryEnabled = true,
54+
ClientProvidedName = _testDisplayName,
55+
ContinuationTimeout = TimeSpan.FromSeconds(2)
56+
};
57+
_conn = await _connFactory.CreateConnectionAsync();
58+
_channel = await _conn.CreateChannelAsync();
59+
}
60+
61+
public override async Task DisposeAsync()
62+
{
63+
await UnblockAsync();
64+
await base.DisposeAsync();
65+
}
66+
67+
[Fact]
68+
public async Task TestConnectionBlockedChannelLeak_GH1573()
69+
{
70+
string exchangeName = GenerateExchangeName();
71+
72+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
73+
_conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) =>
74+
{
75+
UnblockAsync();
76+
};
77+
78+
_conn.ConnectionUnblocked += (object sender, EventArgs ea) =>
79+
{
80+
tcs.SetResult(true);
81+
};
82+
83+
await BlockAsync(_channel);
84+
85+
using (IChannel publishChannel = await _conn.CreateChannelAsync())
86+
{
87+
await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
88+
await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true);
89+
await publishChannel.CloseAsync();
90+
}
91+
92+
var channels = new List<IChannel>();
93+
for (int i = 1; i <= 5; i++)
94+
{
95+
IChannel c = await _conn.CreateChannelAsync();
96+
channels.Add(c);
97+
}
98+
99+
await Task.Delay(TimeSpan.FromSeconds(5));
100+
101+
var rmq = new RabbitMQCtl(_output);
102+
string output = await rmq.ExecRabbitMQCtlAsync("list_channels");
103+
_output.WriteLine("CHANNELS 0: {0}", output);
104+
105+
await UnblockAsync();
106+
107+
output = await rmq.ExecRabbitMQCtlAsync("list_channels");
108+
_output.WriteLine("CHANNELS 1: {0}", output);
109+
110+
await tcs.Task.WaitAsync(WaitSpan);
111+
Assert.True(await tcs.Task, "Unblock notification not received.");
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)