Skip to content

Commit 2f9b5e0

Browse files
committed
add multi concurrency support
1 parent 6d937d7 commit 2f9b5e0

File tree

4 files changed

+25
-65
lines changed

4 files changed

+25
-65
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ replace (
3333

3434
require (
3535
github.com/Masterminds/sprig/v3 v3.2.3
36+
github.com/alitto/pond v1.8.3
3637
github.com/argoproj/gitops-engine v0.7.1-0.20230906152414-b0fffe419a0f
3738
github.com/go-logr/logr v1.2.4
3839
github.com/orcaman/concurrent-map/v2 v2.0.1

go.sum

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
8383
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
8484
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
8585
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
86+
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
87+
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
8688
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
8789
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
8890
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -541,12 +543,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
541543
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
542544
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
543545
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
544-
github.com/pluralsh/console-client-go v0.0.5 h1:+L7I3QLMWNBiuZlWe/YJfUMMZGnpKhEAlbs9sL+hiSc=
545-
github.com/pluralsh/console-client-go v0.0.5/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
546-
github.com/pluralsh/console-client-go v0.0.8 h1:BwWOt1ggBX/fxzY2+01dk8sBTz1jqT57o2y1Iz9Zxzk=
547-
github.com/pluralsh/console-client-go v0.0.8/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
548-
github.com/pluralsh/console-client-go v0.0.11 h1:2fchZE6qlSQmHTeuH54hAzJJpgKpx2Kbl8HhJNugbns=
549-
github.com/pluralsh/console-client-go v0.0.11/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
550546
github.com/pluralsh/console-client-go v0.0.14 h1:vpvC6SR7A0MIrpeyR78hM6IreOLKgg+moRIEjyUnKZo=
551547
github.com/pluralsh/console-client-go v0.0.14/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
552548
github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY=

pkg/agent/agent.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/alitto/pond"
78
"github.com/argoproj/gitops-engine/pkg/cache"
89
"github.com/argoproj/gitops-engine/pkg/engine"
910
"github.com/pluralsh/deployment-operator/pkg/client"
1011
"github.com/pluralsh/deployment-operator/pkg/manifests"
1112
deploysync "github.com/pluralsh/deployment-operator/pkg/sync"
1213
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13-
"k8s.io/apimachinery/pkg/util/wait"
1414
"k8s.io/client-go/discovery"
1515
"k8s.io/client-go/tools/clientcmd"
1616
"k8s.io/client-go/util/workqueue"
@@ -26,7 +26,6 @@ type Agent struct {
2626
discoveryClient *discovery.DiscoveryClient
2727
engine *deploysync.Engine
2828
deathChan chan interface{}
29-
svcQueue workqueue.RateLimitingInterface
3029
cleanup engine.StopFunc
3130
refresh time.Duration
3231
}
@@ -74,46 +73,44 @@ func New(clientConfig clientcmd.ClientConfig, refresh time.Duration, consoleUrl,
7473
consoleClient: consoleClient,
7574
engine: engine,
7675
deathChan: deathChan,
77-
svcQueue: svcQueue,
7876
cleanup: cleanup,
7977
refresh: refresh,
8078
}, nil
8179
}
8280

8381
func (agent *Agent) Run() {
8482
defer agent.cleanup()
85-
defer agent.svcQueue.ShutDown()
8683
defer agent.engine.WipeCache()
87-
go func() {
88-
for {
89-
go agent.engine.ControlLoop()
90-
failure := <-agent.deathChan
91-
fmt.Printf("recovered from panic %v\n", failure)
92-
}
93-
}()
84+
panicHandler := func(p interface{}) {
85+
fmt.Printf("Task panicked: %v", p)
86+
}
9487

95-
wait.PollInfinite(agent.refresh, func() (done bool, err error) {
88+
for {
9689
log.Info("fetching services for cluster")
9790
svcs, err := agent.consoleClient.GetServices()
9891
if err != nil {
99-
log.Error(err, "failed to fetch service list from deployments service")
100-
return false, nil
92+
log.Error(err, "failed to fetch service list from deployments service %v", err)
93+
time.Sleep(agent.refresh)
94+
continue
10195
}
102-
96+
pool := pond.New(20, 100, pond.MinWorkers(20), pond.PanicHandler(panicHandler))
10397
for _, svc := range svcs {
104-
log.Info("sending update for", "service", svc.ID)
105-
agent.svcQueue.Add(svc.ID)
98+
log.Info("sending update for", "service", svc.ID, "namespace", svc.Namespace, "name", svc.Name)
99+
pool.TrySubmit(func() {
100+
if err := agent.engine.ProcessItem(svc.ID); err != nil {
101+
log.Error(err, "found unprocessable error")
102+
}
103+
})
106104
}
107-
105+
pool.StopAndWait()
108106
info, err := agent.discoveryClient.ServerVersion()
109107
if err != nil {
110108
log.Error(err, "failed to fetch cluster version")
111-
return false, nil
112109
}
113110
v := fmt.Sprintf("%s.%s", info.Major, info.Minor)
114111
if err := agent.consoleClient.Ping(v); err != nil {
115112
log.Error(err, "failed to ping cluster after scheduling syncs")
116113
}
117-
return false, nil
118-
})
114+
time.Sleep(agent.refresh)
115+
}
119116
}

pkg/sync/loop.go

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,56 +3,22 @@ package sync
33
import (
44
"context"
55
"fmt"
6-
"runtime/debug"
76

87
"github.com/argoproj/gitops-engine/pkg/sync"
98
"github.com/argoproj/gitops-engine/pkg/sync/common"
109
"github.com/samber/lo"
1110
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1211
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13-
"k8s.io/apimachinery/pkg/util/wait"
1412
)
1513

16-
func (engine *Engine) ControlLoop() {
17-
if engine.deathChan != nil {
18-
defer func() {
19-
if r := recover(); r != nil {
20-
engine.deathChan <- r
21-
fmt.Printf("panic: %s\n", string(debug.Stack()))
22-
}
23-
}()
24-
}
25-
26-
engine.RegisterHandlers()
27-
28-
wait.PollInfinite(syncDelay, func() (done bool, err error) {
29-
log.Info("Polling for new service updates")
30-
31-
item, shutdown := engine.svcQueue.Get()
32-
if shutdown {
33-
return true, nil
34-
}
35-
36-
if err := engine.processItem(item); err != nil {
37-
log.Error(err, "found unprocessable error")
38-
}
39-
40-
engine.syncing = ""
41-
42-
return false, nil
43-
})
44-
}
45-
46-
func (engine *Engine) processItem(item interface{}) error {
47-
defer engine.svcQueue.Done(item)
14+
func (engine *Engine) ProcessItem(item interface{}) error {
4815
id := item.(string)
4916

5017
if id == "" {
5118
return nil
5219
}
5320

5421
log.Info("attempting to sync service", "id", id)
55-
engine.syncing = id
5622
svc, err := engine.svcCache.Get(id)
5723
if err != nil {
5824
fmt.Printf("failed to fetch service from cache: %s, ignoring for now", err)
@@ -74,7 +40,7 @@ func (engine *Engine) processItem(item interface{}) error {
7440

7541
if manErr != nil {
7642
if err := engine.updateStatus(svc.ID, results, errorAttributes("manifests", manErr)); err != nil {
77-
log.Error(err, "Failed to update service status, ignoring for now")
43+
log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
7844
}
7945
log.Error(manErr, "failed to parse manifests")
8046
return manErr
@@ -87,7 +53,7 @@ func (engine *Engine) processItem(item interface{}) error {
8753
diff, err := engine.diff(manifests, svc.Namespace, svc.ID)
8854
checkModifications := sync.WithResourceModificationChecker(true, diff)
8955
if err != nil {
90-
log.Error(err, "could not build diff list, ignoring for now")
56+
log.Error(err, "could not build diff list for service, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
9157
checkModifications = sync.WithResourceModificationChecker(false, nil)
9258
}
9359

@@ -113,7 +79,7 @@ func (engine *Engine) processItem(item interface{}) error {
11379
}
11480

11581
if err := engine.updateStatus(svc.ID, results, errorAttributes("sync", err)); err != nil {
116-
log.Error(err, "Failed to update service status, ignoring for now")
82+
log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
11783
}
11884

11985
return nil

0 commit comments

Comments
 (0)