@@ -22,9 +22,6 @@ import (
2222 "fmt"
2323 "sync"
2424
25- afpb "aalyria.com/spacetime/api/cdpi/v1alpha"
26- apipb "aalyria.com/spacetime/api/common"
27- schedpb "aalyria.com/spacetime/api/scheduling/v1alpha"
2825 "aalyria.com/spacetime/cdpi_agent/enactment"
2926 "aalyria.com/spacetime/cdpi_agent/internal/task"
3027 "aalyria.com/spacetime/cdpi_agent/telemetry"
@@ -46,7 +43,6 @@ func init() {
4643
4744var (
4845 errNoClock = errors .New ("no clock provided (see WithClock)" )
49- errNoEndpoint = errors .New ("no server endpoint provied (see WithServerEndpoint)" )
5046 errNoNodes = errors .New ("no nodes configured (see WithNode)" )
5147 errNoActiveServices = errors .New ("no services configured for node (see WithEnactmentBackend and WithTelemetryBackend)" )
5248)
@@ -71,17 +67,14 @@ func (fn nodeOptFunc) apply(n *node) { fn(n) }
7167// Agent is a CDPI agent that coordinates change requests across multiple
7268// nodes.
7369type Agent struct {
74- clock clockwork.Clock
75- nodes map [string ]* node
76- endpoint string
77- dialOpts []grpc.DialOption
70+ clock clockwork.Clock
71+ nodes map [string ]* node
7872}
7973
8074// NewAgent creates a new Agent configured with the provided options.
8175func NewAgent (opts ... AgentOption ) (* Agent , error ) {
8276 a := & Agent {
83- nodes : map [string ]* node {},
84- dialOpts : []grpc.DialOption {grpc .WithBlock ()},
77+ nodes : map [string ]* node {},
8578 }
8679
8780 for _ , opt := range opts {
@@ -96,9 +89,6 @@ func (a *Agent) validate() error {
9689 if a .clock == nil {
9790 errs = append (errs , errNoClock )
9891 }
99- if a .endpoint == "" {
100- errs = append (errs , errNoEndpoint )
101- }
10292 if len (a .nodes ) == 0 {
10393 errs = append (errs , errNoNodes )
10494 }
@@ -122,25 +112,6 @@ func WithRealClock() AgentOption {
122112 return WithClock (clockwork .NewRealClock ())
123113}
124114
125- // WithServerEndpoint configures the Agent to connect to the provided endpoint.
126- func WithServerEndpoint (endpoint string ) AgentOption {
127- return agentOptFunc (func (a * Agent ) {
128- a .endpoint = endpoint
129- })
130- }
131-
132- // WithDialOpts configures the Agent to use the provided DialOptions when
133- // connecting to the CDPI endpoint.
134- //
135- // NOTE: The CDPI agent always uses the `grpc.WithBlock` option to ensure
136- // initial connection errors are caught immediately, whereas logical errors are
137- // often more tightly scoped to individual RPCs.
138- func WithDialOpts (dialOpts ... grpc.DialOption ) AgentOption {
139- return agentOptFunc (func (a * Agent ) {
140- a .dialOpts = append (a .dialOpts , dialOpts ... )
141- })
142- }
143-
144115// WithNode configures a network node for the agent to represent.
145116func WithNode (id string , opts ... NodeOption ) AgentOption {
146117 n := & node {id : id }
@@ -153,42 +124,36 @@ func WithNode(id string, opts ...NodeOption) AgentOption {
153124}
154125
155126type node struct {
156- initState * apipb.ControlPlaneState
157- eb enactment.Backend
158- tb telemetry.Backend
159- id string
160- priority uint32
161- enactmentsEnabled bool
162- telemetryEnabled bool
163- }
127+ ed enactment.Driver
128+ td telemetry.Driver
129+ id string
130+ priority uint32
164131
165- func WithChannelPriority (priority uint32 ) NodeOption {
166- return nodeOptFunc (func (n * node ) { n .priority = priority })
132+ enactmentEndpoint , telemetryEndpoint string
133+ enactmentsEnabled , telemetryEnabled bool
134+ enactmentDialOpts , telemetryDialOpts []grpc.DialOption
167135}
168136
169- // WithEnactmentBackend configures the EnactmentBackend for the given Node.
170- func WithEnactmentBackend ( eb enactment.Backend ) NodeOption {
137+ // WithEnactmentDriver configures the [enactment.Driver] for the given Node.
138+ func WithEnactmentDriver ( endpoint string , d enactment.Driver , dialOpts ... grpc. DialOption ) NodeOption {
171139 return nodeOptFunc (func (n * node ) {
172- n .eb = eb
140+ n .ed = d
141+ n .enactmentEndpoint = endpoint
142+ n .enactmentDialOpts = dialOpts
173143 n .enactmentsEnabled = true
174144 })
175145}
176146
177- // WithTelemetryBackend configures the telemetry.Backend for the given Node.
178- func WithTelemetryBackend ( tb telemetry.Backend ) NodeOption {
147+ // WithTelemetryDriver configures the [ telemetry.Driver] for the given Node.
148+ func WithTelemetryDriver ( endpoint string , d telemetry.Driver , dialOpts ... grpc. DialOption ) NodeOption {
179149 return nodeOptFunc (func (n * node ) {
180- n .tb = tb
150+ n .td = d
151+ n .telemetryEndpoint = endpoint
152+ n .telemetryDialOpts = dialOpts
181153 n .telemetryEnabled = true
182154 })
183155}
184156
185- // WithInitialState configures the initial state of the Node.
186- func WithInitialState (initState * apipb.ControlPlaneState ) NodeOption {
187- return nodeOptFunc (func (n * node ) {
188- n .initState = initState
189- })
190- }
191-
192157// Run starts the Agent and blocks until a fatal error is encountered or all
193158// node controllers terminate.
194159func (a * Agent ) Run (ctx context.Context ) error {
@@ -226,21 +191,13 @@ func (a *Agent) Run(ctx context.Context) error {
226191}
227192
228193func (a * Agent ) start (ctx context.Context , agentMap * expvar.Map , errCh chan error ) error {
229- log := zerolog .Ctx (ctx )
230-
231- log .Trace ().Str ("endpoint" , a .endpoint ).Msg ("contacting the CDPI endpoint" )
232- conn , err := grpc .NewClient (a .endpoint , a .dialOpts ... )
233- if err != nil {
234- return fmt .Errorf ("agent: failed connecting to CDPI backend: %w" , err )
235- }
236-
237- schedClient := schedpb .NewSchedulingClient (conn )
238- telemetryClient := afpb .NewNetworkTelemetryStreamingClient (conn )
239-
240194 for _ , n := range a .nodes {
241195 ctx , done := context .WithCancel (ctx )
242196
243- nc := a .newNodeController (n , done , schedClient , telemetryClient )
197+ nc , err := a .newNodeController (n , done )
198+ if err != nil {
199+ return fmt .Errorf ("node %q: %w" , n .id , err )
200+ }
244201 agentMap .Set (n .id , expvar .Func (nc .Stats ))
245202
246203 srv := task .Task (nc .run ).
0 commit comments