Skip to content

Commit

Permalink
bugfix: fix prometheus server post process callback
Browse files Browse the repository at this point in the history
  • Loading branch information
shugenniu committed Jun 17, 2022
1 parent 5c78d22 commit 5c86a24
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 42 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ coverage.txt
*.bolt.lock
/vendor
.codecc
*.log
26 changes: 19 additions & 7 deletions plugin/discoverstat/discoverlocal/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,33 @@ func TestDiscoverStatisWorker_AddDiscoverCall(t *testing.T) {
},
}

go worker.Run()
workerStarted := make(chan struct{}, 1)
go func() {
workerStarted <- struct{}{}
worker.Run()
}()
<-workerStarted // 等待 worker 启动

namespace := "Test"
timeout := time.After(time.Minute * 3)
stop := make(chan struct{})
for i := 0; i < 1000; i++ {
go func(index int) {
go func(stop chan struct{}, index int) {
trigger := time.NewTimer(time.Millisecond * 10)
for {
name := fmt.Sprintf("test-service-%d", index)
if err := worker.AddDiscoverCall(name, namespace, time.Now()); err != nil {
t.Errorf("err: %s", err.Error())
select {
case <-trigger.C:
name := fmt.Sprintf("test-service-%d", index)
if err := worker.AddDiscoverCall(name, namespace, time.Now()); err != nil {
t.Errorf("err: %s", err.Error())
}
case <-stop:
return
}
time.Sleep(time.Millisecond * 10)
}
}(i)
}(stop, i)
}
<-timeout
stop <- struct{}{}
t.Log("pass")
}
75 changes: 40 additions & 35 deletions test/config_center/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func TestClientSetupAndFileNotExisted(t *testing.T) {
}

rsp := configService.GetConfigFileForClient(defaultCtx, fileInfo)
assert.Equal(t, uint32(api.NotFoundResource), rsp.Code.GetValue(), "GetConfigFileForClient must notfound")
assert.Equal(t, api.NotFoundResource, rsp.Code.GetValue(), "GetConfigFileForClient must notfound")

rsp2 := configService.CheckClientConfigFileByVersion(defaultCtx, assembleDefaultClientConfigFile(0))
assert.Equal(t, uint32(api.DataNoChange), rsp2.Code.GetValue(), "CheckClientConfigFileByVersion must nochange")
assert.Equal(t, api.DataNoChange, rsp2.Code.GetValue(), "CheckClientConfigFileByVersion must nochange")
assert.Nil(t, rsp2.ConfigFile)

rsp3 := configService.CheckClientConfigFileByMd5(defaultCtx, assembleDefaultClientConfigFile(0))
assert.Equal(t, uint32(api.DataNoChange), rsp3.Code.GetValue())
assert.Equal(t, api.DataNoChange, rsp3.Code.GetValue())
assert.Nil(t, rsp3.ConfigFile)
}

Expand All @@ -78,7 +78,8 @@ func TestClientSetupAndFileExisted(t *testing.T) {

// 拉取配置接口
rsp3 := configService.GetConfigFileForClient(defaultCtx, fileInfo)
assert.Equalf(t, api.ExecuteSuccess, rsp3.Code.GetValue(), "GetConfigFileForClient must success, acutal code : %d", rsp3.Code.GetValue())
assert.Equalf(t, api.ExecuteSuccess, rsp3.Code.GetValue(),
"GetConfigFileForClient must success, acutal code : %d", rsp3.Code.GetValue())
assert.NotNil(t, rsp3.ConfigFile)
assert.Equal(t, uint64(1), rsp3.ConfigFile.Version.GetValue())
assert.Equal(t, configFile.Content.GetValue(), rsp3.ConfigFile.Content.GetValue())
Expand Down Expand Up @@ -164,17 +165,18 @@ func TestWatchConfigFileAtFirstPublish(t *testing.T) {
received := make(chan uint64)

watchConfigFiles := assembleDefaultClientConfigFile(0)
clientId := "TestWatchConfigFileAtFirstPublish-first"
clientID := "TestWatchConfigFileAtFirstPublish-first"

defer func() {
configService.WatchCenter().RemoveWatcher(clientId, watchConfigFiles)
configService.WatchCenter().RemoveWatcher(clientID, watchConfigFiles)
}()

configService.WatchCenter().AddWatcher(clientId, watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
t.Logf("clientId=[%s] receive config publish msg", clientId)
received <- rsp.ConfigFile.Version.GetValue()
return true
})
configService.WatchCenter().AddWatcher(clientID,
watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
t.Logf("clientId=[%s] receive config publish msg", clientId)
received <- rsp.ConfigFile.Version.GetValue()
return true
})

