@@ -16,14 +16,17 @@ package trigger
16
16
17
17
import (
18
18
"context"
19
- stdErr "errors"
19
+ stderr "errors"
20
20
"fmt"
21
21
"io"
22
22
"os"
23
23
"strings"
24
24
"sync"
25
25
"time"
26
26
27
+ "google.golang.org/grpc/credentials/insecure"
28
+ "google.golang.org/protobuf/types/known/emptypb"
29
+
27
30
eb "github.com/vanus-labs/vanus/client"
28
31
"github.com/vanus-labs/vanus/internal/controller/member"
29
32
"github.com/vanus-labs/vanus/internal/controller/trigger/metadata"
@@ -42,22 +45,21 @@ import (
42
45
"github.com/vanus-labs/vanus/pkg/util"
43
46
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
44
47
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
45
- "google.golang.org/grpc/credentials/insecure"
46
- "google.golang.org/protobuf/types/known/emptypb"
47
48
)
48
49
49
50
var _ ctrlpb.TriggerControllerServer = & controller {}
50
51
51
52
const (
52
53
defaultGcSubscriptionInterval = time .Second * 10
54
+ waitEventbusReadyTime = time .Minute * 3
55
+ waitEventbusCheckPeriod = time .Second * 2
53
56
)
54
57
55
58
func NewController (config Config , mem member.Member ) * controller {
56
59
ctrl := & controller {
57
60
config : config ,
58
61
member : mem ,
59
62
needCleanSubscription : map [vanus.ID ]string {},
60
- state : primitive .ServerStateCreated ,
61
63
cl : cluster .NewClusterController (config .ControllerAddr , insecure .NewCredentials ()),
62
64
ebClient : eb .Connect (config .ControllerAddr ),
63
65
}
@@ -79,17 +81,13 @@ type controller struct {
79
81
isLeader bool
80
82
ctx context.Context
81
83
stopFunc context.CancelFunc
82
- state primitive.ServerState
83
84
cl cluster.Cluster
84
85
ebClient eb.Client
85
86
}
86
87
87
88
func (ctrl * controller ) SetDeadLetterEventOffset (
88
89
ctx context.Context , request * ctrlpb.SetDeadLetterEventOffsetRequest ,
89
90
) (* emptypb.Empty , error ) {
90
- if ctrl .state != primitive .ServerStateRunning {
91
- return nil , errors .ErrServerNotStart
92
- }
93
91
subID := vanus .ID (request .SubscriptionId )
94
92
err := ctrl .subscriptionManager .SaveDeadLetterOffset (ctx , subID , request .GetOffset ())
95
93
if err != nil {
@@ -101,9 +99,6 @@ func (ctrl *controller) SetDeadLetterEventOffset(
101
99
func (ctrl * controller ) GetDeadLetterEventOffset (
102
100
ctx context.Context , request * ctrlpb.GetDeadLetterEventOffsetRequest ,
103
101
) (* ctrlpb.GetDeadLetterEventOffsetResponse , error ) {
104
- if ctrl .state != primitive .ServerStateRunning {
105
- return nil , errors .ErrServerNotStart
106
- }
107
102
subID := vanus .ID (request .SubscriptionId )
108
103
offset , err := ctrl .subscriptionManager .GetDeadLetterOffset (ctx , subID )
109
104
if err != nil {
@@ -115,9 +110,6 @@ func (ctrl *controller) GetDeadLetterEventOffset(
115
110
func (ctrl * controller ) CommitOffset (
116
111
ctx context.Context , request * ctrlpb.CommitOffsetRequest ,
117
112
) (* ctrlpb.CommitOffsetResponse , error ) {
118
- if ctrl .state != primitive .ServerStateRunning {
119
- return nil , errors .ErrServerNotStart
120
- }
121
113
resp := new (ctrlpb.CommitOffsetResponse )
122
114
for _ , subInfo := range request .SubscriptionInfo {
123
115
if len (subInfo .Offsets ) == 0 {
@@ -139,9 +131,6 @@ func (ctrl *controller) CommitOffset(
139
131
func (ctrl * controller ) ResetOffsetToTimestamp (
140
132
ctx context.Context , request * ctrlpb.ResetOffsetToTimestampRequest ,
141
133
) (* ctrlpb.ResetOffsetToTimestampResponse , error ) {
142
- if ctrl .state != primitive .ServerStateRunning {
143
- return nil , errors .ErrServerNotStart
144
- }
145
134
if request .Timestamp == 0 {
146
135
return nil , errors .ErrInvalidRequest .WithMessage ("timestamp is invalid" )
147
136
}
@@ -166,15 +155,20 @@ func (ctrl *controller) ResetOffsetToTimestamp(
166
155
func (ctrl * controller ) CreateSubscription (
167
156
ctx context.Context , request * ctrlpb.CreateSubscriptionRequest ,
168
157
) (* metapb.Subscription , error ) {
169
- if ctrl .state != primitive .ServerStateRunning {
170
- return nil , errors .ErrServerNotStart
171
- }
172
158
err := validation .ValidateSubscriptionRequest (ctx , request .Subscription )
173
159
if err != nil {
174
160
log .Info (ctx ).Err (err ).Msg ("create subscription validate fail" )
175
161
return nil , err
176
162
}
177
163
sub := convert .FromPbSubscriptionRequest (request .Subscription )
164
+ _ , err = ctrl .cl .NamespaceService ().GetNamespace (ctx , sub .NamespaceID .Uint64 ())
165
+ if err != nil {
166
+ return nil , err
167
+ }
168
+ _ , err = ctrl .cl .EventbusService ().GetEventbus (ctx , sub .EventbusID .Uint64 ())
169
+ if err != nil {
170
+ return nil , err
171
+ }
178
172
sub .ID , err = vanus .NewID ()
179
173
sub .CreatedAt = time .Now ()
180
174
sub .UpdatedAt = time .Now ()
@@ -200,9 +194,6 @@ func (ctrl *controller) CreateSubscription(
200
194
func (ctrl * controller ) UpdateSubscription (
201
195
ctx context.Context , request * ctrlpb.UpdateSubscriptionRequest ,
202
196
) (* metapb.Subscription , error ) {
203
- if ctrl .state != primitive .ServerStateRunning {
204
- return nil , errors .ErrServerNotStart
205
- }
206
197
subID := vanus .ID (request .Id )
207
198
sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
208
199
if sub == nil {
@@ -243,9 +234,6 @@ func (ctrl *controller) UpdateSubscription(
243
234
func (ctrl * controller ) DeleteSubscription (
244
235
ctx context.Context , request * ctrlpb.DeleteSubscriptionRequest ,
245
236
) (* emptypb.Empty , error ) {
246
- if ctrl .state != primitive .ServerStateRunning {
247
- return nil , errors .ErrServerNotStart
248
- }
249
237
subID := vanus .ID (request .Id )
250
238
sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
251
239
if sub != nil {
@@ -269,9 +257,6 @@ func (ctrl *controller) DeleteSubscription(
269
257
func (ctrl * controller ) DisableSubscription (
270
258
ctx context.Context , request * ctrlpb.DisableSubscriptionRequest ,
271
259
) (* emptypb.Empty , error ) {
272
- if ctrl .state != primitive .ServerStateRunning {
273
- return nil , errors .ErrServerNotStart
274
- }
275
260
subID := vanus .ID (request .Id )
276
261
sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
277
262
if sub == nil {
@@ -302,9 +287,6 @@ func (ctrl *controller) DisableSubscription(
302
287
func (ctrl * controller ) ResumeSubscription (
303
288
ctx context.Context , request * ctrlpb.ResumeSubscriptionRequest ,
304
289
) (* emptypb.Empty , error ) {
305
- if ctrl .state != primitive .ServerStateRunning {
306
- return nil , errors .ErrServerNotStart
307
- }
308
290
subID := vanus .ID (request .Id )
309
291
sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
310
292
if sub == nil {
@@ -326,9 +308,6 @@ func (ctrl *controller) ResumeSubscription(
326
308
func (ctrl * controller ) GetSubscription (
327
309
ctx context.Context , request * ctrlpb.GetSubscriptionRequest ,
328
310
) (* metapb.Subscription , error ) {
329
- if ctrl .state != primitive .ServerStateRunning {
330
- return nil , errors .ErrServerNotStart
331
- }
332
311
sub := ctrl .subscriptionManager .GetSubscription (ctx , vanus .ID (request .Id ))
333
312
if sub == nil {
334
313
return nil , errors .ErrResourceNotFound .WithMessage ("subscription not exist" )
@@ -355,7 +334,7 @@ func (ctrl *controller) TriggerWorkerHeartbeat(
355
334
}
356
335
req , err := heartbeat .Recv ()
357
336
if err != nil {
358
- if ! stdErr .Is (err , io .EOF ) {
337
+ if ! stderr .Is (err , io .EOF ) {
359
338
log .Warn (ctx ).Err (err ).Msg ("heartbeat recv error" )
360
339
}
361
340
log .Info (ctx ).Msg ("heartbeat close" )
@@ -562,7 +541,6 @@ func (ctrl *controller) membershipChangedProcessor(
562
541
ctrl .subscriptionManager .Start ()
563
542
ctrl .scheduler .Run ()
564
543
go ctrl .gcSubscriptions (ctx )
565
- ctrl .state = primitive .ServerStateRunning
566
544
ctrl .isLeader = true
567
545
case member .EventBecomeFollower :
568
546
if ! ctrl .isLeader {
@@ -579,13 +557,11 @@ func (ctrl *controller) membershipChangedProcessor(
579
557
580
558
func (ctrl * controller ) stop (_ context.Context ) error {
581
559
ctrl .member .ResignIfLeader ()
582
- ctrl .state = primitive .ServerStateStopping
583
560
ctrl .stopFunc ()
584
561
ctrl .scheduler .Stop ()
585
562
ctrl .workerManager .Stop ()
586
563
ctrl .subscriptionManager .Stop ()
587
564
ctrl .storage .Close ()
588
- ctrl .state = primitive .ServerStateStopped
589
565
return nil
590
566
}
591
567
@@ -621,12 +597,40 @@ func (ctrl *controller) initTriggerSystemEventbus() {
621
597
go func () {
622
598
ctx := context .Background ()
623
599
log .Info (ctx ).Msg ("trigger controller is ready to check system eventbus" )
600
+ if err := ctrl .cl .WaitForControllerReady (false ); err != nil {
601
+ log .Error ().Err (err ).Msg ("trigger controller check system eventbus, " +
602
+ "but Vanus cluster hasn't ready, exit" )
603
+ os .Exit (- 1 )
604
+ }
605
+ ready := util .WaitReady (func () bool {
606
+ exist , err := ctrl .cl .EventbusService ().IsSystemEventbusExistByName (ctx , primitive .TimerEventbusName )
607
+ if err != nil {
608
+ log .Error ().Err (err ).Msg ("check TimerEventbus exist has error" )
609
+ return false
610
+ }
611
+ return exist
612
+ }, waitEventbusReadyTime , waitEventbusCheckPeriod )
613
+ if ! ready {
614
+ log .Error ().Msg ("check TimerEventbus timeout no exist, will exist" )
615
+ os .Exit (- 1 )
616
+ }
617
+
618
+ // wait TimerEventbus
619
+ exist , err := ctrl .cl .EventbusService ().IsSystemEventbusExistByName (ctx , primitive .RetryEventbusName )
620
+ if err != nil {
621
+ log .Error ().Err (err ).Msg ("failed to check RetryEventbus exist, exit" )
622
+ os .Exit (- 1 )
623
+ }
624
+ if exist {
625
+ log .Info ().Msg ("trigger controller check RetryEventbus exist" )
626
+ return
627
+ }
628
+ log .Info ().Msg ("trigger controller check RetryEventbus no exist, will create" )
624
629
if err := ctrl .cl .WaitForControllerReady (true ); err != nil {
625
630
log .Error (ctx ).Err (err ).
626
631
Msg ("trigger controller try to create system eventbus, but Vanus cluster hasn't ready, exit" )
627
632
os .Exit (- 1 )
628
633
}
629
-
630
634
if _ , err := ctrl .cl .EventbusService ().CreateSystemEventbusIfNotExist (ctx , primitive .RetryEventbusName ,
631
635
"System Eventbus For Trigger Service" ); err != nil {
632
636
log .Error (ctx ).Err (err ).Msg ("failed to create RetryEventbus, exit" )
0 commit comments