Skip to content

Commit b568191

Browse files
committed
2 parents 641452f + 692be7c commit b568191

File tree

8 files changed

+42
-17
lines changed

8 files changed

+42
-17
lines changed

src/Dirichlet

src/Nethermind/Nethermind.DataMarketplace.Consumers/Services/ConsumerService.cs

+13-5
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ await _consumerNotifier.SendDepositConfirmationsStatusAsync(deposit.Id, deposit.
282282
{
283283
var confirmations = 0u;
284284
var block = _blockchainBridge.FindBlock(headHash);
285-
while (confirmations < _requiredBlockConfirmations)
285+
do
286286
{
287287
if (block is null)
288288
{
@@ -312,14 +312,14 @@ await _consumerNotifier.SendDepositConfirmationsStatusAsync(deposit.Id, deposit.
312312
break;
313313
}
314314

315-
if (receipt.BlockHash == block.Hash)
315+
if (receipt.BlockHash == block.Hash || block.Number <= receipt.BlockNumber)
316316
{
317317
break;
318318
}
319319

320320
block = _blockchainBridge.FindBlock(block.ParentHash);
321-
}
322-
321+
} while (confirmations < _requiredBlockConfirmations);
322+
323323
var blocksDifference = _blockchainBridge.Head.Number - receipt.BlockNumber;
324324
if (blocksDifference >= _requiredBlockConfirmations && confirmations < _requiredBlockConfirmations)
325325
{
@@ -898,7 +898,15 @@ private async Task<Keccak> ToggleDataStreamAsync(Keccak depositId, bool enabled,
898898

899899
if (enabled)
900900
{
901-
if (_logger.IsInfo) _logger.Info($"Sending enable data stream for deposit: '{depositId}', client: '{client}'.");
901+
if (dataAsset.QueryType == QueryType.Stream)
902+
{
903+
if (_logger.IsInfo) _logger.Info($"Sending enable data stream for deposit: '{depositId}', client: '{client}'.");
904+
}
905+
else if (dataAsset.QueryType == QueryType.Query)
906+
{
907+
if (_logger.IsInfo) _logger.Info($"Sending the data query for deposit: '{depositId}', client: '{client}'.");
908+
}
909+
902910
providerPeer.SendEnableDataStream(depositId, client, args);
903911
}
904912
else

src/Nethermind/Nethermind.Grpc/Clients/GrpcClient.cs

+19-6
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
using System;
2121
using System.Collections.Generic;
2222
using System.Linq;
23+
using System.Threading;
2324
using System.Threading.Tasks;
2425
using Grpc.Core;
2526
using Nethermind.Logging;
@@ -50,10 +51,13 @@ public GrpcClient(string host, int port, int reconnectionInterval, ILogManager l
5051

5152
if (reconnectionInterval < 0)
5253
{
53-
throw new ArgumentException($"Invalid reconnection interval: {reconnectionInterval} ms.", nameof(reconnectionInterval));
54+
throw new ArgumentException($"Invalid reconnection interval: {reconnectionInterval} ms.",
55+
nameof(reconnectionInterval));
5456
}
5557

56-
_address = string.IsNullOrWhiteSpace(host) ? throw new ArgumentException("Missing gRPC host", nameof(host)) : $"{host}:{port}";
58+
_address = string.IsNullOrWhiteSpace(host)
59+
? throw new ArgumentException("Missing gRPC host", nameof(host))
60+
: $"{host}:{port}";
5761
_reconnectionInterval = reconnectionInterval;
5862
_logger = logManager.GetClassLogger();
5963
}
@@ -111,12 +115,14 @@ public async Task<string> QueryAsync(IEnumerable<string> args)
111115
{
112116
if (_logger.IsError) _logger.Error(ex.Message, ex);
113117
await TryReconnectAsync();
114-
return string.Empty;;
118+
return string.Empty;
115119
}
116120
}
117121

118-
public async Task SubscribeAsync(Action<string> callback, Func<bool> enabled, IEnumerable<string> args)
122+
public async Task SubscribeAsync(Action<string> callback, Func<bool> enabled, IEnumerable<string> args,
123+
CancellationToken? token = null)
119124
{
125+
var cancellationToken = token ?? CancellationToken.None;
120126
try
121127
{
122128
if (!_connected)
@@ -129,14 +135,20 @@ public async Task SubscribeAsync(Action<string> callback, Func<bool> enabled, IE
129135
Args = {args ?? Enumerable.Empty<string>()}
130136
}))
131137
{
132-
while (enabled() && _connected && await stream.ResponseStream.MoveNext())
138+
while (enabled() && _connected && !cancellationToken.IsCancellationRequested &&
139+
await stream.ResponseStream.MoveNext(cancellationToken))
133140
{
134141
callback(stream.ResponseStream.Current.Data);
135142
}
136143
}
137144
}
138145
catch (Exception ex)
139146
{
147+
if (cancellationToken.IsCancellationRequested)
148+
{
149+
return;
150+
}
151+
140152
if (_logger.IsError) _logger.Error(ex.Message, ex);
141153
await TryReconnectAsync();
142154
}
@@ -146,7 +158,8 @@ private async Task TryReconnectAsync()
146158
{
147159
_connected = false;
148160
_retry++;
149-
if (_logger.IsWarn) _logger.Warn($"Retrying ({_retry}) gRPC connection to: '{_address}' in {_reconnectionInterval} ms.");
161+
if (_logger.IsWarn)
162+
_logger.Warn($"Retrying ({_retry}) gRPC connection to: '{_address}' in {_reconnectionInterval} ms.");
150163
await Task.Delay(_reconnectionInterval);
151164
await StartAsync();
152165
}

src/Nethermind/Nethermind.Grpc/IGrpcClient.cs

+4-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
using System;
2020
using System.Collections.Generic;
21+
using System.Threading;
2122
using System.Threading.Tasks;
2223

2324
namespace Nethermind.Grpc
@@ -27,6 +28,8 @@ public interface IGrpcClient
2728
Task StartAsync();
2829
Task StopAsync();
2930
Task<string> QueryAsync(IEnumerable<string> args);
30-
Task SubscribeAsync(Action<string> callback, Func<bool> enabled, IEnumerable<string> args);
31+
32+
Task SubscribeAsync(Action<string> callback, Func<bool> enabled, IEnumerable<string> args,
33+
CancellationToken? token = null);
3134
}
3235
}

