Skip to content

Commit 876ac21

Browse files
authored
+semver:minor - Feature/refactor waiting dependencies (#486)
* Remove WaitHandler.cs * Simplify start dependency * ExceptionContainer.cs * Message * Update dotnet.yml * Formatting Markdown * Expose StartInternal * Refactor StartModule * Fix test --------- Co-authored-by: Tom Longhurst <[email protected]>
1 parent 3de0a82 commit 876ac21

File tree

14 files changed

+174
-221
lines changed

14 files changed

+174
-221
lines changed

.github/workflows/dotnet.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ jobs:
7474
GitHub__PullRequest__Branch: ${{ github.event.pull_request.head.ref }}
7575
GitHub__PullRequest__Sha: ${{ github.event.pull_request.head.sha }}
7676
GitHub__PullRequest__Author: ${{ github.event.pull_request.user.login }}
77-
Publish__ShouldPublish: ${{ github.event.inputs.publish-packages }}
78-
Publish__IsAlpha: ${{ github.event.inputs.is-alpha }}
77+
Publish__ShouldPublish: ${{ github.event.inputs.publish-packages || false}}
78+
Publish__IsAlpha: ${{ github.event.inputs.is-alpha || true}}
7979
Codacy__ApiKey: ${{ secrets.CODACY_APIKEY }}
8080
CodeCov__Token: ${{ secrets.CODECOV_TOKEN }}
8181
EMAIL_PASSWORD: ${{ secrets.EMAIL_PASSWORD }}

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ Define your pipeline in .NET! Strong types, intellisense, parallelisation, and t
6363
| ModularPipelines.WinGet | Helpers for interacting with the Windows Package Manager. | [![nuget](https://img.shields.io/nuget/v/ModularPipelines.WinGet.svg)](https://www.nuget.org/packages/ModularPipelines.WinGet/) |
6464
| ModularPipelines.Yarn | Helpers for interacting with Yarn CLI. | [![nuget](https://img.shields.io/nuget/v/ModularPipelines.Yarn.svg)](https://www.nuget.org/packages/ModularPipelines.Yarn/) |
6565

66-
6766
## Getting Started
6867

6968
If you want to see how to get started, or want to know more about ModularPipelines, [read the Documentation here](https://thomhurst.github.io/ModularPipelines)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
* Refactor how modules wait for their dependencies

src/ModularPipelines/DependencyInjection/DependencyInjectionSetup.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public static void Initialize(IServiceCollection services)
6767
// Singletons
6868
services
6969
.AddSingleton<IConsolePrinter, ConsolePrinter>()
70+
.AddSingleton<IExceptionContainer, ExceptionContainer>()
7071
.AddSingleton<IPipelineContextProvider, ModuleContextProvider>()
7172
.AddSingleton<IDependencyChainProvider, DependencyChainProvider>()
7273
.AddSingleton<IDependencyDetector, DependencyDetector>()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
namespace ModularPipelines.Engine;
2+
3+
internal class ExceptionContainer : IExceptionContainer
4+
{
5+
private readonly List<Exception> _exceptions = [];
6+
7+
public void RegisterException(Exception exception)
8+
{
9+
_exceptions.Add(exception);
10+
}
11+
12+
public void ThrowExceptions()
13+
{
14+
if (_exceptions.Count == 1)
15+
{
16+
throw _exceptions.First();
17+
}
18+
19+
if (_exceptions.Any())
20+
{
21+
throw new AggregateException(_exceptions);
22+
}
23+
}
24+
}

src/ModularPipelines/Engine/Executors/ModuleHandlers/IWaitHandler.cs

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/ModularPipelines/Engine/Executors/ModuleHandlers/WaitHandler.cs

Lines changed: 0 additions & 93 deletions
This file was deleted.

src/ModularPipelines/Engine/Executors/PipelineExecutor.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,20 @@ internal class PipelineExecutor : IPipelineExecutor
1111
private readonly IModuleExecutor _moduleExecutor;
1212
private readonly EngineCancellationToken _engineCancellationToken;
1313
private readonly ILogger<PipelineExecutor> _logger;
14+
private readonly IExceptionContainer _exceptionContainer;
1415

1516
public PipelineExecutor(
1617
IPipelineSetupExecutor pipelineSetupExecutor,
1718
IModuleExecutor moduleExecutor,
1819
EngineCancellationToken engineCancellationToken,
19-
ILogger<PipelineExecutor> logger)
20+
ILogger<PipelineExecutor> logger,
21+
IExceptionContainer exceptionContainer)
2022
{
2123
_pipelineSetupExecutor = pipelineSetupExecutor;
2224
_moduleExecutor = moduleExecutor;
2325
_engineCancellationToken = engineCancellationToken;
2426
_logger = logger;
27+
_exceptionContainer = exceptionContainer;
2528
}
2629

2730
public async Task<PipelineSummary> ExecuteAsync(List<ModuleBase> runnableModules,
@@ -57,6 +60,8 @@ public async Task<PipelineSummary> ExecuteAsync(List<ModuleBase> runnableModules
5760
{
5861
throw exception;
5962
}
63+
64+
_exceptionContainer.ThrowExceptions();
6065

6166
return pipelineSummary;
6267
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace ModularPipelines.Engine;
2+
3+
internal interface IExceptionContainer
4+
{
5+
void RegisterException(Exception exception);
6+
7+
void ThrowExceptions();
8+
}

src/ModularPipelines/Engine/ModuleExecutor.cs

Lines changed: 61 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
using System.Collections.Concurrent;
22
using System.Reflection;
33
using EnumerableAsyncProcessor.Extensions;
4+
using Microsoft.Extensions.Logging;
45
using Microsoft.Extensions.Options;
56
using ModularPipelines.Attributes;
7+
using ModularPipelines.Exceptions;
68
using ModularPipelines.Extensions;
9+
using ModularPipelines.Models;
710
using ModularPipelines.Modules;
811
using ModularPipelines.Options;
912

@@ -15,18 +18,24 @@ internal class ModuleExecutor : IModuleExecutor
1518
private readonly IOptions<PipelineOptions> _pipelineOptions;
1619
private readonly ISafeModuleEstimatedTimeProvider _moduleEstimatedTimeProvider;
1720
private readonly IModuleDisposer _moduleDisposer;
21+
private readonly IEnumerable<ModuleBase> _allModules;
22+
private readonly IExceptionContainer _exceptionContainer;
1823

1924
private readonly ConcurrentDictionary<ModuleBase, Task<ModuleBase>> _moduleExecutionTasks = new();
2025

2126
public ModuleExecutor(IPipelineSetupExecutor pipelineSetupExecutor,
2227
IOptions<PipelineOptions> pipelineOptions,
2328
ISafeModuleEstimatedTimeProvider moduleEstimatedTimeProvider,
24-
IModuleDisposer moduleDisposer)
29+
IModuleDisposer moduleDisposer,
30+
IEnumerable<ModuleBase> allModules,
31+
IExceptionContainer exceptionContainer)
2532
{
2633
_pipelineSetupExecutor = pipelineSetupExecutor;
2734
_pipelineOptions = pipelineOptions;
2835
_moduleEstimatedTimeProvider = moduleEstimatedTimeProvider;
2936
_moduleDisposer = moduleDisposer;
37+
_allModules = allModules;
38+
_exceptionContainer = exceptionContainer;
3039
}
3140

3241
public async Task<IEnumerable<ModuleBase>> ExecuteAsync(IReadOnlyList<ModuleBase> modules)
@@ -75,7 +84,7 @@ public async Task<ModuleBase> ExecuteAsync(ModuleBase module)
7584
{
7685
try
7786
{
78-
return await ExecuteWithLockAsync(module);
87+
return await StartModule(module);
7988
}
8089
catch (TaskCanceledException)
8190
{
@@ -86,14 +95,6 @@ public async Task<ModuleBase> ExecuteAsync(ModuleBase module)
8695
}
8796
}
8897

89-
private Task<ModuleBase> ExecuteWithLockAsync(ModuleBase module)
90-
{
91-
lock (module)
92-
{
93-
return _moduleExecutionTasks.GetOrAdd(module, @base => StartModule(module));
94-
}
95-
}
96-
9798
private async Task<ModuleBase[]> ProcessKeyedNonParallelModules(List<ModuleBase> keyedNonParallelModules,
9899
List<ModuleBase> moduleResults)
99100
{
@@ -150,44 +151,70 @@ private async Task<ModuleBase[]> ProcessKeyedNonParallelModules(List<ModuleBase>
150151

151152
private async Task<ModuleBase> StartModule(ModuleBase module)
152153
{
153-
if (module.IsStarted || module.ExecutionTask.IsCompleted)
154+
return await _moduleExecutionTasks.GetOrAdd(module, async @base =>
154155
{
155-
await module.ExecutionTask;
156-
return module;
157-
}
156+
var dependencies = module.GetModuleDependencies();
158157

159-
try
160-
{
161-
await _pipelineSetupExecutor.OnBeforeModuleStartAsync(module);
158+
foreach (var dependency in dependencies)
159+
{
160+
await StartDependency(module, dependency.DependencyType, dependency.IgnoreIfNotRegistered);
161+
}
162162

163-
await module.ExecutionTask;
163+
try
164+
{
165+
await _pipelineSetupExecutor.OnBeforeModuleStartAsync(module);
164166

165-
await _moduleEstimatedTimeProvider.SaveModuleTimeAsync(module.GetType(), module.Duration);
167+
await module.StartInternal();
166168

167-
await _pipelineSetupExecutor.OnAfterModuleEndAsync(module);
169+
await _moduleEstimatedTimeProvider.SaveModuleTimeAsync(module.GetType(), module.Duration);
168170

169-
return module;
170-
}
171-
finally
172-
{
173-
if (!_pipelineOptions.Value.ShowProgressInConsole)
171+
await _pipelineSetupExecutor.OnAfterModuleEndAsync(module);
172+
173+
return module;
174+
}
175+
finally
174176
{
175-
await _moduleDisposer.DisposeAsync(module);
177+
if (!_pipelineOptions.Value.ShowProgressInConsole)
178+
{
179+
await _moduleDisposer.DisposeAsync(module);
180+
}
176181
}
177-
}
182+
});
178183
}
179184

180-
private async Task<IEnumerable<ModuleBase>> ProcessGroup(
181-
IGrouping<string?, ModuleBase> moduleBases,
182-
ICollection<ModuleBase> moduleResults)
185+
private async Task StartDependency(ModuleBase requestingModule, Type dependencyType, bool ignoreIfNotRegistered)
183186
{
184-
var executionProcessor = moduleBases.SelectAsync(ExecuteAsync);
187+
var module = _allModules.FirstOrDefault(x => x.GetType() == dependencyType);
188+
189+
if (module is null && ignoreIfNotRegistered)
190+
{
191+
requestingModule.Context.Logger.LogDebug("{Module} was not registered so not waiting", dependencyType.Name);
192+
return;
193+
}
185194

186-
if (string.IsNullOrEmpty(moduleBases.Key))
195+
if (module is null)
187196
{
188-
return await executionProcessor.ProcessInParallel().GetEnumerableTasks().ToArray().WhenAllFailFast();
197+
throw new ModuleNotRegisteredException($"The module {dependencyType.Name} has not been registered", null);
189198
}
199+
200+
requestingModule.Context.Logger.LogDebug("{RequestingModule} is waiting for {Module}", requestingModule.GetType().Name, dependencyType.Name);
190201

191-
return await executionProcessor.ProcessOneAtATime();
202+
try
203+
{
204+
await StartModule(module);
205+
}
206+
catch (Exception e) when (requestingModule.ModuleRunType == ModuleRunType.AlwaysRun)
207+
{
208+
_exceptionContainer.RegisterException(new AlwaysRunPostponedException($"{dependencyType.Name} threw an exception when {requestingModule.GetType().Name} was waiting for it as a dependency", e));
209+
requestingModule.Context.Logger.LogError(e, "Ignoring Exception due to 'AlwaysRun' set");
210+
}
211+
catch (DependencyFailedException)
212+
{
213+
throw;
214+
}
215+
catch (Exception e)
216+
{
217+
throw new DependencyFailedException(e, module);
218+
}
192219
}
193220
}

0 commit comments

Comments
 (0)