Skip to content

Commit 77780df

Browse files
authored
feat: implement paginated polling agent-side (#120)
* implement paginated polling agent-side * gen mocks * bump console client * service context templating
1 parent fcd6b68 commit 77780df

File tree

10 files changed

+358
-100
lines changed

10 files changed

+358
-100
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
github.com/orcaman/concurrent-map/v2 v2.0.1
1919
github.com/osteele/liquid v1.3.2
2020
github.com/pkg/errors v0.9.1
21-
github.com/pluralsh/console-client-go v0.0.90
21+
github.com/pluralsh/console-client-go v0.0.92
2222
github.com/pluralsh/controller-reconcile-helper v0.0.4
2323
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
2424
github.com/pluralsh/polly v0.1.4

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -579,8 +579,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
579579
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
580580
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
581581
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
582-
github.com/pluralsh/console-client-go v0.0.90 h1:dxtuW2C9ynhhs358ckx0XGH4zieaoMr9EGf+pJb4hO4=
583-
github.com/pluralsh/console-client-go v0.0.90/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
582+
github.com/pluralsh/console-client-go v0.0.92 h1:PMSF05Zp5gLejeEWXbwe17CfXNLJ55dGlFPAAVucfCM=
583+
github.com/pluralsh/console-client-go v0.0.92/go.mod h1:eyCiLA44YbXiYyJh8303jk5JdPkt9McgCo5kBjk4lKo=
584584
github.com/pluralsh/controller-reconcile-helper v0.0.4 h1:1o+7qYSyoeqKFjx+WgQTxDz4Q2VMpzprJIIKShxqG0E=
585585
github.com/pluralsh/controller-reconcile-helper v0.0.4/go.mod h1:AfY0gtteD6veBjmB6jiRx/aR4yevEf6K0M13/pGan/s=
586586
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=

pkg/client/console.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ type Client interface {
6767
UpdateClusterRestore(id string, attrs console.RestoreAttributes) (*console.ClusterRestoreFragment, error)
6868
SaveClusterBackup(attrs console.BackupAttributes) (*console.ClusterBackupFragment, error)
6969
GetClusterBackup(clusterID, namespace, name string) (*console.ClusterBackupFragment, error)
70-
GetServices() ([]*console.ServiceDeploymentBaseFragment, error)
70+
GetServices(after *string, first *int64) (*console.PagedClusterServices, error)
7171
GetService(id string) (*console.GetServiceDeploymentForAgent_ServiceDeployment, error)
7272
UpdateComponents(id string, components []*console.ComponentAttributes, errs []*console.ServiceErrorAttributes) error
7373
AddServiceErrors(id string, errs []*console.ServiceErrorAttributes) error
7474
ParsePipelineGateCR(pgFragment *console.PipelineGateFragment) (*v1alpha1.PipelineGate, error)
7575
GetClusterGate(id string) (*console.PipelineGateFragment, error)
76-
GetClusterGates() ([]*console.PipelineGateFragment, error)
76+
GetClusterGates(after *string, first *int64) (*console.PagedClusterGates, error)
7777
UpdateGate(id string, attributes console.GateUpdateAttributes) error
7878
}

pkg/client/pipelines.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@ func (c *client) UpdateGate(id string, attributes console.GateUpdateAttributes)
1717
return err
1818
}
1919