src/Nethermind/Nethermind.Grpc/Servers/GrpcServer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ public Task PublishAsync<T>(T data, string client) where T : class
9393
{
9494
results.TryAdd(payload);
9595
}
96-
catch
96+
catch (Exception ex)
9797
{
98-
// ignored
98+
if (_logger.IsError) _logger.Error(ex.Message, ex);
9999
}
100100
}
101101

src/Nethermind/Nethermind.Runner/RunnerAppBase.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public abstract class RunnerAppBase
5858
private IRunner _jsonRpcRunner = NullRunner.Instance;
5959
private IRunner _ethereumRunner = NullRunner.Instance;
6060
private IRunner _grpcRunner = NullRunner.Instance;
61-
private IRunner _grpcClientRunner = NullRunner.Instance;
6261
private TaskCompletionSource<object> _cancelKeySource;
6362
private IMonitoringService _monitoringService;
6463

@@ -242,6 +241,7 @@ await _monitoringService.StartAsync().ContinueWith(x =>
242241

243242
protected async Task StopAsync()
244243
{
244+
_grpcRunner?.StopAsync();
245245
_monitoringService?.StopAsync();
246246
_jsonRpcRunner?.StopAsync(); // do not await
247247
var ethereumTask = _ethereumRunner?.StopAsync() ?? Task.CompletedTask;

src/Nethermind/Nethermind.Runner/Runners/GrpcRunner.cs

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public Task Start()
3535
public async Task StopAsync()
3636
{
3737
if (_logger.IsInfo) _logger.Info($"Stopping GRPC server...");
38+
await _server.ShutdownAsync();
3839
await GrpcEnvironment.ShutdownChannelsAsync();
3940
if (_logger.IsInfo) _logger.Info($"GRPC shutdown complete.");
4041
}

src/tests

Submodule tests updated 15560 files

0 commit comments

Comments
 (0)