Skip to content

Commit 227f5af

Browse files
authored
server: add Node to stream closed callbacks (#572)
Signed-off-by: Bing Han <[email protected]>
1 parent 203e445 commit 227f5af

File tree

5 files changed

+29
-22
lines changed

5 files changed

+29
-22
lines changed

docs/server.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"log"
2323
"sync"
2424

25+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2526
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
2627
)
2728

@@ -46,9 +47,9 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
4647
}
4748
return nil
4849
}
49-
func (cb *Callbacks) OnStreamClosed(id int64) {
50+
func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
5051
if cb.Debug {
51-
log.Printf("stream %d closed\n", id)
52+
log.Printf("stream %d of node %s closed\n", id, node.Id)
5253
}
5354
}
5455
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
@@ -57,9 +58,9 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
5758
}
5859
return nil
5960
}
60-
func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
61+
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) {
6162
if cb.Debug {
62-
log.Printf("delta stream %d closed\n", id)
63+
log.Printf("delta stream %d of node %s closed\n", id, node.Id)
6364
}
6465
}
6566
func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error {

pkg/server/delta/v3/server.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type Callbacks interface {
2626
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
2727
OnDeltaStreamOpen(context.Context, int64, string) error
2828
// OnDeltaStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
29-
OnDeltaStreamClosed(int64)
29+
OnDeltaStreamClosed(int64, *core.Node)
3030
// OnStreamDeltaRequest is called once a request is received on a stream.
3131
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
3232
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
@@ -63,10 +63,12 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
6363
// a collection of stack allocated watches per request type
6464
watches := newWatches()
6565

66+
var node = &core.Node{}
67+
6668
defer func() {
6769
watches.Cancel()
6870
if s.callbacks != nil {
69-
s.callbacks.OnDeltaStreamClosed(streamID)
71+
s.callbacks.OnDeltaStreamClosed(streamID, node)
7072
}
7173
}()
7274

@@ -96,7 +98,6 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
9698
}
9799
}
98100

99-
var node = &core.Node{}
100101
for {
101102
select {
102103
case <-s.ctx.Done():

pkg/server/sotw/v3/server.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type Callbacks interface {
4141
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
4242
OnStreamOpen(context.Context, int64, string) error
4343
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
44-
OnStreamClosed(int64)
44+
OnStreamClosed(int64, *core.Node)
4545
// OnStreamRequest is called once a request is received on a stream.
4646
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
4747
OnStreamRequest(int64, *discovery.DiscoveryRequest) error
@@ -87,10 +87,13 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
8787
// a collection of stack allocated watches per request type
8888
watches := newWatches()
8989

90+
// node may only be set on the first discovery request
91+
var node = &core.Node{}
92+
9093
defer func() {
9194
watches.close()
9295
if s.callbacks != nil {
93-
s.callbacks.OnStreamClosed(streamID)
96+
s.callbacks.OnStreamClosed(streamID, node)
9497
}
9598
}()
9699

@@ -130,9 +133,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
130133
}
131134
}
132135

133-
// node may only be set on the first discovery request
134-
var node = &core.Node{}
135-
136136
// recompute dynamic channels for this stream
137137
watches.recompute(s.ctx, reqCh)
138138

pkg/server/v3/server.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3"
2727
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
2828

29+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2930
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
3031
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
3132
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@@ -68,9 +69,9 @@ type Callbacks interface {
6869
// CallbackFuncs is a convenience type for implementing the Callbacks interface.
6970
type CallbackFuncs struct {
7071
StreamOpenFunc func(context.Context, int64, string) error
71-
StreamClosedFunc func(int64)
72+
StreamClosedFunc func(int64, *core.Node)
7273
DeltaStreamOpenFunc func(context.Context, int64, string) error
73-
DeltaStreamClosedFunc func(int64)
74+
DeltaStreamClosedFunc func(int64, *core.Node)
7475
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
7576
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
7677
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
@@ -91,9 +92,9 @@ func (c CallbackFuncs) OnStreamOpen(ctx context.Context, streamID int64, typeURL
9192
}
9293

9394
// OnStreamClosed invokes StreamClosedFunc.
94-
func (c CallbackFuncs) OnStreamClosed(streamID int64) {
95+
func (c CallbackFuncs) OnStreamClosed(streamID int64, node *core.Node) {
9596
if c.StreamClosedFunc != nil {
96-
c.StreamClosedFunc(streamID)
97+
c.StreamClosedFunc(streamID, node)
9798
}
9899
}
99100

@@ -107,9 +108,9 @@ func (c CallbackFuncs) OnDeltaStreamOpen(ctx context.Context, streamID int64, ty
107108
}
108109

109110
// OnDeltaStreamClosed invokes DeltaStreamClosedFunc.
110-
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64) {
111+
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64, node *core.Node) {
111112
if c.DeltaStreamClosedFunc != nil {
112-
c.DeltaStreamClosedFunc(streamID)
113+
c.DeltaStreamClosedFunc(streamID, node)
113114
}
114115
}
115116

pkg/test/v3/callbacks.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"log"
66
"sync"
77

8+
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
89
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
10+
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
911
)
1012

1113
type Callbacks struct {
@@ -18,6 +20,8 @@ type Callbacks struct {
1820
mu sync.Mutex
1921
}
2022

23+
var _ server.Callbacks = &Callbacks{}
24+
2125
func (cb *Callbacks) Report() {
2226
cb.mu.Lock()
2327
defer cb.mu.Unlock()
@@ -29,9 +33,9 @@ func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error
2933
}
3034
return nil
3135
}
32-
func (cb *Callbacks) OnStreamClosed(id int64) {
36+
func (cb *Callbacks) OnStreamClosed(id int64, node *core.Node) {
3337
if cb.Debug {
34-
log.Printf("stream %d closed\n", id)
38+
log.Printf("stream %d of node %s closed\n", id, node.Id)
3539
}
3640
}
3741
func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
@@ -40,9 +44,9 @@ func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string)
4044
}
4145
return nil
4246
}
43-
func (cb *Callbacks) OnDeltaStreamClosed(id int64) {
47+
func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *core.Node) {
4448
if cb.Debug {
45-
log.Printf("delta stream %d closed\n", id)
49+
log.Printf("delta stream %d of node %s closed\n", id, node.Id)
4650
}
4751
}
4852
func (cb *Callbacks) OnStreamRequest(int64, *discovery.DiscoveryRequest) error {

0 commit comments

Comments
 (0)