Skip to content

Commit c971d72

Browse files
authored
Update the job hosting to run the task count per queue type (#3997)
* Updating the job hosting to run the max running task count per queue type. * Adding a new parameter for the running job count to JobHosting.ExecuteAsync. * Adding a null check on _hostingConfiguration.
1 parent 2214c77 commit c971d72

File tree

4 files changed

+20
-22
lines changed

4 files changed

+20
-22
lines changed

src/Microsoft.Health.Fhir.Api/Features/BackgroundJobService/HostingBackgroundService.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
5959
if (_hostingConfiguration != null)
6060
{
6161
jobHostingValue.PollingFrequencyInSeconds = _hostingConfiguration.PollingFrequencyInSeconds ?? jobHostingValue.PollingFrequencyInSeconds;
62-
jobHostingValue.MaxRunningJobCount = _hostingConfiguration.MaxRunningTaskCount ?? jobHostingValue.MaxRunningJobCount;
6362
jobHostingValue.JobHeartbeatIntervalInSeconds = _hostingConfiguration.TaskHeartbeatIntervalInSeconds ?? jobHostingValue.JobHeartbeatIntervalInSeconds;
6463
jobHostingValue.JobHeartbeatTimeoutThresholdInSeconds = _hostingConfiguration.TaskHeartbeatTimeoutThresholdInSeconds ?? jobHostingValue.JobHeartbeatTimeoutThresholdInSeconds;
6564
}
@@ -74,7 +73,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
7473

7574
foreach (var operation in _operationsConfiguration.HostingBackgroundServiceQueues)
7675
{
77-
jobQueues.Add(jobHostingValue.ExecuteAsync((byte)operation.Queue, Environment.MachineName, cancellationTokenSource));
76+
short runningJobCount = operation.MaxRunningTaskCount ?? _hostingConfiguration?.MaxRunningTaskCount ?? Constants.DefaultMaxRunningJobCount;
77+
jobQueues.Add(jobHostingValue.ExecuteAsync((byte)operation.Queue, runningJobCount, Environment.MachineName, cancellationTokenSource));
7878
}
7979

8080
await Task.WhenAll(jobQueues);

src/Microsoft.Health.Fhir.Core/Configs/HostingBackgroundServiceQueueItem.cs

+8
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,16 @@ namespace Microsoft.Health.Fhir.Core.Configs;
99

1010
public class HostingBackgroundServiceQueueItem
1111
{
12+
/// <summary>
13+
/// Gets or sets the queue type.
14+
/// </summary>
1215
public QueueType Queue { get; set; }
1316

17+
/// <summary>
18+
/// Gets or sets the max running task count at the same time for this queue type.
19+
/// </summary>
20+
public short? MaxRunningTaskCount { get; set; }
21+
1422
// TODO: This is not honored. Make sure that it is not used in PaaS and remove.
1523
public bool UpdateProgressOnHeartbeat { get; set; }
1624
}

src/Microsoft.Health.TaskManagement.UnitTests/JobHostingTests.cs

+8-16
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ public async Task GivenValidJobs_WhenJobHostingStart_ThenJobsShouldBeExecute()
5656

5757
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
5858
jobHosting.PollingFrequencyInSeconds = 0;
59-
jobHosting.MaxRunningJobCount = 5;
6059

6160
CancellationTokenSource tokenSource = new CancellationTokenSource();
6261

6362
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
64-
await jobHosting.ExecuteAsync(0, "test", tokenSource);
63+
await jobHosting.ExecuteAsync(0, 5, "test", tokenSource);
6564

6665
Assert.Equal(jobCount, executedJobCount);
6766
foreach (JobInfo job in jobs)
@@ -121,12 +120,11 @@ public async Task GivenJobWithCriticalException_WhenJobHostingStart_ThenJobShoul
121120

122121
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
123122
jobHosting.PollingFrequencyInSeconds = 0;
124-
jobHosting.MaxRunningJobCount = 1;
125123

126124
CancellationTokenSource tokenSource = new CancellationTokenSource();
127125

128126
tokenSource.CancelAfter(TimeSpan.FromSeconds(1));
129-
await jobHosting.ExecuteAsync(0, "test", tokenSource);
127+
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
130128

131129
Assert.Equal(2, executeCount);
132130

@@ -163,13 +161,12 @@ public async Task GivenAnCrashJob_WhenJobHostingStart_ThenJobShouldBeRePickup()
163161

164162
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
165163
jobHosting.PollingFrequencyInSeconds = 0;
166-
jobHosting.MaxRunningJobCount = 1;
167164
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;
168165

169166
CancellationTokenSource tokenSource = new CancellationTokenSource();
170167

171168
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
172-
await jobHosting.ExecuteAsync(0, "test", tokenSource);
169+
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
173170

174171
Assert.Equal(JobStatus.Completed, job1.Status);
175172
Assert.Equal(1, executeCount0);
@@ -198,12 +195,11 @@ public async Task GivenAnLongRunningJob_WhenJobHostingStop_ThenJobShouldBeComple
198195
JobInfo job1 = (await queueClient.EnqueueAsync(0, new string[] { "job1" }, null, false, false, CancellationToken.None)).First();
199196
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
200197
jobHosting.PollingFrequencyInSeconds = 0;
201-
jobHosting.MaxRunningJobCount = 1;
202198
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;
203199

204200
CancellationTokenSource tokenSource = new CancellationTokenSource();
205201

206-
Task hostingTask = jobHosting.ExecuteAsync(0, "test", tokenSource);
202+
Task hostingTask = jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
207203
autoResetEvent.WaitOne();
208204
tokenSource.Cancel();
209205

@@ -238,13 +234,12 @@ public async Task GivenJobWithInvalidOperationException_WhenJobHostingStart_Then
238234

239235
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
240236
jobHosting.PollingFrequencyInSeconds = 0;
241-
jobHosting.MaxRunningJobCount = 1;
242237
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;
243238

244239
CancellationTokenSource tokenSource = new CancellationTokenSource();
245240

246241
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
247-
await jobHosting.ExecuteAsync(0, "test", tokenSource);
242+
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
248243

249244
Assert.Equal(JobStatus.Failed, job1.Status);
250245
Assert.Equal(1, executeCount0);
@@ -276,12 +271,11 @@ public async Task GivenJobWithCanceledException_WhenJobHostingStart_ThenJobShoul
276271

277272
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
278273
jobHosting.PollingFrequencyInSeconds = 0;
279-
jobHosting.MaxRunningJobCount = 1;
280274
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 15;
281275

282276
CancellationTokenSource tokenSource = new CancellationTokenSource();
283277
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
284-
await jobHosting.ExecuteAsync(0, "test", tokenSource);
278+
await jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
285279

286280
Assert.Equal(JobStatus.Cancelled, job1.Status);
287281
Assert.Equal(1, executeCount0);
@@ -313,11 +307,10 @@ public async Task GivenJobRunning_WhenCancel_ThenJobShouldBeCancelled()
313307
JobHosting jobHosting = new JobHosting(queueClient, factory, _logger);
314308
jobHosting.PollingFrequencyInSeconds = 0;
315309
jobHosting.JobHeartbeatIntervalInSeconds = 1;
316-
jobHosting.MaxRunningJobCount = 1;
317310

318311
CancellationTokenSource tokenSource = new CancellationTokenSource();
319312
tokenSource.CancelAfter(TimeSpan.FromSeconds(2));
320-
Task hostingTask = jobHosting.ExecuteAsync(0, "test", tokenSource);
313+
Task hostingTask = jobHosting.ExecuteAsync(0, 1, "test", tokenSource);
321314

322315
autoResetEvent.WaitOne();
323316
await queueClient.CancelJobByGroupIdAsync(0, job1.GroupId, CancellationToken.None);
@@ -392,13 +385,12 @@ public async Task GivenRandomFailuresInQueueClient_WhenStartHosting_ThenAllTasks
392385

393386
var jobHosting = new JobHosting(queueClient, factory, _logger);
394387
jobHosting.PollingFrequencyInSeconds = 0;
395-
jobHosting.MaxRunningJobCount = 10;
396388
jobHosting.JobHeartbeatIntervalInSeconds = 0.001;
397389
jobHosting.JobHeartbeatTimeoutThresholdInSeconds = 1;
398390

399391
var tokenSource = new CancellationTokenSource();
400392
tokenSource.CancelAfter(TimeSpan.FromSeconds(60));
401-
var host = Task.Run(async () => await jobHosting.ExecuteAsync(0, "test", tokenSource));
393+
var host = Task.Run(async () => await jobHosting.ExecuteAsync(0, 10, "test", tokenSource));
402394
while (jobs.Where(t => t.Status == JobStatus.Completed).Count() < numberOfJobs && !tokenSource.IsCancellationRequested)
403395
{
404396
await Task.Delay(TimeSpan.FromSeconds(1));

src/Microsoft.Health.TaskManagement/JobHosting.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,16 @@ public JobHosting(IQueueClient queueClient, IJobFactory jobFactory, ILogger<JobH
3636

3737
public int PollingFrequencyInSeconds { get; set; } = Constants.DefaultPollingFrequencyInSeconds;
3838

39-
public short MaxRunningJobCount { get; set; } = Constants.DefaultMaxRunningJobCount;
40-
4139
public int JobHeartbeatTimeoutThresholdInSeconds { get; set; } = Constants.DefaultJobHeartbeatTimeoutThresholdInSeconds;
4240

4341
public double JobHeartbeatIntervalInSeconds { get; set; } = Constants.DefaultJobHeartbeatIntervalInSeconds;
4442

45-
public async Task ExecuteAsync(byte queueType, string workerName, CancellationTokenSource cancellationTokenSource)
43+
public async Task ExecuteAsync(byte queueType, short runningJobCount, string workerName, CancellationTokenSource cancellationTokenSource)
4644
{
4745
var workers = new List<Task>();
4846

4947
// parallel dequeue
50-
for (var thread = 0; thread < MaxRunningJobCount; thread++)
48+
for (var thread = 0; thread < runningJobCount; thread++)
5149
{
5250
workers.Add(Task.Run(async () =>
5351
{

0 commit comments

Comments
 (0)