Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Multiple Endpoints #1083

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Cluster/ClusterContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void SetupSingleInstance(bool disableSlotVerification = false)
{
QuietMode = true,
EnableCluster = !disableSlotVerification,
EndPoint = new IPEndPoint(IPAddress.Loopback, port),
EndPoints = [new IPEndPoint(IPAddress.Loopback, port)],
CleanClusterConfig = true,
};
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
Expand Down
2 changes: 1 addition & 1 deletion benchmark/BDN.benchmark/Embedded/EmbeddedRespServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class EmbeddedRespServer : GarnetServer
/// <param name="opts">Server options to configure the base GarnetServer instance</param>
/// <param name="loggerFactory">Logger factory to configure the base GarnetServer instance</param>
/// <param name="server">Server network</param>
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, server)
public EmbeddedRespServer(GarnetServerOptions opts, ILoggerFactory loggerFactory = null, GarnetServerEmbedded server = null) : base(opts, loggerFactory, [server])
{
this.garnetServerEmbedded = server;
this.subscribeBroker = opts.DisablePubSub ? null :
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public unsafe ClusterManager(ClusterProvider clusterProvider, ILogger logger = n
clusterConfigDevice = deviceFactory.Get(new FileDescriptor(directoryName: "", fileName: "nodes.conf"));
pool = new(1, (int)clusterConfigDevice.SectorSize);

if (opts.EndPoint is not IPEndPoint endpoint)
if (opts.EndPoints[0] is not IPEndPoint endpoint)
throw new NotImplementedException("Cluster mode for unix domain sockets has not been implemented.");

var address = clusterProvider.storeWrapper.GetIp();
Expand Down
28 changes: 15 additions & 13 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -425,23 +425,25 @@ internal ReplicationLogCheckpointManager GetReplicationLogCheckpointManager(Stor
/// <returns></returns>
internal bool BumpAndWaitForEpochTransition()
{
var server = storeWrapper.TcpServer;
BumpCurrentEpoch();
while (true)
foreach (var server in storeWrapper.TcpServer)
{
retry:
Thread.Yield();
// Acquire latest bumped epoch
var currentEpoch = GarnetCurrentEpoch;
var sessions = server.ActiveClusterSessions();
foreach (var s in sessions)
while (true)
{
var entryEpoch = s.LocalCurrentEpoch;
// Retry if at least one session has not yet caught up to the current epoch.
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
retry:
Thread.Yield();
// Acquire latest bumped epoch
var currentEpoch = GarnetCurrentEpoch;
var sessions = ((GarnetServerTcp)server).ActiveClusterSessions();
foreach (var s in sessions)
{
var entryEpoch = s.LocalCurrentEpoch;
// Retry if at least one session has not yet caught up to the current epoch.
if (entryEpoch != 0 && entryEpoch < currentEpoch)
goto retry;
}
break;
}
break;
}
return true;
}
Expand Down
6 changes: 3 additions & 3 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ public async Task TryMeetAsync(string address, int port, bool acquireLock = true

if (gsn == null)
{
var endpoint = await Format.TryCreateEndpoint(address, port, useForBind: true, logger: logger);
if (endpoint == null)
var endpoints = await Format.TryCreateEndpoint(address, port, useForBind: true, logger: logger);
if (endpoints == null)
{
logger?.LogError("Could not parse endpoint {address} {port}", address, port);
}
gsn = new GarnetServerNode(clusterProvider, endpoint, tlsOptions?.TlsClientOptions, logger: logger);
gsn = new GarnetServerNode(clusterProvider, endpoints[0], tlsOptions?.TlsClientOptions, logger: logger);
created = true;
}

Expand Down
85 changes: 62 additions & 23 deletions libs/common/Format.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
Expand Down Expand Up @@ -31,68 +33,105 @@ internal static bool IsNullOrWhiteSpace([NotNullWhen(false)] this string s) =>
#pragma warning disable format
public static class Format
{
/// <summary>
/// Parse address list string containing address separated by whitespace
/// </summary>
/// <param name="addressList"></param>
/// <param name="port"></param>
/// <param name="endpoints"></param>
/// <param name="errorHostnameOrAddress"></param>
/// <param name="useForBind"></param>
/// <param name="logger"></param>
/// <returns>True if parse and address validation was successful, otherwise false</returns>
public static bool TryParseAddressList(string addressList, int port, out EndPoint[] endpoints, out string errorHostnameOrAddress, bool useForBind = false, ILogger logger = null)
{
endpoints = null;
errorHostnameOrAddress = null;
// Check if input null or empty
if (string.IsNullOrEmpty(addressList) || string.IsNullOrWhiteSpace(addressList))
{
endpoints = [new IPEndPoint(IPAddress.Any, port)];
return true;
}

var addresses = addressList.Split(' ');
var endpointList = new List<EndPoint>();
// Validate addresses and create endpoints
foreach (var singleAddressOrHostname in addresses)
{
var e = TryCreateEndpoint(singleAddressOrHostname, port, useForBind, logger).Result;
if(e == null)
{
endpoints = null;
errorHostnameOrAddress = singleAddressOrHostname;
return false;
}
endpointList.AddRange(e);
}
endpoints = [.. endpointList];

return true;
}

/// <summary>
/// Try to create an endpoint from address and port
/// </summary>
/// <param name="addressOrHostname">This could be an address or a hostname that the method tries to resolve</param>
/// <param name="singleAddressOrHostname">This could be an address or a hostname that the method tries to resolve</param>
/// <param name="port"></param>
/// <param name="useForBind">Binding does not poll connection because is supposed to be called from the server side</param>
/// <param name="logger"></param>
/// <returns></returns>
public static async Task<EndPoint> TryCreateEndpoint(string addressOrHostname, int port, bool useForBind = false, ILogger logger = null)
public static async Task<EndPoint[]> TryCreateEndpoint(string singleAddressOrHostname, int port, bool useForBind = false, ILogger logger = null)
{
IPEndPoint endpoint = null;
if (string.IsNullOrEmpty(addressOrHostname) || string.IsNullOrWhiteSpace(addressOrHostname))
return new IPEndPoint(IPAddress.Any, port);
if (string.IsNullOrEmpty(singleAddressOrHostname) || string.IsNullOrWhiteSpace(singleAddressOrHostname))
return [new IPEndPoint(IPAddress.Any, port)];

if (IPAddress.TryParse(addressOrHostname, out var ipAddress))
return new IPEndPoint(ipAddress, port);
if (singleAddressOrHostname.Equals("localhost", StringComparison.CurrentCultureIgnoreCase))
return [new IPEndPoint(IPAddress.Loopback, port)];

if (IPAddress.TryParse(singleAddressOrHostname, out var ipAddress))
return [new IPEndPoint(ipAddress, port)];

// Sanity check, there should be at least one ip address available
try
{
var ipAddresses = Dns.GetHostAddresses(addressOrHostname);
var ipAddresses = Dns.GetHostAddresses(singleAddressOrHostname);
if (ipAddresses.Length == 0)
{
logger?.LogError("No IP address found for hostname:{hostname}", addressOrHostname);
logger?.LogError("No IP address found for hostname:{hostname}", singleAddressOrHostname);
return null;
}

if (useForBind)
{
foreach (var entry in ipAddresses)
{
endpoint = new IPEndPoint(entry, port);
var endpoint = new IPEndPoint(entry, port);
var IsListening = await IsReachable(endpoint);
if (IsListening) break;
if (IsListening) return [endpoint];
}
}
else
{
var machineHostname = GetHostName();

// Hostname does match the one acquired from machine name
if (!addressOrHostname.Equals(machineHostname, StringComparison.OrdinalIgnoreCase))
// User-provided hostname does not match the machine hostname
if (!singleAddressOrHostname.Equals(machineHostname, StringComparison.OrdinalIgnoreCase))
{
logger?.LogError("Provided hostname does not much acquired machine name {addressOrHostname} {machineHostname}!", addressOrHostname, machineHostname);
logger?.LogError("Provided hostname does not much acquired machine name {addressOrHostname} {machineHostname}!", singleAddressOrHostname, machineHostname);
return null;
}

if (ipAddresses.Length > 1) {
logger?.LogError("Error hostname resolved to multiple endpoints. Garnet does not support multiple endpoints!");
return null;
}

return new IPEndPoint(ipAddresses[0], port);
return ipAddresses.Select(ip => new IPEndPoint(ip, port)).ToArray();
}
logger?.LogError("No reachable IP address found for hostname:{hostname}", addressOrHostname);
logger?.LogError("No reachable IP address found for hostname:{hostname}", singleAddressOrHostname);
}
catch (Exception ex)
{
logger?.LogError(ex, "Error while trying to resolve hostname:{hostname}", addressOrHostname);
logger?.LogError("Error while trying to resolve hostname: {exMessage} [{hostname}]", ex.Message, singleAddressOrHostname);
}

return endpoint;
return null;

async Task<bool> IsReachable(IPEndPoint endpoint)
{
Expand Down
13 changes: 5 additions & 8 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,6 @@ public bool IsValid(out List<string> invalidOptions, ILogger logger = null)
this.runtimeLogger = logger;
foreach (var prop in typeof(Options).GetProperties())
{
if (prop.Name.Equals("runtimeLogger"))
continue;

// Ignore if property is not decorated with the OptionsAttribute or the ValidationAttribute
var validationAttr = prop.GetCustomAttributes(typeof(ValidationAttribute)).FirstOrDefault();
if (!Attribute.IsDefined(prop, typeof(OptionAttribute)) || validationAttr == null)
Expand Down Expand Up @@ -637,15 +634,15 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
var checkpointDir = CheckpointDir;
if (!useAzureStorage) checkpointDir = new DirectoryInfo(string.IsNullOrEmpty(checkpointDir) ? (string.IsNullOrEmpty(logDir) ? "." : logDir) : checkpointDir).FullName;

EndPoint endpoint;
EndPoint[] endpoints;
if (!string.IsNullOrEmpty(UnixSocketPath))
{
endpoint = new UnixDomainSocketEndPoint(UnixSocketPath);
endpoints = new EndPoint[1];
endpoints[0] = new UnixDomainSocketEndPoint(UnixSocketPath);
}
else
{
endpoint = Format.TryCreateEndpoint(Address, Port, useForBind: false).Result;
if (endpoint == null)
if (!Format.TryParseAddressList(Address, Port, out endpoints, out _) || endpoints.Length == 0)
throw new GarnetException($"Invalid endpoint format {Address} {Port}.");
}

Expand Down Expand Up @@ -701,7 +698,7 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
}
return new GarnetServerOptions(logger)
{
EndPoint = endpoint,
EndPoints = endpoints,
MemorySize = MemorySize,
PageSize = PageSize,
SegmentSize = SegmentSize,
Expand Down
15 changes: 8 additions & 7 deletions libs/host/Configuration/OptionsValidators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -357,17 +357,18 @@ internal IpAddressValidationAttribute(bool isRequired = true) : base(isRequired)
/// <returns></returns>
protected override ValidationResult IsValid(object value, ValidationContext validationContext)
{
if (TryInitialValidation<string>(value, validationContext, out var initValidationResult, out var ipAddress))
if (TryInitialValidation<string>(value, validationContext, out var initValidationResult, out var ipAddresses))
return initValidationResult;

var logger = ((Options)validationContext.ObjectInstance).runtimeLogger;
if (ipAddress.Equals(Localhost, StringComparison.CurrentCultureIgnoreCase) ||
Format.TryCreateEndpoint(ipAddress, 0, useForBind: false, logger: logger).Result != null)
return ValidationResult.Success;
if (!Format.TryParseAddressList(ipAddresses, 0, out _, out var errorHostnameOrAddress, useForBind: false, logger: logger))
{
var baseError = validationContext.MemberName != null ? base.FormatErrorMessage(validationContext.MemberName) : string.Empty;
var errorMessage = $"{baseError} Expected string in IPv4 / IPv6 format (e.g. 127.0.0.1 / 0:0:0:0:0:0:0:1) or 'localhost' or valid hostname. Actual value: {errorHostnameOrAddress}";
return new ValidationResult(errorMessage, [validationContext.MemberName]);
}

var baseError = validationContext.MemberName != null ? base.FormatErrorMessage(validationContext.MemberName) : string.Empty;
var errorMessage = $"{baseError} Expected string in IPv4 / IPv6 format (e.g. 127.0.0.1 / 0:0:0:0:0:0:0:1) or 'localhost' or valid hostname. Actual value: {ipAddress}";
return new ValidationResult(errorMessage, [validationContext.MemberName]);
return ValidationResult.Success;
}
}

Expand Down
Loading
Loading