-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathKubernetesApiServiceDiscovery.cs
310 lines (277 loc) · 12.7 KB
/
KubernetesApiServiceDiscovery.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
//-----------------------------------------------------------------------
// <copyright file="KubernetesApiServiceDiscovery.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using k8s;
using k8s.Authentication;
using k8s.Models;
using Newtonsoft.Json;
using Debug = System.Diagnostics.Debug;
#if !NET6_0_OR_GREATER
using Microsoft.Rest;
#else
using System.Runtime.Serialization;
using k8s.Autorest;
#endif
namespace Akka.Discovery.KubernetesApi
{
public class KubernetesApiServiceDiscovery : ServiceDiscovery
{
/// <summary>
/// Exception declared by this discovery implementation. Within this file it is thrown
/// when retrieving the pod list fails for a reason that has no more specific handling.
/// </summary>
public sealed class KubernetesApiException : Exception
{
    /// <summary>Creates the exception with a descriptive message.</summary>
    /// <param name="message">Human readable description of the failure.</param>
    public KubernetesApiException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a descriptive message and the underlying cause.</summary>
    /// <param name="message">Human readable description of the failure.</param>
    /// <param name="innerException">The exception that triggered this failure.</param>
    public KubernetesApiException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>Name of this discovery method's configuration section, relative to <c>akka.discovery</c>.</summary>
internal const string DefaultPath = "kubernetes-api";

/// <summary>Fully qualified configuration path of the default section.</summary>
internal const string DefaultConfigPath = "akka.discovery." + DefaultPath;

/// <summary>Prefixes <paramref name="path"/> with <c>akka.discovery.</c>.</summary>
/// <param name="path">Section name relative to <c>akka.discovery</c>.</param>
/// <returns>The fully qualified configuration path.</returns>
internal static string FullPath(string path)
{
    return "akka.discovery." + path;
}
// Logger obtained from the actor system in the constructor.
private readonly ILoggingAdapter _log;
// Discovery settings parsed from the configuration section passed to the constructor.
private readonly KubernetesDiscoverySettings _settings;
// Kubernetes API client. Left null when the host or port environment variable is blank,
// in which case Lookup fails with an error instead of querying.
private readonly Kubernetes? _client;
// Base URL ("https://{host}:{port}") of the API server; assigned together with _client
// and used in error messages.
private readonly string? _host;
// Resolved pod namespace; stays null until PodNamespace is read for the first time.
private string? _resolvedPodNamespace;

/// <summary>
/// Namespace used for pod queries and DNS names. Resolution order: the configured
/// namespace, then the content of the namespace file, then the literal "default".
/// </summary>
/// <remarks>
/// The value is resolved once and cached. The namespace file is only read when no
/// namespace is configured, so the blocking file read (and its warning when the file
/// is missing) no longer happens on every access.
/// </remarks>
private string PodNamespace
{
    get
    {
        var cached = _resolvedPodNamespace;
        if (cached != null)
            return cached;

        var configured = _settings.PodNamespace;
        var resolved = string.IsNullOrWhiteSpace(configured)
            ? ReadConfigVarFromFileSystem(_settings.PodNamespacePath, "pod-namespace")
                .DefaultIfNullOrWhitespace("default")!
            : configured!;

        // A concurrent first read may resolve twice; both resolutions yield the same value.
        _resolvedPodNamespace = resolved;
        return resolved;
    }
}
/// <summary>
/// Backward compatible constructor: reads the settings from the default
/// <c>akka.discovery.kubernetes-api</c> section of the actor system configuration.
/// </summary>
/// <param name="system">The actor system this discovery instance belongs to.</param>
public KubernetesApiServiceDiscovery(ExtendedActorSystem system)
    : this(system, system.Settings.Config.GetConfig(DefaultConfigPath))
{
}
/// <summary>
/// Creates the discovery instance from an explicit configuration section and, when the
/// API server host and port environment variables are both set, the Kubernetes client.
/// </summary>
/// <param name="system">The actor system this discovery instance belongs to.</param>
/// <param name="config">Configuration section the discovery settings are created from.</param>
public KubernetesApiServiceDiscovery(ExtendedActorSystem system, Configuration.Config config)
{
    _log = Logging.GetLogger(system, this);
    _settings = KubernetesDiscoverySettings.Create(config);

    if (_log.IsDebugEnabled)
    {
        _log.Debug("Settings {0}", _settings);
    }

    var apiHost = Environment.GetEnvironmentVariable(_settings.ApiServiceHostEnvName);
    var apiPort = Environment.GetEnvironmentVariable(_settings.ApiServicePortEnvName);

    // Without both host and port no client can be built; _client and _host stay null
    // and Lookup reports the misconfiguration.
    if (string.IsNullOrWhiteSpace(apiHost))
    {
        _log.Error($"The Kubernetes host environment variable [{_settings.ApiServiceHostEnvName}] is empty, could not create Kubernetes client.");
        return;
    }

    if (string.IsNullOrWhiteSpace(apiPort))
    {
        _log.Error($"The Kubernetes port environment variable [{_settings.ApiServicePortEnvName}] is empty, could not create Kubernetes client.");
        return;
    }

    var endpoint = $"https://{apiHost}:{apiPort}";

    var k8sConfig = KubernetesClientConfiguration.BuildDefaultConfig();
    k8sConfig.TokenProvider = new TokenFileAuth(_settings.ApiTokenPath);
    // NOTE(review): the CA path setting is assigned to the client-certificate property - confirm this is intended.
    k8sConfig.ClientCertificateFilePath = _settings.ApiCaPath;
    k8sConfig.Namespace = PodNamespace;
    k8sConfig.Host = endpoint;

    _host = endpoint;
    _client = new Kubernetes(k8sConfig);
}
/// <summary>
/// Queries the Kubernetes API for pods matching the label selector built from the
/// service name of <paramref name="lookup"/> and converts them into resolved targets.
/// </summary>
/// <param name="lookup">Carries the service name and the optional port name to resolve.</param>
/// <param name="resolveTimeout">Time after which the API request is cancelled.</param>
/// <returns>
/// The resolved targets. The list is empty when no pod matches, when no pod yields a
/// target, or when the API response could not be deserialized.
/// </returns>
/// <exception cref="KubernetesException">
/// The client was not configured, the API answered with a non-success status code, or
/// the request timed out.
/// </exception>
/// <exception cref="KubernetesApiException">Any other failure while retrieving the pod list.</exception>
public override async Task<Resolved> Lookup(Lookup lookup, TimeSpan resolveTimeout)
{
    if (_client == null)
    {
        const string notConfigured = "Failed to perform Kubernetes API discovery lookup. The Kubernetes client was not configured properly.";
        _log.Error(notConfigured);
        throw new KubernetesException(notConfigured);
    }

    var labelSelector = _settings.PodLabelSelector(lookup.ServiceName);

    // Read the namespace once per lookup instead of re-evaluating the property for every use.
    var podNamespace = PodNamespace;

    if (_log.IsInfoEnabled)
        _log.Info("Querying for pods with label selector: [{0}]. Namespace: [{1}]. AllNamespaces: [{2}]. Port: [{3}]",
            labelSelector, podNamespace, _settings.AllNamespaces, lookup.PortName);

    var podList = await FetchPodList(labelSelector, resolveTimeout);

    if (podList.Items.Count == 0)
    {
        if (_log.IsWarningEnabled)
            _log.Warning(
                "No pods found in namespace [{0}] using the pod label selector [{1}]. " +
                "Make sure that the namespace is correct and the label are applied to the StatefulSet or Deployment.",
                podNamespace, labelSelector);

        return new Resolved(lookup.ServiceName, new List<ResolvedTarget>());
    }

    var addresses = Targets(podList, lookup.PortName, podNamespace, _settings.PodDomain, _settings.RawIp, _settings.ContainerName).ToList();

    // Pods were returned (checked above) but none produced a target: dump the declared
    // container ports to help diagnose a wrong port name.
    if (addresses.Count == 0 && _log.IsWarningEnabled)
    {
        var containerPorts = podList.Items
            .Select(p => p.Spec)
            .SelectMany(s => s.Containers)
            .Select(c => new ContainerDebugView(c.Name, c.Ports))
            .Select(JsonConvert.SerializeObject);

        _log.Warning(
            "No targets found from pod list. Is the correct port name configured? Current configuration: [{0}]. Ports on pods:\n\t{1}",
            lookup.PortName,
            string.Join(",\n\t", containerPorts));
    }

    return new Resolved(serviceName: lookup.ServiceName, addresses: addresses);
}

// Fetches the pod list (all namespaces or the configured one) within the timeout and maps failures to the exceptions Lookup documents.
private async Task<V1PodList> FetchPodList(string labelSelector, TimeSpan resolveTimeout)
{
    // Disposed when this helper returns or throws.
    using var cts = new CancellationTokenSource(resolveTimeout);
    try
    {
        if (_settings.AllNamespaces)
            return await ListPodForAllNamespaces(labelSelector, cts);

        return await ListNamespacedPod(labelSelector, cts);
    }
    catch (SerializationException e)
    {
        // Best effort: an unreadable response is treated as "no pods" rather than a failure.
        _log.Warning(e, "Failed to deserialize Kubernetes API response.");
        return new V1PodList(new List<V1Pod>());
    }
    catch (HttpOperationException e)
    {
        var statusCode = e.Response.StatusCode;
        if (statusCode == HttpStatusCode.Forbidden)
        {
            _log.Warning(
                e,
                "Forbidden to communicate with Kubernetes API server; check RBAC settings. Reason: [{0}]. Response: [{1}]",
                e.Response.ReasonPhrase,
                e.Response.Content);
            throw new KubernetesException("Forbidden when communicating with the Kubernetes API. Check RBAC settings.", e);
        }

        _log.Warning(
            e,
            "Non-200 when communicating with Kubernetes API server. Status code: [{0}:{1}]. Reason: [{2}]. Response body: [{3}]",
            (int)statusCode,
            statusCode,
            e.Response.ReasonPhrase,
            e.Response.Content);
        throw new KubernetesException($"Non-200 from Kubernetes API server: {statusCode}", e);
    }
    catch (OperationCanceledException e)
    {
        // The message is interpolated so the host actually appears; the cancellation is kept as the inner exception.
        throw new KubernetesException($"Timed out while trying to retrieve pod list from {_host}", e);
    }
    catch (Exception e)
    {
        throw new KubernetesApiException($"Failed to retrieve pod list from {_host}", e);
    }
}
/// <summary>
/// Lists the pods of the namespace given by <c>PodNamespace</c> that match
/// <paramref name="labelSelector"/>.
/// </summary>
/// <param name="labelSelector">Label selector passed to the Kubernetes API.</param>
/// <param name="cts">Source of the cancellation token passed to the request.</param>
/// <returns>The pod list returned by the API.</returns>
private async Task<V1PodList> ListNamespacedPod(string labelSelector, CancellationTokenSource cts)
{
    // Lookup only calls this after checking that the client exists; assert it on every target framework.
    Debug.Assert(_client != null, nameof(_client) + " != null");
#if !NET6_0_OR_GREATER
    var result = await _client!.ListNamespacedPodWithHttpMessagesAsync(
            namespaceParameter: PodNamespace,
            labelSelector: labelSelector,
            cancellationToken: cts.Token)
        .ConfigureAwait(false);
    return result.Body;
#else
    return await _client!.ListNamespacedPodAsync(
            namespaceParameter: PodNamespace,
            labelSelector: labelSelector,
            cancellationToken: cts.Token)
        .ConfigureAwait(false);
#endif
}
/// <summary>
/// Lists the pods of all namespaces that match <paramref name="labelSelector"/>.
/// </summary>
/// <param name="labelSelector">Label selector passed to the Kubernetes API.</param>
/// <param name="cts">Source of the cancellation token passed to the request.</param>
/// <returns>The pod list returned by the API.</returns>
private async Task<V1PodList> ListPodForAllNamespaces(string labelSelector, CancellationTokenSource cts)
{
    // Lookup only calls this after checking that the client exists; assert it on every target framework.
    Debug.Assert(_client != null, nameof(_client) + " != null");
#if !NET6_0_OR_GREATER
    var result = await _client!.ListPodForAllNamespacesWithHttpMessagesAsync(
            labelSelector: labelSelector,
            cancellationToken: cts.Token)
        .ConfigureAwait(false);
    return result.Body;
#else
    return await _client!.ListPodForAllNamespacesAsync(
            labelSelector: labelSelector,
            cancellationToken: cts.Token)
        .ConfigureAwait(false);
#endif
}
/// <summary>
/// Reads the whole content of the file at <paramref name="path"/>.
/// This uses blocking IO, and so should only be used to read configuration at startup.
/// </summary>
/// <param name="path">Location of the file to read.</param>
/// <param name="name">Name of the setting being read; only used in log messages.</param>
/// <returns>The file content, or null when the file is missing or reading it fails.</returns>
private string? ReadConfigVarFromFileSystem(string path, string name)
{
    if (!File.Exists(path))
    {
        _log.Warning("Unable to read {0} from {1} because it does not exists.", name, path);
        return null;
    }

    string? content;
    try
    {
        content = File.ReadAllText(path);
    }
    catch (Exception e)
    {
        // The file can still become unreadable between the existence check and the read.
        _log.Error(e, "Error reading {0} from {1}", name, path);
        content = null;
    }

    return content;
}
/// <summary>
/// Converts the pods of <paramref name="podList"/> into resolved targets, skipping pods
/// that are being deleted, are not in a running phase, have no IP, or do not expose the
/// requested port.
/// </summary>
/// <param name="podList">Pod list to convert.</param>
/// <param name="portName">
/// Port name to resolve. When null, targets are returned without a port; otherwise pods
/// without a matching named container port are skipped.
/// </param>
/// <param name="podNamespace">Namespace used when building pod DNS names.</param>
/// <param name="podDomain">Domain suffix used when building pod DNS names.</param>
/// <param name="rawIp">When true the pod IP is used as host instead of a pod DNS name.</param>
/// <param name="containerName">
/// When not null, pods whose container with this name is reported as waiting are skipped.
/// </param>
/// <returns>A lazily evaluated sequence with one target per accepted pod.</returns>
internal static IEnumerable<ResolvedTarget> Targets(
    V1PodList podList,
    string? portName,
    string podNamespace,
    string podDomain,
    bool rawIp,
    string? containerName)
{
    foreach (var item in podList.Items)
    {
        // A deletion timestamp means the pod is being deleted.
        if (item.Metadata?.DeletionTimestamp != null)
            continue;

        // Pods without a status or phase are skipped instead of failing the whole lookup.
        var itemStatus = item.Status;
        var phase = itemStatus?.Phase;
        if (itemStatus == null
            || phase == null
            || phase.IndexOf("running", StringComparison.OrdinalIgnoreCase) < 0)
            continue;

        if (containerName != null && itemStatus.ContainerStatuses != null)
        {
            var namedContainerWaiting = itemStatus.ContainerStatuses
                .Any(s => s.Name == containerName && s.State?.Waiting != null);
            if (namedContainerWaiting)
                continue;
        }

        var ip = itemStatus.PodIP;
        if (string.IsNullOrWhiteSpace(ip))
            continue;

        // Stays null when no port name was requested.
        int? maybePort = null;
        if (portName != null)
        {
            // Bugfix #223, container might not expose ports, therefore should be excluded if port name is queried
            // NOTE(review): this is a substring match, so "http" also selects a port named "http-admin" - confirm intended.
            var validPort = item.Spec?.Containers?
                .Where(c => c.Ports != null)
                .SelectMany(c => c.Ports)
                .FirstOrDefault(p => p.Name?.Contains(portName) ?? false);

            if (validPort == null)
                continue;

            maybePort = validPort.ContainerPort;
        }

        var hostOrIp = rawIp ? ip : $"{ip.Replace('.', '-')}.{podNamespace}.pod.{podDomain}";
        yield return new ResolvedTarget(
            host: hostOrIp,
            port: maybePort,
            address: IPAddress.Parse(ip));
    }
}
}
/// <summary>
/// Serializable view of a container's name and ports. Lookup serializes instances of
/// it to JSON for the warning logged when no targets are found.
/// </summary>
internal class ContainerDebugView
{
    /// <summary>Creates the view from a container name and its declared ports.</summary>
    /// <param name="name">Value stored in <see cref="Name"/>.</param>
    /// <param name="ports">Value stored in <see cref="Ports"/>.</param>
    public ContainerDebugView(string name, IList<V1ContainerPort> ports)
        => (Name, Ports) = (name, ports);

    // Serialized under the key "podName", although the caller in this file passes a container name.
    [JsonProperty(PropertyName = "podName")]
    public string Name { get; set; }

    // Ports declared by the container, serialized under the key "ports".
    [JsonProperty(PropertyName = "ports")]
    public IList<V1ContainerPort> Ports { get; set; }
}
}