Skip to content

Commit 2f01ca0

Browse files
committed
refactor: optimize cost scanning with worker pool for concurrent processing
1 parent d8bdcb8 commit 2f01ca0

File tree

1 file changed

+54
-12
lines changed

1 file changed

+54
-12
lines changed

internal/pipeline/stage_cost.go

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package pipeline
55

66
import (
7+
"sync"
8+
79
"github.com/Azure/azqr/internal/models"
810
"github.com/Azure/azqr/internal/scanners"
911
)
@@ -37,19 +39,59 @@ func (s *CostStage) Execute(ctx *ScanContext) error {
3739
}
3840
}
3941

40-
// Scan costs for all subscriptions
42+
subCount := len(ctx.Subscriptions)
43+
if subCount == 0 {
44+
ctx.ReportData.Cost = nil
45+
return nil
46+
}
47+
48+
// Worker pool to limit concurrent cost scanner goroutines
49+
const numCostWorkers = 10
50+
workerCount := numCostWorkers
51+
if subCount < workerCount {
52+
workerCount = subCount
53+
}
54+
55+
jobs := make(chan string, subCount)
56+
results := make(chan []*models.CostResult, subCount)
57+
58+
// Start worker pool
59+
var workerWg sync.WaitGroup
60+
for i := 0; i < workerCount; i++ {
61+
workerWg.Add(1)
62+
go func() {
63+
defer workerWg.Done()
64+
for subID := range jobs {
65+
scannerConfig := &models.ScannerConfig{
66+
Ctx: ctx.Ctx,
67+
Cred: ctx.Cred,
68+
ClientOptions: ctx.ClientOptions,
69+
SubscriptionID: subID,
70+
}
71+
result := costScanner.Scan(scannerConfig, previousMonth)
72+
if len(result) > 0 {
73+
results <- result
74+
}
75+
}
76+
}()
77+
}
78+
79+
// Send subscription jobs to workers
80+
for subID := range ctx.Subscriptions {
81+
jobs <- subID
82+
}
83+
close(jobs)
84+
85+
// Wait for workers to finish and close results channel
86+
go func() {
87+
workerWg.Wait()
88+
close(results)
89+
}()
90+
91+
// Collect results from all workers
4192
var allCosts []*models.CostResult
42-
for subid := range ctx.Subscriptions {
43-
scannerConfig := &models.ScannerConfig{
44-
Ctx: ctx.Ctx,
45-
Cred: ctx.Cred,
46-
ClientOptions: ctx.ClientOptions,
47-
SubscriptionID: subid,
48-
}
49-
result := costScanner.Scan(scannerConfig, previousMonth)
50-
if len(result) > 0 {
51-
allCosts = append(allCosts, result...)
52-
}
93+
for result := range results {
94+
allCosts = append(allCosts, result...)
5395
}
5496

5597
// Aggregate all cost items into report data

0 commit comments

Comments
 (0)