Skip to content

Commit cace1d4

Browse files
committed
Revert "add multi concurrency support"
This reverts commit 2f9b5e0.
1 parent 2f9b5e0 commit cace1d4

File tree

4 files changed

+65
-25
lines changed

4 files changed

+65
-25
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ replace (
3333

3434
require (
3535
github.com/Masterminds/sprig/v3 v3.2.3
36-
github.com/alitto/pond v1.8.3
3736
github.com/argoproj/gitops-engine v0.7.1-0.20230906152414-b0fffe419a0f
3837
github.com/go-logr/logr v1.2.4
3938
github.com/orcaman/concurrent-map/v2 v2.0.1

go.sum

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
8383
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
8484
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
8585
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
86-
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
87-
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
8886
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ=
8987
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8=
9088
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@@ -543,6 +541,12 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
543541
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
544542
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
545543
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
544+
github.com/pluralsh/console-client-go v0.0.5 h1:+L7I3QLMWNBiuZlWe/YJfUMMZGnpKhEAlbs9sL+hiSc=
545+
github.com/pluralsh/console-client-go v0.0.5/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
546+
github.com/pluralsh/console-client-go v0.0.8 h1:BwWOt1ggBX/fxzY2+01dk8sBTz1jqT57o2y1Iz9Zxzk=
547+
github.com/pluralsh/console-client-go v0.0.8/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
548+
github.com/pluralsh/console-client-go v0.0.11 h1:2fchZE6qlSQmHTeuH54hAzJJpgKpx2Kbl8HhJNugbns=
549+
github.com/pluralsh/console-client-go v0.0.11/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
546550
github.com/pluralsh/console-client-go v0.0.14 h1:vpvC6SR7A0MIrpeyR78hM6IreOLKgg+moRIEjyUnKZo=
547551
github.com/pluralsh/console-client-go v0.0.14/go.mod h1:kZjk0pXAWnvyj+miXveCho4kKQaX1Tm3CGAM+iwurWU=
548552
github.com/pluralsh/polly v0.1.4 h1:Kz90peCgvsfF3ERt8cujr5TR9z4wUlqQE60Eg09ZItY=

pkg/agent/agent.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"fmt"
55
"time"
66

7-
"github.com/alitto/pond"
87
"github.com/argoproj/gitops-engine/pkg/cache"
98
"github.com/argoproj/gitops-engine/pkg/engine"
109
"github.com/pluralsh/deployment-operator/pkg/client"
1110
"github.com/pluralsh/deployment-operator/pkg/manifests"
1211
deploysync "github.com/pluralsh/deployment-operator/pkg/sync"
1312
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13+
"k8s.io/apimachinery/pkg/util/wait"
1414
"k8s.io/client-go/discovery"
1515
"k8s.io/client-go/tools/clientcmd"
1616
"k8s.io/client-go/util/workqueue"
@@ -26,6 +26,7 @@ type Agent struct {
2626
discoveryClient *discovery.DiscoveryClient
2727
engine *deploysync.Engine
2828
deathChan chan interface{}
29+
svcQueue workqueue.RateLimitingInterface
2930
cleanup engine.StopFunc
3031
refresh time.Duration
3132
}
@@ -73,44 +74,46 @@ func New(clientConfig clientcmd.ClientConfig, refresh time.Duration, consoleUrl,
7374
consoleClient: consoleClient,
7475
engine: engine,
7576
deathChan: deathChan,
77+
svcQueue: svcQueue,
7678
cleanup: cleanup,
7779
refresh: refresh,
7880
}, nil
7981
}
8082

8183
func (agent *Agent) Run() {
8284
defer agent.cleanup()
85+
defer agent.svcQueue.ShutDown()
8386
defer agent.engine.WipeCache()
84-
panicHandler := func(p interface{}) {
85-
fmt.Printf("Task panicked: %v", p)
86-
}
87+
go func() {
88+
for {
89+
go agent.engine.ControlLoop()
90+
failure := <-agent.deathChan
91+
fmt.Printf("recovered from panic %v\n", failure)
92+
}
93+
}()
8794

88-
for {
95+
wait.PollInfinite(agent.refresh, func() (done bool, err error) {
8996
log.Info("fetching services for cluster")
9097
svcs, err := agent.consoleClient.GetServices()
9198
if err != nil {
92-
log.Error(err, "failed to fetch service list from deployments service %v", err)
93-
time.Sleep(agent.refresh)
94-
continue
99+
log.Error(err, "failed to fetch service list from deployments service")
100+
return false, nil
95101
}
96-
pool := pond.New(20, 100, pond.MinWorkers(20), pond.PanicHandler(panicHandler))
102+
97103
for _, svc := range svcs {
98-
log.Info("sending update for", "service", svc.ID, "namespace", svc.Namespace, "name", svc.Name)
99-
pool.TrySubmit(func() {
100-
if err := agent.engine.ProcessItem(svc.ID); err != nil {
101-
log.Error(err, "found unprocessable error")
102-
}
103-
})
104+
log.Info("sending update for", "service", svc.ID)
105+
agent.svcQueue.Add(svc.ID)
104106
}
105-
pool.StopAndWait()
107+
106108
info, err := agent.discoveryClient.ServerVersion()
107109
if err != nil {
108110
log.Error(err, "failed to fetch cluster version")
111+
return false, nil
109112
}
110113
v := fmt.Sprintf("%s.%s", info.Major, info.Minor)
111114
if err := agent.consoleClient.Ping(v); err != nil {
112115
log.Error(err, "failed to ping cluster after scheduling syncs")
113116
}
114-
time.Sleep(agent.refresh)
115-
}
117+
return false, nil
118+
})
116119
}

pkg/sync/loop.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,56 @@ package sync
33
import (
44
"context"
55
"fmt"
6+
"runtime/debug"
67

78
"github.com/argoproj/gitops-engine/pkg/sync"
89
"github.com/argoproj/gitops-engine/pkg/sync/common"
910
"github.com/samber/lo"
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
13+
"k8s.io/apimachinery/pkg/util/wait"
1214
)
1315

14-
func (engine *Engine) ProcessItem(item interface{}) error {
16+
func (engine *Engine) ControlLoop() {
17+
if engine.deathChan != nil {
18+
defer func() {
19+
if r := recover(); r != nil {
20+
engine.deathChan <- r
21+
fmt.Printf("panic: %s\n", string(debug.Stack()))
22+
}
23+
}()
24+
}
25+
26+
engine.RegisterHandlers()
27+
28+
wait.PollInfinite(syncDelay, func() (done bool, err error) {
29+
log.Info("Polling for new service updates")
30+
31+
item, shutdown := engine.svcQueue.Get()
32+
if shutdown {
33+
return true, nil
34+
}
35+
36+
if err := engine.processItem(item); err != nil {
37+
log.Error(err, "found unprocessable error")
38+
}
39+
40+
engine.syncing = ""
41+
42+
return false, nil
43+
})
44+
}
45+
46+
func (engine *Engine) processItem(item interface{}) error {
47+
defer engine.svcQueue.Done(item)
1548
id := item.(string)
1649

1750
if id == "" {
1851
return nil
1952
}
2053

2154
log.Info("attempting to sync service", "id", id)
55+
engine.syncing = id
2256
svc, err := engine.svcCache.Get(id)
2357
if err != nil {
2458
fmt.Printf("failed to fetch service from cache: %s, ignoring for now", err)
@@ -40,7 +74,7 @@ func (engine *Engine) ProcessItem(item interface{}) error {
4074

4175
if manErr != nil {
4276
if err := engine.updateStatus(svc.ID, results, errorAttributes("manifests", manErr)); err != nil {
43-
log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
77+
log.Error(err, "Failed to update service status, ignoring for now")
4478
}
4579
log.Error(manErr, "failed to parse manifests")
4680
return manErr
@@ -53,7 +87,7 @@ func (engine *Engine) ProcessItem(item interface{}) error {
5387
diff, err := engine.diff(manifests, svc.Namespace, svc.ID)
5488
checkModifications := sync.WithResourceModificationChecker(true, diff)
5589
if err != nil {
56-
log.Error(err, "could not build diff list for service, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
90+
log.Error(err, "could not build diff list, ignoring for now")
5791
checkModifications = sync.WithResourceModificationChecker(false, nil)
5892
}
5993

@@ -79,7 +113,7 @@ func (engine *Engine) ProcessItem(item interface{}) error {
79113
}
80114

81115
if err := engine.updateStatus(svc.ID, results, errorAttributes("sync", err)); err != nil {
82-
log.Error(err, "Failed to update service status, ignoring for now", "namespace", svc.Namespace, "name", svc.Name)
116+
log.Error(err, "Failed to update service status, ignoring for now")
83117
}
84118

85119
return nil

0 commit comments

Comments
 (0)