Skip to content

Commit 1fdf6c2

Browse files
authored
#2304 Utilize ConcurrentQueue{T} for managing service versions during polling by the PollKube discovery provider (#2335)
* CA1816: Call GC.SuppressFinalize correctly * Enable testing for the PollKube provider * Use ConcurrentQueue<T> to handle service versions while polling * PollKube unit tests * PollKube acceptance testing
1 parent 9127387 commit 1fdf6c2

File tree

5 files changed

+200
-73
lines changed

5 files changed

+200
-73
lines changed

src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide
3131
return new WatchKube(configuration, factory, kubeClient, serviceBuilder, Scheduler.Default);
3232
}
3333

34-
var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);
34+
var kubeProvider = new Kube(configuration, factory, kubeClient, serviceBuilder);
3535

3636
return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)
37-
? new PollKube(config.PollingInterval, factory, defaultK8sProvider)
38-
: defaultK8sProvider;
37+
? new PollKube(config.PollingInterval, factory, kubeProvider)
38+
: kubeProvider;
3939
}
4040
}
Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,30 @@
11
using Ocelot.Logging;
22
using Ocelot.Values;
3+
using System.Collections.Concurrent;
34

45
namespace Ocelot.Provider.Kubernetes;
56

67
public class PollKube : IServiceDiscoveryProvider, IDisposable
78
{
89
private readonly IOcelotLogger _logger;
9-
private readonly IServiceDiscoveryProvider _kubeServiceDiscoveryProvider;
10-
private readonly Timer _timer;
11-
10+
private readonly IServiceDiscoveryProvider _discoveryProvider;
11+
private readonly ConcurrentQueue<List<Service>> _queue = new();
12+
13+
private Timer _timer;
1214
private bool _polling;
13-
private List<Service> _services;
1415

15-
public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeServiceDiscoveryProvider)
16+
public PollKube(int pollingInterval, IOcelotLoggerFactory factory, IServiceDiscoveryProvider kubeProvider)
1617
{
1718
_logger = factory.CreateLogger<PollKube>();
18-
_kubeServiceDiscoveryProvider = kubeServiceDiscoveryProvider;
19-
_services = new List<Service>();
20-
21-
_timer = new Timer(OnTimerCallbackAsync, null, pollingInterval, pollingInterval);
19+
_discoveryProvider = kubeProvider;
20+
_timer = new(OnTimerCallbackAsync, null, pollingInterval, pollingInterval);
2221
}
2322

2423
private async void OnTimerCallbackAsync(object state)
2524
{
26-
if (_polling)
25+
// Avoid polling if already in progress due to a slow completion of the Poll task,
26+
// and ensure no more than three versions of services remain in the queue.
27+
if (_polling || _queue.Count > 3)
2728
{
2829
return;
2930
}
@@ -33,18 +34,54 @@ private async void OnTimerCallbackAsync(object state)
3334
_polling = false;
3435
}
3536

36-
public Task<List<Service>> GetAsync()
37+
public async Task<List<Service>> GetAsync()
3738
{
38-
return Task.FromResult(_services);
39+
// First cold request must call the provider
40+
if (_queue.IsEmpty)
41+
{
42+
return await Poll();
43+
}
44+
else if (_polling && _queue.TryPeek(out var oldVersion))
45+
{
46+
return oldVersion;
47+
}
48+
49+
// For services with multiple versions, remove outdated versions and retain only the latest one
50+
while (!_polling && _queue.Count > 1 && _queue.TryDequeue(out _))
51+
{
52+
}
53+
54+
_queue.TryPeek(out var latestVersion);
55+
return latestVersion;
3956
}
4057

41-
private async Task Poll()
58+
protected virtual async Task<List<Service>> Poll()
4259
{
43-
_services = await _kubeServiceDiscoveryProvider.GetAsync();
60+
_polling = true;
61+
try
62+
{
63+
var services = await _discoveryProvider.GetAsync();
64+
_queue.Enqueue(services);
65+
return services;
66+
}
67+
finally
68+
{
69+
_polling = false;
70+
}
4471
}
4572

