Skip to content

Commit 9cab012

Browse files
committed
feat: refactor registry for etcd use channel
1 parent 3d8554f commit 9cab012

File tree

3 files changed

+25
-19
lines changed

3 files changed

+25
-19
lines changed

app.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ app_conf:
88
graceful_wait: 5s # 平滑退出等待时间,单位s
99

1010
# 服务注册和发现配置
11-
enable_discovery: true # 是否开启服务发现和注册,本地开发时可以设置为false
11+
enable_discovery: false # 是否开启服务发现和注册,本地开发时可以设置为false
1212
discovery:
1313
target_type: etcd
1414
endpoints:

internal/infras/discovery/etcd/etcd.go

+22-16
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ type etcdImpl struct {
3333

3434
type registerMeta struct {
3535
leaseID clientv3.LeaseID
36-
ctx context.Context
37-
cancel context.CancelFunc
36+
stop chan struct{}
3837
}
3938

4039
// Option etcdImpl functional option
@@ -137,53 +136,60 @@ func (e *etcdImpl) Register(s discovery.Service, ttl ...time.Duration) error {
137136
return err
138137
}
139138

140-
meta := &registerMeta{
139+
e.meta = &registerMeta{
141140
leaseID: leaseID,
141+
stop: make(chan struct{}, 1),
142142
}
143-
meta.ctx, meta.cancel = context.WithCancel(context.Background())
144-
err = e.keepalive(meta)
143+
err = e.keepalive(e.meta)
145144
if err != nil {
146145
return err
147146
}
148147

149-
e.meta = meta
150-
148+
log.Printf("register service:%v leaseID:%v instaceID:%v success\n", s.Name, leaseID, s.InstanceID)
151149
go func() {
152150
ticker := time.NewTicker(e.keepaliveInterval)
153151
defer ticker.Stop()
154152

155153
for {
156154
select {
157155
case <-ticker.C:
158-
err2 := e.keepalive(meta)
156+
err2 := e.keepalive(e.meta)
159157
if err2 != nil {
160-
log.Printf("keep alive failed: %v", err2)
158+
log.Printf(
159+
"keep alive service:%v leaseID:%v instaceID:%v failed,error: %v\n",
160+
s.Name, leaseID, s.InstanceID, err2,
161+
)
161162
} else {
162-
log.Printf("register service:%v leaseID:%v instaceID:%v success\n", s.Name, leaseID, s.InstanceID)
163+
log.Printf(
164+
"keep alive service:%v leaseID:%v instaceID:%v success\n",
165+
s.Name, leaseID, s.InstanceID,
166+
)
163167
}
164-
case <-meta.ctx.Done():
168+
case <-e.meta.stop:
169+
log.Printf(
170+
"service:%v leaseID:%v instaceID:%v has been deregistered\n",
171+
s.Name, leaseID, s.InstanceID,
172+
)
165173
return
166174
default:
167175
}
168176
}
169177
}()
170178

171-
log.Printf("register service:%v leaseID:%v instaceID:%v success\n", s.Name, leaseID, s.InstanceID)
172179
return nil
173180
}
174181

175182
func (e *etcdImpl) keepalive(meta *registerMeta) error {
176-
keepAlive, err := e.client.KeepAlive(meta.ctx, meta.leaseID)
183+
keepAlive, err := e.client.KeepAlive(context.Background(), meta.leaseID)
177184
if err != nil {
178185
return err
179186
}
180187

181188
go func() {
182189
// eat keepAlive channel to keep related lease alive.
183-
log.Printf("start keepalive lease %v for etcd registry\n", meta.leaseID)
184190
for range keepAlive {
185191
select {
186-
case <-meta.ctx.Done():
192+
case <-meta.stop:
187193
log.Printf("stop keepalive lease %v for etcd registry\n", meta.leaseID)
188194
return
189195
default:
@@ -216,7 +222,7 @@ func (e *etcdImpl) Deregister(name string, instanceID string) error {
216222

217223
log.Printf("deregister service:%v instaceID:%v success\n", name, instanceID)
218224
if e.meta != nil {
219-
e.meta.cancel()
225+
close(e.meta.stop)
220226
}
221227

222228
return nil

internal/infras/discovery/etcd/etcd_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
func TestRegister(t *testing.T) {
1414
r, err := New([]string{
15-
"192.168.10.121:2379",
15+
"127.0.0.1:12379",
1616
})
1717
if err != nil {
1818
log.Fatal("failed to init registry,error: ", err)
@@ -39,7 +39,7 @@ func TestRegister(t *testing.T) {
3939
// 获取服务列表
4040
func TestEtcdServices(t *testing.T) {
4141
r, err := New([]string{
42-
"192.168.10.121:2379",
42+
"127.0.0.1:12379",
4343
})
4444
if err != nil {
4545
log.Fatal(err)

0 commit comments

Comments
 (0)