20-
func (c *client) GetClusterGates() ([]*console.PipelineGateFragment, error) {
21-
resp, err := c.consoleClient.GetClusterGates(c.ctx)
20+
func (c *client) GetClusterGates(after *string, first *int64) (*console.PagedClusterGates, error) {
21+
resp, err := c.consoleClient.PagedClusterGates(c.ctx, after, first, nil, nil)
2222
if err != nil {
2323
return nil, err
2424
}
25-
26-
return resp.ClusterGates, nil
25+
if resp.PagedClusterGates == nil {
26+
return nil, fmt.Errorf("the response from PagedClusterGates is nil")
27+
}
28+
return resp, nil
2729
}
2830

2931
func (c *client) GetClusterGate(id string) (*console.PipelineGateFragment, error) {

pkg/client/service.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package client
22

33
import (
4+
"fmt"
45
console "github.com/pluralsh/console-client-go"
56
)
67

7-
func (c *client) GetServices() ([]*console.ServiceDeploymentBaseFragment, error) {
8-
resp, err := c.consoleClient.ListClusterServices(c.ctx)
8+
func (c *client) GetServices(after *string, first *int64) (*console.PagedClusterServices, error) {
9+
10+
resp, err := c.consoleClient.PagedClusterServices(c.ctx, after, first, nil, nil)
911
if err != nil {
1012
return nil, err
1113
}
12-
13-
return resp.ClusterServices, nil
14+
if resp.GetPagedClusterServices() == nil {
15+
return nil, fmt.Errorf("the response from PagedClusterServices is nil")
16+
}
17+
return resp, nil
1418
}
1519

1620
func (c *client) GetService(id string) (*console.GetServiceDeploymentForAgent_ServiceDeployment, error) {

pkg/controller/pipelinegates/reconciler.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,27 @@ func (s *GateReconciler) ShutdownQueue() {
9494

9595
func (s *GateReconciler) Poll(ctx context.Context) (done bool, err error) {
9696
logger := log.FromContext(ctx)
97-
9897
logger.Info("fetching gates for cluster")
99-
gates, err := s.ConsoleClient.GetClusterGates()
100-
if err != nil {
101-
logger.Error(err, "failed to fetch gates list")
102-
return false, nil
103-
}
10498

105-
for _, gate := range gates {
106-
logger.Info("sending update for", "gate", gate.ID)
107-
s.GateQueue.Add(gate.ID)
99+
var after *string
100+
var pageSize int64
101+
pageSize = 100
102+
hasNextPage := true
103+
104+
for hasNextPage {
105+
resp, err := s.ConsoleClient.GetClusterGates(after, &pageSize)
106+
if err != nil {
107+
logger.Error(err, "failed to fetch gates list")
108+
return false, nil
109+
}
110+
111+
hasNextPage = resp.PagedClusterGates.PageInfo.HasNextPage
112+
after = resp.PagedClusterGates.PageInfo.EndCursor
113+
114+
for _, gate := range resp.PagedClusterGates.Edges {
115+
logger.Info("sending update for", "gate", gate.Node.ID)
116+
s.GateQueue.Add(gate.Node.ID)
117+
}
108118
}
109119

110120
if err := s.pinger.Ping(); err != nil {

pkg/controller/service/reconciler.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,17 +177,25 @@ func (s *ServiceReconciler) ShutdownQueue() {
177177

178178
func (s *ServiceReconciler) Poll(ctx context.Context) (done bool, err error) {
179179
logger := log.FromContext(ctx)
180-
181180
logger.Info("fetching services for cluster")
182-
svcs, err := s.ConsoleClient.GetServices()
183-
if err != nil {
184-
logger.Error(err, "failed to fetch service list from deployments service")
185-
return false, nil
186-
}
181+
var after *string
182+
var pageSize int64
183+
pageSize = 100
184+
hasNextPage := true
187185

188-
for _, svc := range svcs {
189-
logger.Info("sending update for", "service", svc.ID)
190-
s.SvcQueue.Add(svc.ID)
186+
for hasNextPage {
187+
resp, err := s.ConsoleClient.GetServices(after, &pageSize)
188+
if err != nil {
189+
logger.Error(err, "failed to fetch service list from deployments service")
190+
return false, nil
191+
}
192+
193+
hasNextPage = resp.PagedClusterServices.PageInfo.HasNextPage
194+
after = resp.PagedClusterServices.PageInfo.EndCursor
195+
for _, svc := range resp.PagedClusterServices.Edges {
196+
logger.Info("sending update for", "service", svc.Node.ID)
197+
s.SvcQueue.Add(svc.Node.ID)
198+
}
191199
}
192200

193201
if err := s.pinger.Ping(); err != nil {

pkg/manifests/template/raw.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func renderLiquid(input []byte, svc *console.GetServiceDeploymentForAgent_Servic
4848
bindings := map[string]interface{}{
4949
"configuration": configMap(svc),
5050
"cluster": svc.Cluster,
51-
"contexts": map[string]map[string]interface{}{},
51+
"contexts": contexts(svc),
5252
}
5353
return liquidEngine.ParseAndRender(input, bindings)
5454
}

pkg/manifests/template/utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,11 @@ func configMap(svc *console.GetServiceDeploymentForAgent_ServiceDeployment) map[
1212

1313
return res
1414
}
15+
16+
func contexts(svc *console.GetServiceDeploymentForAgent_ServiceDeployment) map[string]map[string]interface{} {
17+
res := map[string]map[string]interface{}{}
18+
for _, context := range svc.Contexts {
19+
res[context.Name] = context.Configuration
20+
}
21+
return res
22+
}

0 commit comments

Comments
 (0)