4673
public void Dispose()
4774
{
48-
_timer.Dispose();
75+
Dispose(true);
76+
GC.SuppressFinalize(this);
77+
}
78+
79+
protected virtual void Dispose(bool disposing)
80+
{
81+
if (disposing)
82+
{
83+
_timer.Dispose();
84+
_timer = null;
85+
}
4986
}
5087
}

src/Ocelot/LoadBalancer/Balancers/RoundRobin.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public RoundRobin(Func<Task<List<Service>>> services, string serviceName)
3434

3535
public virtual async Task<Response<ServiceHostAndPort>> LeaseAsync(HttpContext httpContext)
3636
{
37-
var services = await _servicesDelegate.Invoke() ?? new List<Service>();
37+
var services = await _servicesDelegate.Invoke() ?? new(0);
3838
if (services.Count == 0)
3939
{
4040
return new ErrorResponse<ServiceHostAndPort>(new ServicesAreEmptyError($"There were no services in {Type} for '{_serviceName}' during {nameof(LeaseAsync)} operation!"));

test/Ocelot.AcceptanceTests/ServiceDiscovery/KubernetesServiceDiscoveryTests.cs

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using Ocelot.Configuration;
1010
using Ocelot.Configuration.File;
1111
using Ocelot.DependencyInjection;
12+
using Ocelot.Infrastructure.Extensions;
1213
using Ocelot.LoadBalancer.Balancers;
1314
using Ocelot.Logging;
1415
using Ocelot.Provider.Kubernetes;
@@ -40,7 +41,7 @@ public KubernetesServiceDiscoveryTests()
4041

4142
[Theory]
4243
[InlineData(nameof(Kube))]
43-
/* [InlineData(nameof(PollKube))] TODO Fails now. Bug 2304? -> https://github.com/ThreeMammals/Ocelot/issues/2304 */
44+
[InlineData(nameof(PollKube))] // Bug 2304 -> https://github.com/ThreeMammals/Ocelot/issues/2304
4445
[InlineData(nameof(WatchKube))]
4546
public void ShouldReturnServicesFromK8s(string discoveryType)
4647
{
@@ -108,27 +109,31 @@ public void ShouldReturnServicesByPortNameAsDownstreamScheme(string downstreamSc
108109

109110
[Theory]
110111
[Trait("Bug", "2110")]
111-
[InlineData(1, 30)]
112-
[InlineData(2, 50)]
113-
[InlineData(3, 50)]
114-
[InlineData(4, 50)]
115-
[InlineData(5, 50)]
116-
[InlineData(6, 99)]
117-
[InlineData(7, 99)]
118-
[InlineData(8, 99)]
119-
[InlineData(9, 999)]
120-
[InlineData(10, 999)]
121-
public void ShouldHighlyLoadOnStableKubeProvider_WithRoundRobinLoadBalancing(int totalServices, int totalRequests)
112+
[InlineData(1, 30, null)]
113+
[InlineData(2, 50, null)]
114+
[InlineData(3, 50, null)]
115+
[InlineData(4, 50, null)]
116+
[InlineData(5, 50, null)]
117+
[InlineData(6, 99, null)]
118+
[InlineData(7, 99, null)]
119+
[InlineData(8, 99, null)]
120+
[InlineData(9, 999, null)]
121+
[InlineData(10, 999, nameof(Kube))]
122+
[InlineData(10, 999, nameof(PollKube))]
123+
[InlineData(10, 999, nameof(WatchKube))]
124+
public void ShouldHighlyLoadOnStableKubeProvider_WithRoundRobinLoadBalancing(int totalServices, int totalRequests, string discoveryType)
122125
{
123126
// Skip in MacOS because the test is very unstable
124127
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) // the test is stable in Linux and Windows only
125128
return;
126129

127-
const int ZeroGeneration = 0;
128-
var (endpoints, servicePorts) = GivenServiceDiscoveryAndLoadBalancing(totalServices);
130+
int zeroGeneration = 0, k8sCount = totalRequests;
131+
var (endpoints, servicePorts) = GivenServiceDiscoveryAndLoadBalancing(totalServices, discoveryType ?? nameof(Kube));
129132
GivenThereIsAFakeKubernetesProvider(endpoints); // stable, services will not be removed from the list
133+
if (discoveryType == nameof(WatchKube))
134+
k8sCount = GivenWatchReceivedEvent(); // 1
130135

131-
HighlyLoadOnKubeProviderAndRoundRobinBalancer(totalRequests, ZeroGeneration);
136+
HighlyLoadOnKubeProviderAndRoundRobinBalancer(totalRequests, zeroGeneration, k8sCount);
132137

133138
int bottom = totalRequests / totalServices,
134139
top = totalRequests - (bottom * totalServices) + bottom;
@@ -138,28 +143,37 @@ public void ShouldHighlyLoadOnStableKubeProvider_WithRoundRobinLoadBalancing(int
138143

139144
[Theory]
140145
[Trait("Bug", "2110")]
141-
[InlineData(5, 50, 1)]
142-
[InlineData(5, 50, 2)]
143-
[InlineData(5, 50, 3)]
144-
[InlineData(5, 50, 4)]
145-
public void ShouldHighlyLoadOnUnstableKubeProvider_WithRoundRobinLoadBalancing(int totalServices, int totalRequests, int k8sGeneration)
146+
[InlineData(5, 50, 1, null)]
147+
[InlineData(5, 50, 2, null)]
148+
[InlineData(5, 50, 3, null)]
149+
[InlineData(5, 50, 4, nameof(Kube))]
150+
[InlineData(5, 50, 4, nameof(PollKube))]
151+
[InlineData(5, 50, 4, nameof(WatchKube))]
152+
public void ShouldHighlyLoadOnUnstableKubeProvider_WithRoundRobinLoadBalancing(int totalServices, int totalRequests, int k8sGeneration, string discoveryType)
146153
{
147154
// Skip in MacOS because the test is very unstable
148155
if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) // the test is stable in Linux and Windows only
149156
return;
150157

151-
int failPerThreads = (totalRequests / k8sGeneration) - 1; // k8sGeneration means number of offline services
152-
var (endpoints, servicePorts) = GivenServiceDiscoveryAndLoadBalancing(totalServices);
158+
int failPerThreads = (totalRequests / k8sGeneration) - 1, // k8sGeneration means number of offline services
159+
k8sCount = totalRequests;
160+
var (endpoints, servicePorts) = GivenServiceDiscoveryAndLoadBalancing(totalServices, discoveryType ?? nameof(Kube));
153161
GivenThereIsAFakeKubernetesProvider(endpoints, false, k8sGeneration, failPerThreads); // false means unstable, k8sGeneration services will be removed from the list
162+
if (discoveryType == nameof(WatchKube))
163+
{
164+
k8sCount = GivenWatchReceivedEvent(); // 1
165+
k8sGeneration = 0;
166+
}
154167

155-
HighlyLoadOnKubeProviderAndRoundRobinBalancer(totalRequests, k8sGeneration);
168+
HighlyLoadOnKubeProviderAndRoundRobinBalancer(totalRequests, k8sGeneration, k8sCount);
156169

157170
ThenAllServicesCalledOptimisticAmountOfTimes(_roundRobinAnalyzer); // with unstable checkings
158171
ThenServiceCountersShouldMatchLeasingCounters(_roundRobinAnalyzer, servicePorts, totalRequests);
159172
}
160173

161174
[Theory]
162175
[InlineData(nameof(Kube))]
176+
[InlineData(nameof(PollKube))] // Bug 2304 -> https://github.com/ThreeMammals/Ocelot/issues/2304
163177
[InlineData(nameof(WatchKube))]
164178
[Trait("Feat", "2256")]
165179
public void ShouldReturnServicesFromK8s_AddKubernetesWithNullConfigureOptions(string discoveryType)
@@ -238,7 +252,7 @@ public void ShouldReturnServicesFromK8s_OneWatchRequestUpdatesServicesInfo()
238252
[Trait("Feat", "2319")]
239253
[Trait("PR", "2324")] // https://github.com/ThreeMammals/Ocelot/pull/2324
240254
[InlineData(nameof(Kube))]
241-
/* [InlineData(nameof(PollKube))] // Bug 2304 -> https://github.com/ThreeMammals/Ocelot/issues/2304 */
255+
[InlineData(nameof(PollKube))] // Bug 2304 -> https://github.com/ThreeMammals/Ocelot/issues/2304
242256
[InlineData(nameof(WatchKube))]
243257
public void ShouldApplyGlobalLoadBalancerOptions_ForAllDynamicRoutes(string discoveryType)
244258
{
@@ -260,7 +274,11 @@ static void ConfigureDynamicRouting(FileConfiguration configuration)
260274
var upstreamPath = $"/{ServiceNamespace()}.{ServiceName()}/";
261275
WhenIGetUrlOnTheApiGatewayConcurrently(upstreamPath, 50);
262276

263-
_k8sCounter.ShouldBe(discoveryType == nameof(WatchKube) ? 1 : 50);
277+
if (discoveryType == nameof(PollKube))
278+
_k8sCounter.ShouldBeGreaterThanOrEqualTo(50); // can be 50, 51 and sometimes 52
279+
else
280+
_k8sCounter.ShouldBe(discoveryType == nameof(WatchKube) ? 1 : 50);
281+
264282
_k8sServiceGeneration.ShouldBe(0);
265283
ThenAllStatusCodesShouldBe(HttpStatusCode.OK);
266284
ThenAllServicesShouldHaveBeenCalledTimes(50);
@@ -293,21 +311,21 @@ private void AddKubernetesWithNullConfigureOptions(IServiceCollection services)
293311
downstreams.ForEach(ds => GivenSubsetAddress(ds, subset));
294312
var endpoints = GivenEndpoints(subset, serviceName); // totalServices service instances with different ports
295313
var route = GivenRouteWithServiceName(serviceName, loadBalancerType); // !!!
296-
var configuration = GivenKubeConfiguration(route, discoveryType);
314+
var configuration = GivenKubeConfiguration(route, discoveryType.IfEmpty(nameof(Kube)));
297315
configure?.Invoke(configuration);
298316
GivenMultipleServiceInstancesAreRunning(downstreamUrls, downstreamResponses);
299317
GivenThereIsAConfiguration(configuration);
300318
GivenOcelotIsRunning(services ?? WithKubernetesAndRoundRobin);
301319
return (endpoints, servicePorts);
302320
}
303321

304-
private void HighlyLoadOnKubeProviderAndRoundRobinBalancer(int totalRequests, int k8sGenerationNo)
322+
private void HighlyLoadOnKubeProviderAndRoundRobinBalancer(int totalRequests, int k8sGenerationNo, int? k8sCount = null)
305323
{
306324
// Act
307325
WhenIGetUrlOnTheApiGatewayConcurrently("/", totalRequests); // load by X parallel requests
308326

309327
// Assert
310-
_k8sCounter.ShouldBeGreaterThanOrEqualTo(totalRequests); // integration endpoint called times
328+
_k8sCounter.ShouldBeGreaterThanOrEqualTo(k8sCount ?? totalRequests); // integration endpoint called times
311329
_k8sServiceGeneration.ShouldBe(k8sGenerationNo);
312330
ThenAllStatusCodesShouldBe(HttpStatusCode.OK);
313331
ThenAllServicesShouldHaveBeenCalledTimes(totalRequests);
@@ -373,7 +391,7 @@ private FileConfiguration GivenKubeConfiguration(FileRoute route, string type, s
373391
Host = u.Host,
374392
Port = u.Port,
375393
Type = type,
376-
PollingInterval = 0,
394+
PollingInterval = 3, // 3ms is very fast polling, make sense for PollKube provider only
377395
Namespace = ServiceNamespace(),
378396
Token = token ?? "Test",
379397
};
@@ -442,7 +460,7 @@ private void GivenThereIsAFakeKubernetesProvider(ResourceEventV1<EndpointsV1>[]
442460
handler.GivenThereIsAServiceRunningOn(_kubernetesUrl, (c) => GivenHandleWatchRequest(c, events, namespaces, serviceName));
443461
}
444462

445-
private void GivenWatchReceivedEvent() => _k8sWatchResetEvent.Set();
463+
private int GivenWatchReceivedEvent() => _k8sWatchResetEvent.Set() ? 1 : 0;
446464
private static Task GivenDelay(int milliseconds) => Task.Delay(TimeSpan.FromMilliseconds(milliseconds));
447465

448466
private async Task GivenHandleWatchRequest(HttpContext context,

0 commit comments

Comments
 (0)