Skip to content

Commit 7f8f979

Browse files
authored
+semver:minor - Refactor Not in Parallel Exclusive Locks (#488)
* Refactor Not in Parallel Exclusive Locks * Formatting Markdown * Correctly release semaphore lock * CancellationTokenSource * Order keys * Fix waiting for multiple not-in-parallel constraint keys --------- Co-authored-by: Tom Longhurst <[email protected]>
1 parent 876ac21 commit 7f8f979

File tree

3 files changed

+108
-106
lines changed

3 files changed

+108
-106
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
* Refactor how modules wait for their dependencies
1+
* Refactor grabbing exclusive locks for Not-In-Parallel modules

src/ModularPipelines/Engine/IModuleExecutor.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,4 @@ namespace ModularPipelines.Engine;
55
internal interface IModuleExecutor
66
{
77
Task<IEnumerable<ModuleBase>> ExecuteAsync(IReadOnlyList<ModuleBase> modules);
8-
9-
Task<ModuleBase> ExecuteAsync(ModuleBase module);
108
}

src/ModularPipelines/Engine/ModuleExecutor.cs

Lines changed: 107 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -20,170 +20,174 @@ internal class ModuleExecutor : IModuleExecutor
2020
private readonly IModuleDisposer _moduleDisposer;
2121
private readonly IEnumerable<ModuleBase> _allModules;
2222
private readonly IExceptionContainer _exceptionContainer;
23+
private readonly ILogger<ModuleExecutor> _logger;
2324

2425
private readonly ConcurrentDictionary<ModuleBase, Task<ModuleBase>> _moduleExecutionTasks = new();
26+
private readonly object _moduleDictionaryLock = new();
2527

28+
private readonly ConcurrentDictionary<string, Semaphore> _notInParallelKeyedLocks = new();
29+
private readonly object _notInParallelDictionaryLock = new();
30+
2631
public ModuleExecutor(IPipelineSetupExecutor pipelineSetupExecutor,
2732
IOptions<PipelineOptions> pipelineOptions,
2833
ISafeModuleEstimatedTimeProvider moduleEstimatedTimeProvider,
2934
IModuleDisposer moduleDisposer,
3035
IEnumerable<ModuleBase> allModules,
31-
IExceptionContainer exceptionContainer)
36+
IExceptionContainer exceptionContainer,
37+
ILogger<ModuleExecutor> logger)
3238
{
3339
_pipelineSetupExecutor = pipelineSetupExecutor;
3440
_pipelineOptions = pipelineOptions;
3541
_moduleEstimatedTimeProvider = moduleEstimatedTimeProvider;
3642
_moduleDisposer = moduleDisposer;
3743
_allModules = allModules;
3844
_exceptionContainer = exceptionContainer;
45+
_logger = logger;
3946
}
4047

4148
public async Task<IEnumerable<ModuleBase>> ExecuteAsync(IReadOnlyList<ModuleBase> modules)
4249
{
43-
var moduleResults = new List<ModuleBase>();
44-
45-
var nonParallelModules = modules
46-
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>() != null)
47-
.OrderBy(x => x.DependentModules.Count)
48-
.ToList();
50+
try
51+
{
52+
var nonParallelModules = modules
53+
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>() != null)
54+
.OrderBy(x => x.DependentModules.Count)
55+
.ToList();
4956

50-
var unKeyedNonParallelModules = nonParallelModules
51-
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length == 0)
52-
.ToList();
57+
var unKeyedNonParallelModules = nonParallelModules
58+
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length == 0)
59+
.ToList();
5360

54-
foreach (var nonParallelModule in unKeyedNonParallelModules)
55-
{
56-
moduleResults.Add(await ExecuteAsync(nonParallelModule));
57-
}
61+
foreach (var nonParallelModule in unKeyedNonParallelModules)
62+
{
63+
await StartModule(nonParallelModule);
64+
}
5865