rsp := configService.CreateConfigFile(defaultCtx, configFile)
assert.Equal(t, api.ExecuteSuccess, rsp.Code.GetValue())
Expand All @@ -193,13 +195,14 @@ func TestWatchConfigFileAtFirstPublish(t *testing.T) {
// 版本号由于发布过一次,所以是1
watchConfigFiles := assembleDefaultClientConfigFile(1)

clientId := "TestWatchConfigFileAtFirstPublish-second"
clientID := "TestWatchConfigFileAtFirstPublish-second"

configService.WatchCenter().AddWatcher(clientId, watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
t.Logf("clientId=[%s] receive config publish msg", clientId)
received <- rsp.ConfigFile.Version.GetValue()
return true
})
configService.WatchCenter().AddWatcher(clientID,
watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
t.Logf("clientId=[%s] receive config publish msg", clientId)
received <- rsp.ConfigFile.Version.GetValue()
return true
})

rsp3 := configService.PublishConfigFile(defaultCtx, assembleConfigFileRelease(configFile))
assert.Equal(t, api.ExecuteSuccess, rsp3.Code.GetValue())
Expand All @@ -209,7 +212,7 @@ func TestWatchConfigFileAtFirstPublish(t *testing.T) {
assert.Equal(t, uint64(2), receivedVersion)

// 为了避免影响其它 case,删除订阅
configService.WatchCenter().RemoveWatcher(clientId, watchConfigFiles)
configService.WatchCenter().RemoveWatcher(clientID, watchConfigFiles)
})
}

Expand All @@ -224,14 +227,15 @@ func Test10000ClientWatchConfigFile(t *testing.T) {
receivedVersion := make(map[string]uint64)
watchConfigFiles := assembleDefaultClientConfigFile(0)
for i := 0; i < clientSize; i++ {
clientId := fmt.Sprintf("Test10000ClientWatchConfigFile-client-id=%d", i)
received[clientId] = false
receivedVersion[clientId] = uint64(0)
configService.WatchCenter().AddWatcher(clientId, watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
received[clientId] = true
receivedVersion[clientId] = rsp.ConfigFile.Version.GetValue()
return true
})
clientID := fmt.Sprintf("Test10000ClientWatchConfigFile-client-id=%d", i)
received[clientID] = false
receivedVersion[clientID] = uint64(0)
configService.WatchCenter().AddWatcher(clientID,
watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
received[clientId] = true
receivedVersion[clientId] = rsp.ConfigFile.Version.GetValue()
return true
})
}

// 创建并发布配置文件
Expand All @@ -258,11 +262,11 @@ func Test10000ClientWatchConfigFile(t *testing.T) {
for _, v := range receivedVersion {
receivedVerCnt += v
}
assert.Equal(t, uint64(len(receivedVersion)), uint64(receivedVerCnt))
assert.Equal(t, uint64(len(receivedVersion)), receivedVerCnt)

// 为了避免影响其它case,删除订阅
for clientId := range received {
configService.WatchCenter().RemoveWatcher(clientId, watchConfigFiles)
for clientID := range received {
configService.WatchCenter().RemoveWatcher(clientID, watchConfigFiles)
}
}

Expand All @@ -282,16 +286,17 @@ func TestDeleteConfigFile(t *testing.T) {
time.Sleep(1200 * time.Millisecond)

// 客户端订阅
clientId := randomStr()
clientID := randomStr()
received := make(chan uint64)
watchConfigFiles := assembleDefaultClientConfigFile(0)

t.Log("add config watcher")

configService.WatchCenter().AddWatcher(clientId, watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
received <- rsp.ConfigFile.Version.GetValue()
return true
})
configService.WatchCenter().AddWatcher(clientID,
watchConfigFiles, func(clientId string, rsp *api.ConfigClientResponse) bool {
received <- rsp.ConfigFile.Version.GetValue()
return true
})

// 删除配置文件
t.Log("remove config file")
Expand All @@ -312,5 +317,5 @@ func TestDeleteConfigFile(t *testing.T) {

// 重新拉取配置,获取不到配置文件
rsp4 := configService.GetConfigFileForClient(defaultCtx, fileInfo)
assert.Equal(t, uint32(api.NotFoundResource), rsp4.Code.GetValue())
assert.Equal(t, api.NotFoundResource, rsp4.Code.GetValue())
}

0 comments on commit 5c86a24

Please sign in to comment.