Skip to content

Commit 717ea22

Browse files
authoredJun 5, 2023
sotw: Ordered ADS mode (#544)
* initial ADS code path split so we can order responses for target modes * implement functional opts pattern to support xds options * multiplex responses over a single channel rather than selecting over dynamic channels in ordered ads * add cache ordering on ads mode Signed-off-by: Alec Holmes <alecholmez@me.com>
1 parent 5172536 commit 717ea22

26 files changed

+1178
-258
lines changed
 

‎Dockerfile.ci

-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
FROM golang:1.19
22

3-
# TODO(mattklein123): Update this to envoy-dev:latest and fix tests.
43
COPY --from=envoyproxy/envoy-dev:latest /usr/local/bin/envoy /usr/local/bin/envoy

‎go.mod

+4-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ require (
1111
github.com/prometheus/client_model v0.4.0
1212
github.com/stretchr/testify v1.8.3
1313
go.opentelemetry.io/proto/otlp v0.19.0
14-
google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e
1514
google.golang.org/genproto/googleapis/api v0.0.0-20230526203410-71b5a4ffd15e
1615
google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e
1716
google.golang.org/grpc v1.55.0
@@ -20,9 +19,13 @@ require (
2019

2120
require (
2221
github.com/davecgh/go-spew v1.1.1 // indirect
22+
github.com/kr/pretty v0.3.0 // indirect
2323
github.com/pmezard/go-difflib v1.0.0 // indirect
24+
github.com/rogpeppe/go-internal v1.9.0 // indirect
2425
golang.org/x/net v0.10.0 // indirect
2526
golang.org/x/sys v0.8.0 // indirect
2627
golang.org/x/text v0.9.0 // indirect
28+
google.golang.org/genproto v0.0.0-20230526203410-71b5a4ffd15e // indirect
29+
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
2730
gopkg.in/yaml.v3 v3.0.1 // indirect
2831
)

‎go.sum

+476-9
Large diffs are not rendered by default.

‎pkg/cache/types/types.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,15 @@ func (e SkipFetchError) Error() string {
3838
// ResponseType enumeration of supported response types
3939
type ResponseType int
4040

41+
// NOTE: The order of this enum MATTERS!
42+
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#aggregated-discovery-service
43+
// ADS expects things to be returned in a specific order.
44+
// See the following issue for details: https://github.com/envoyproxy/go-control-plane/issues/526
4145
const (
4246
Endpoint ResponseType = iota
4347
Cluster
44-
Route
4548
ScopedRoute
49+
Route
4650
VirtualHost
4751
Listener
4852
Secret

‎pkg/cache/v3/cache.go

+7
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ type DeltaRequest = discovery.DeltaDiscoveryRequest
4444
// ConfigWatcher implementation must be thread-safe.
4545
type ConfigWatcher interface {
4646
// CreateWatch returns a new open watch from a non-empty request.
47+
// This is the entrypoint to propagate configuration changes the
48+
// provided Response channel. State from the gRPC server is utilized
49+
// to make sure consuming cache implementations can see what the server has sent to clients.
50+
//
4751
// An individual consumer normally issues a single open watch by each type URL.
4852
//
4953
// The provided channel produces requested resources as responses, once they are available.
@@ -53,6 +57,9 @@ type ConfigWatcher interface {
5357
CreateWatch(*Request, stream.StreamState, chan Response) (cancel func())
5458

5559
// CreateDeltaWatch returns a new open incremental xDS watch.
60+
// This is the entrypoint to propagate configuration changes the
61+
// provided DeltaResponse channel. State from the gRPC server is utilized
62+
// to make sure consuming cache implementations can see what the server has sent to clients.
5663
//
5764
// The provided channel produces requested resources as responses, or spontaneous updates in accordance
5865
// with the incremental xDS specification.

‎pkg/cache/v3/order.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package cache
2+
3+
// Key is an internal sorting data structure we can use to
4+
// order responses by Type and their associated watch IDs.
5+
type key struct {
6+
ID int64
7+
TypeURL string
8+
}
9+
10+
// Keys implements Go's sorting.Sort interface
11+
type keys []key
12+
13+
func (k keys) Len() int {
14+
return len(k)
15+
}
16+
17+
// Less compares the typeURL and determines what order things should be sent.
18+
func (k keys) Less(i, j int) bool {
19+
return GetResponseType(k[i].TypeURL) > GetResponseType(k[j].TypeURL)
20+
}
21+
22+
func (k keys) Swap(i, j int) {
23+
k[i], k[j] = k[j], k[i]
24+
}

‎pkg/cache/v3/order_test.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package cache
2+
3+
import (
4+
"sort"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
9+
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
10+
)
11+
12+
func TestOrderKeys(t *testing.T) {
13+
unorderedKeys := keys{
14+
{
15+
ID: 1,
16+
TypeURL: resource.EndpointType,
17+
},
18+
{
19+
ID: 2,
20+
TypeURL: resource.ClusterType,
21+
},
22+
{
23+
ID: 4,
24+
TypeURL: resource.ListenerType,
25+
},
26+
{
27+
ID: 3,
28+
TypeURL: resource.RouteType,
29+
},
30+
{
31+
ID: 5,
32+
TypeURL: resource.ScopedRouteType,
33+
},
34+
}
35+
expected := keys{
36+
{
37+
ID: 4,
38+
TypeURL: resource.ListenerType,
39+
},
40+
{
41+
ID: 3,
42+
TypeURL: resource.RouteType,
43+
},
44+
{
45+
ID: 5,
46+
TypeURL: resource.ScopedRouteType,
47+
},
48+
{
49+
ID: 2,
50+
TypeURL: resource.ClusterType,
51+
},
52+
{
53+
ID: 1,
54+
TypeURL: resource.EndpointType,
55+
},
56+
}
57+
58+
orderedKeys := unorderedKeys
59+
sort.Sort(orderedKeys)
60+
61+
assert.True(t, sort.IsSorted(orderedKeys))
62+
assert.NotEqual(t, unorderedKeys, &orderedKeys)
63+
assert.Equal(t, expected, orderedKeys)
64+
65+
// Ordering:
66+
// === RUN TestOrderKeys
67+
// order_test.go:43: {ID:4 TypeURL:type.googleapis.com/envoy.config.listener.v3.Listener}
68+
// order_test.go:43: {ID:3 TypeURL:type.googleapis.com/envoy.config.route.v3.RouteConfiguration}
69+
// order_test.go:43: {ID:5 TypeURL:type.googleapis.com/envoy.config.route.v3.ScopedRouteConfiguration}
70+
// order_test.go:43: {ID:2 TypeURL:type.googleapis.com/envoy.config.cluster.v3.Cluster}
71+
// order_test.go:43: {ID:1 TypeURL:type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment}
72+
}

‎pkg/cache/v3/simple.go

+28-7
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
220220
}
221221
}
222222

223-
// SetSnapshotCacheContext updates a snapshot for a node.
223+
// SetSnapshotCache updates a snapshot for a node.
224224
func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapshot ResourceSnapshot) error {
225225
cache.mu.Lock()
226226
defer cache.mu.Unlock()
@@ -232,20 +232,41 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
232232
if info, ok := cache.status[node]; ok {
233233
info.mu.Lock()
234234
defer info.mu.Unlock()
235-
for id, watch := range info.watches {
235+
236+
// responder callback for SOTW watches
237+
respond := func(watch ResponseWatch, id int64) error {
236238
version := snapshot.GetVersion(watch.Request.TypeUrl)
237239
if version != watch.Request.VersionInfo {
238240
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
239-
240241
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
241242
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
242243
if err != nil {
243244
return err
244245
}
245-
246246
// discard the watch
247247
delete(info.watches, id)
248248
}
249+
return nil
250+
}
251+
252+
// If ADS is enabled we need to order response watches so we guarantee
253+
// sending them in the correct order. Go's default implementation
254+
// of maps are randomized order when ranged over.
255+
if cache.ads {
256+
info.orderResponseWatches()
257+
for _, key := range info.orderedWatches {
258+
err := respond(info.watches[key.ID], key.ID)
259+
if err != nil {
260+
return err
261+
}
262+
}
263+
} else {
264+
for id, watch := range info.watches {
265+
err := respond(watch, id)
266+
if err != nil {
267+
return err
268+
}
269+
}
249270
}
250271

251272
// We only calculate version hashes when using delta. We don't
@@ -258,7 +279,8 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
258279
}
259280
}
260281

261-
// process our delta watches
282+
// this won't run if there are no delta watches
283+
// to process.
262284
for id, watch := range info.deltaWatches {
263285
res, err := cache.respondDelta(
264286
ctx,
@@ -281,7 +303,7 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
281303
return nil
282304
}
283305

284-
// GetSnapshots gets the snapshot for a node, and returns an error if not found.
306+
// GetSnapshot gets the snapshot for a node, and returns an error if not found.
285307
func (cache *snapshotCache) GetSnapshot(node string) (ResourceSnapshot, error) {
286308
cache.mu.RLock()
287309
defer cache.mu.RUnlock()
@@ -341,7 +363,6 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str
341363
info.mu.Unlock()
342364

343365
var version string
344-
345366
snapshot, exists := cache.snapshots[nodeID]
346367
if exists {
347368
version = snapshot.GetVersion(request.TypeUrl)

‎pkg/cache/v3/simple_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func TestConcurrentSetWatch(t *testing.T) {
345345
i := i
346346
t.Run(fmt.Sprintf("worker%d", i), func(t *testing.T) {
347347
t.Parallel()
348-
id := fmt.Sprintf("%d", i%2)
348+
id := t.Name()
349349
value := make(chan cache.Response, 1)
350350
if i < 25 {
351351
snap := cache.Snapshot{}
@@ -542,6 +542,7 @@ func (s *singleResourceSnapshot) GetResources(typeURL string) map[string]types.R
542542
if typeURL != s.typeurl {
543543
return nil
544544
}
545+
545546
return map[string]types.Resource{
546547
s.name: s.resource,
547548
}

‎pkg/cache/v3/status.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package cache
1616

1717
import (
18+
"sort"
1819
"sync"
1920
"time"
2021

@@ -65,7 +66,8 @@ type statusInfo struct {
6566
node *core.Node
6667

6768
// watches are indexed channels for the response watches and the original requests.
68-
watches map[int64]ResponseWatch
69+
watches map[int64]ResponseWatch
70+
orderedWatches keys
6971

7072
// deltaWatches are indexed channels for the delta response watches and the original requests
7173
deltaWatches map[int64]DeltaResponseWatch
@@ -105,9 +107,10 @@ type DeltaResponseWatch struct {
105107
// newStatusInfo initializes a status info data structure.
106108
func newStatusInfo(node *core.Node) *statusInfo {
107109
out := statusInfo{
108-
node: node,
109-
watches: make(map[int64]ResponseWatch),
110-
deltaWatches: make(map[int64]DeltaResponseWatch),
110+
node: node,
111+
watches: make(map[int64]ResponseWatch),
112+
orderedWatches: make(keys, 0),
113+
deltaWatches: make(map[int64]DeltaResponseWatch),
111114
}
112115
return &out
113116
}
@@ -155,3 +158,22 @@ func (info *statusInfo) setDeltaResponseWatch(id int64, drw DeltaResponseWatch)
155158
defer info.mu.Unlock()
156159
info.deltaWatches[id] = drw
157160
}
161+
162+
// orderResponseWatches will track a list of watch keys and order them if
163+
// true is passed.
164+
func (info *statusInfo) orderResponseWatches() {
165+
info.orderedWatches = make(keys, len(info.watches))
166+
167+
var index int
168+
for id, watch := range info.watches {
169+
info.orderedWatches[index] = key{
170+
ID: id,
171+
TypeURL: watch.Request.TypeUrl,
172+
}
173+
index++
174+
}
175+
176+
// Sort our list which we can use in the SetSnapshot functions.
177+
// This is only run when we enable ADS on the cache.
178+
sort.Sort(info.orderedWatches)
179+
}

‎pkg/integration/ttl_integration_test.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,11 @@ func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format
3232
func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) }
3333

3434
func TestTTLResponse(t *testing.T) {
35-
3635
ctx, cancel := context.WithCancel(context.Background())
3736
defer cancel()
3837

3938
snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second)
40-
4139
server := server.NewServer(ctx, snapshotCache, nil)
42-
4340
grpcServer := grpc.NewServer()
4441
endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
4542

@@ -53,8 +50,8 @@ func TestTTLResponse(t *testing.T) {
5350

5451
conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials()))
5552
assert.NoError(t, err)
56-
client := endpointservice.NewEndpointDiscoveryServiceClient(conn)
5753

54+
client := endpointservice.NewEndpointDiscoveryServiceClient(conn)
5855
sclient, err := client.StreamEndpoints(ctx)
5956
assert.NoError(t, err)
6057

@@ -75,16 +72,16 @@ func TestTTLResponse(t *testing.T) {
7572
TTL: &oneSecond,
7673
}},
7774
})
75+
7876
err = snapshotCache.SetSnapshot(context.Background(), "test", snap)
7977
assert.NoError(t, err)
8078

8179
timeout := time.NewTimer(5 * time.Second)
82-
8380
awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse {
8481
t.Helper()
8582
doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse)
86-
go func() {
8783

84+
go func() {
8885
r, err := sclient.Recv()
8986
assert.NoError(t, err)
9087

‎pkg/log/default.go

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//nolint:all
12
package log
23

34
// DefaultLogger is enabled when no consuming clients provide

‎pkg/log/log_test.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@ import (
2222
)
2323

2424
func ExampleLoggerFuncs() {
25-
logger := log.Logger{}
26-
2725
xdsLogger := LoggerFuncs{
28-
DebugFunc: logger.Printf,
29-
InfoFunc: logger.Printf,
30-
WarnFunc: logger.Printf,
31-
ErrorFunc: logger.Printf,
26+
DebugFunc: log.Printf,
27+
InfoFunc: log.Printf,
28+
WarnFunc: log.Printf,
29+
ErrorFunc: log.Printf,
3230
}
3331

3432
xdsLogger.Debugf("debug")

0 commit comments

Comments
 (0)
Please sign in to comment.