59-
var keyedNonParallelModules = nonParallelModules
60-
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length != 0)
61-
.ToList();
66+
var keyedNonParallelModules = nonParallelModules
67+
.Where(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length != 0)
68+
.ToList();
69+
70+
await ProcessKeyedNonParallelModules(keyedNonParallelModules.ToList());
6271

63-
moduleResults.AddRange(
64-
await ProcessKeyedNonParallelModules(keyedNonParallelModules.ToList(), moduleResults)
65-
);
72+
var parallelModuleTasks = modules.Except(nonParallelModules)
73+
.Select(StartModule)
74+
.ToArray();
6675

67-
var parallelModuleTasks = modules.Except(nonParallelModules)
68-
.Select(ExecuteAsync)
69-
.ToArray();
76+
if (_pipelineOptions.Value.ExecutionMode == ExecutionMode.StopOnFirstException)
77+
{
78+
await parallelModuleTasks.WhenAllFailFast();
79+
}
80+
else
81+
{
82+
await Task.WhenAll(parallelModuleTasks);
83+
}
7084

71-
if (_pipelineOptions.Value.ExecutionMode == ExecutionMode.StopOnFirstException)
72-
{
73-
moduleResults.AddRange(await parallelModuleTasks.WhenAllFailFast());
85+
return modules;
7486
}
75-
else
87+
catch
7688
{
77-
moduleResults.AddRange(await Task.WhenAll(parallelModuleTasks));
78-
}
89+
foreach (var moduleBase in modules.Where(x => x.ModuleRunType == ModuleRunType.AlwaysRun))
90+
{
91+
try
92+
{
93+
await StartModule(moduleBase);
94+
}
95+
catch
96+
{
97+
// Ignored
98+
}
99+
}
79100

80-
return moduleResults;
101+
throw;
102+
}
81103
}
82104

83-
public async Task<ModuleBase> ExecuteAsync(ModuleBase module)
105+
private Semaphore GetLockForKey(string key)
84106
{
85-
try
107+
lock (_notInParallelDictionaryLock)
86108
{
87-
return await StartModule(module);
88-
}
89-
catch (TaskCanceledException)
90-
{
91-
// If the pipeline failed, sometimes a TaskCanceledException can throw before the original exception
92-
// So delay a bit to let the original exception throw first
93-
await Task.Delay(TimeSpan.FromMilliseconds(500));
94-
throw;
109+
return _notInParallelKeyedLocks.GetOrAdd(key, _ => new Semaphore(1, 1));
95110
}
96111
}
97112

98-
private async Task<ModuleBase[]> ProcessKeyedNonParallelModules(List<ModuleBase> keyedNonParallelModules,
99-
List<ModuleBase> moduleResults)
113+
private async Task ProcessKeyedNonParallelModules(List<ModuleBase> keyedNonParallelModules)
100114
{
101-
var currentlyExecutingByKeysLock = new object();
102-
var currentlyExecutingByKeys = new List<(string[] Keys, Task)>();
103-
104-
var executing = new List<Task<ModuleBase>>();
105-
106-
while (keyedNonParallelModules.Count > 0)
107-
{
108-
// Reversing allows us to remove from the collection
109-
for (var i = keyedNonParallelModules.Count - 1; i >= 0; i--)
115+
await keyedNonParallelModules
116+
.OrderBy(x => x.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys.Length)
117+
.ForEachAsync(async module =>
110118
{
111-
var module = keyedNonParallelModules[i];
112-
113-
var notInParallelKeys =
114-
module.GetType().GetCustomAttribute<NotInParallelAttribute>()!.ConstraintKeys;
115-
116-
lock (currentlyExecutingByKeysLock)
119+
var keys = module.GetType()
120+
.GetCustomAttribute<NotInParallelAttribute>()!
121+
.ConstraintKeys
122+
.OrderBy(x => x)
123+
.ToArray();
124+
125+
_logger.LogDebug("Grabbing not in parallel locks for keys {Keys}", string.Join(", ", keys));
126+
127+
var locks = keys.Select(GetLockForKey).ToArray();
128+
129+
while (!WaitHandle.WaitAll(locks, TimeSpan.FromMilliseconds(100), false))
117130
{
118-
if (currentlyExecutingByKeys.Any(x => x.Keys.Intersect(notInParallelKeys).Any()))
119-
{
120-
// There are currently executing tasks with that same
121-
continue;
122-
}
131+
await Task.Delay(TimeSpan.FromMilliseconds(500));
123132
}
124133

125-
// Remove from collection as we're now processing it
126-
keyedNonParallelModules.RemoveAt(i);
127-
128-
var executionTask = ExecuteAsync(module);
129-
130-
var tuple = (notInParallelKeys, executionTask);
131-
132-
lock (currentlyExecutingByKeysLock)
134+
try
133135
{
134-
currentlyExecutingByKeys.Add(tuple);
136+
await StartModule(module);
135137
}
136-
137-
_ = executionTask.ContinueWith(_ =>
138+
finally
138139
{
139-
lock (currentlyExecutingByKeysLock)
140+
foreach (var semaphore in locks)
140141
{
141-
return currentlyExecutingByKeys.Remove(tuple);
142+
semaphore.Release();
142143
}
143-
});
144-
145-
executing.Add(executionTask);
146-
}
147-
}
148-
149-
return await Task.WhenAll(executing);
144+
}
145+
})
146+
.ProcessInParallel();
150147
}
151148

152-
private async Task<ModuleBase> StartModule(ModuleBase module)
149+
private Task<ModuleBase> StartModule(ModuleBase module)
153150
{
154-
return await _moduleExecutionTasks.GetOrAdd(module, async @base =>
151+
lock (_moduleDictionaryLock)
155152
{
156-
var dependencies = module.GetModuleDependencies();
157-
158-
foreach (var dependency in dependencies)
153+
return _moduleExecutionTasks.GetOrAdd(module, async @base =>
159154
{
160-
await StartDependency(module, dependency.DependencyType, dependency.IgnoreIfNotRegistered);
161-
}
155+
_logger.LogDebug("Starting Module {Module}", module.GetType().Name);
162156

163-
try
164-
{
165-
await _pipelineSetupExecutor.OnBeforeModuleStartAsync(module);
157+
var dependencies = module.GetModuleDependencies();
166158

167-
await module.StartInternal();
159+
foreach (var dependency in dependencies)
160+
{
161+
await StartDependency(module, dependency.DependencyType, dependency.IgnoreIfNotRegistered);
162+
}
168163

169-
await _moduleEstimatedTimeProvider.SaveModuleTimeAsync(module.GetType(), module.Duration);
164+
try
165+
{
166+
await _pipelineSetupExecutor.OnBeforeModuleStartAsync(module);
170167

171-
await _pipelineSetupExecutor.OnAfterModuleEndAsync(module);
168+
await module.StartInternal();
172169

173-
return module;
174-
}
175-
finally
176-
{
177-
if (!_pipelineOptions.Value.ShowProgressInConsole)
170+
await _moduleEstimatedTimeProvider.SaveModuleTimeAsync(module.GetType(), module.Duration);
171+
172+
await _pipelineSetupExecutor.OnAfterModuleEndAsync(module);
173+
174+
return module;
175+
}
176+
finally
178177
{
179-
await _moduleDisposer.DisposeAsync(module);
178+
if (!_pipelineOptions.Value.ShowProgressInConsole)
179+
{
180+
await _moduleDisposer.DisposeAsync(module);
181+
}
180182
}
181-
}
182-
});
183+
});
184+
}
183185
}
184186

185187
private async Task StartDependency(ModuleBase requestingModule, Type dependencyType, bool ignoreIfNotRegistered)
186188
{
189+
_logger.LogDebug("Starting Dependency {Dependency} for Module {Module}", dependencyType.Name, requestingModule.GetType().Name);
190+
187191
var module = _allModules.FirstOrDefault(x => x.GetType() == dependencyType);
188192

189193
if (module is null && ignoreIfNotRegistered)

0 commit comments

Comments
 (0)