Skip to content

Commit

Permalink
fix: Fix some errors in nacosREST (#54)
Browse files Browse the repository at this point in the history
1. Skip the __names__ item when enumerating all configs
2. Allow changing some Nacos options via environment variables
3. Use local Nacos config cache as a fallback by default
  • Loading branch information
CH3CHO authored Jan 31, 2024
1 parent d383cf3 commit e5ea75f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 17 deletions.
16 changes: 14 additions & 2 deletions src/apiserver/pkg/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"github.com/spf13/pflag"
"k8s.io/klog/v2"
"net/url"
"os"
"strconv"
Expand All @@ -20,6 +21,18 @@ const (
Storage_Nacos = "nacos"
)

var (
NacosDisableUseSnapShot = utils.GetBoolFromEnv("NACOS_DISABLE_USE_SNAPSHOT", false)
NacosListRefreshIntervalSecs = utils.GetIntFromEnv("NACOS_LIST_REFRESH_INTERVAL_SECS", 10)
NacosConfigSearchPageSize = utils.GetIntFromEnv("NACOS_CONFIG_SEARCH_PAGE_SIZE", 50)
)

func init() {
klog.Infof("NacosDisableUseSnapShot: %v", NacosDisableUseSnapShot)
klog.Infof("NacosListRefreshIntervalSecs: %v", NacosListRefreshIntervalSecs)
klog.Infof("NacosConfigSearchPageSize: %v", NacosConfigSearchPageSize)
}

func CreateAuthOptions() *AuthOptions {
return &AuthOptions{}
}
Expand Down Expand Up @@ -225,8 +238,7 @@ func (o *NacosOptions) CreateConfigClient() (config_client.IConfigClient, error)
constant.WithLogDir(o.LogDir),
constant.WithCacheDir(o.CacheDir),
constant.WithLogLevel("info"),
// Ignore snapshot so we can get the latest config right after making any change.
constant.WithDisableUseSnapShot(true),
constant.WithDisableUseSnapShot(NacosDisableUseSnapShot),
)

var serverConfigs []constant.ServerConfig
Expand Down
33 changes: 18 additions & 15 deletions src/apiserver/pkg/registry/nacos_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"

"github.com/alibaba/higress/api-server/pkg/options"
"github.com/alibaba/higress/api-server/pkg/utils"
"github.com/nacos-group/nacos-sdk-go/v2/clients/config_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
Expand All @@ -38,8 +39,6 @@ import (

const dataIdSeparator = "."
const wildcardSuffix = dataIdSeparator + "*"
const searchPageSize = 50
const listRefreshInterval = 10 * time.Second
const encryptionMark = "enc|"
const namesSuffix = "__names__"
const namesGroup = constant.DEFAULT_GROUP
Expand Down Expand Up @@ -133,7 +132,7 @@ func (n *nacosREST) startBackgroundWatcher() {
return
}

n.listRefreshTicker = time.NewTicker(listRefreshInterval)
n.listRefreshTicker = time.NewTicker(time.Duration(options.NacosListRefreshIntervalSecs) * time.Second)
go func(n *nacosREST) {
for {
<-n.listRefreshTicker.C
Expand Down Expand Up @@ -229,17 +228,16 @@ func (n *nacosREST) List(
ns, _ := genericapirequest.NamespaceFrom(ctx)

searchConfigParam := vo.SearchConfigParam{
Search: "blur",
DataId: n.dataIdPrefix + wildcardSuffix,
Group: ns,
PageSize: searchPageSize,
Search: "blur",
DataId: n.dataIdPrefix + wildcardSuffix,
Group: ns,
}
predicate := n.buildListPredicate(options)
count := 0
err = n.enumerateConfigs(&searchConfigParam, func(item *model.ConfigItem) {
obj, err := n.decodeConfig(n.codec, item.Content, n.newFunc)
if obj == nil || err != nil {
klog.Errorf("failed to decode config %s/%s: %v", item.Group, item.DataId, err)
klog.Errorf("failed to decode config [#3] %s/%s: %v", item.Group, item.DataId, err)
return
}
if ok, err := predicate.Matches(obj); err == nil && ok {
Expand Down Expand Up @@ -499,6 +497,9 @@ func (n *nacosREST) buildListPredicate(options *metainternalversion.ListOptions)
func (n *nacosREST) enumerateConfigs(param *vo.SearchConfigParam, action func(*model.ConfigItem)) error {
searchConfigParam := *param
searchConfigParam.PageNo = 1
if searchConfigParam.PageSize < options.NacosConfigSearchPageSize {
searchConfigParam.PageSize = options.NacosConfigSearchPageSize
}
for {
page, err := n.configClient.SearchConfig(searchConfigParam)
if err != nil {
Expand All @@ -510,6 +511,9 @@ func (n *nacosREST) enumerateConfigs(param *vo.SearchConfigParam, action func(*m
}

for _, item := range page.PageItems {
if item.Group == namesGroup && item.DataId == n.namesDataId {
continue
}
localItem := *(&item)
action(&localItem)
}
Expand All @@ -533,7 +537,7 @@ func (n *nacosREST) read(decoder runtime.Decoder, group, dataId string, newFunc
}
obj, err := n.decodeConfig(decoder, config, newFunc)
if err != nil {
klog.Errorf("failed to decode config %s: %v", dataId, err)
klog.Errorf("failed to decode config #4 %s: %v", dataId, err)
return nil, config, err
}
return obj, config, nil
Expand All @@ -549,10 +553,12 @@ func (n *nacosREST) readRaw(group, dataId string) (string, error) {
func (n *nacosREST) decodeConfig(decoder runtime.Decoder, config string, newFunc func() runtime.Object) (runtime.Object, error) {
decryptedConfig, err := n.decryptConfig(config)
if err != nil {
klog.Infof("failed to decoded config #1: %v\n%s", err, config)
return nil, err
}
obj, _, err := decoder.Decode([]byte(decryptedConfig), nil, newFunc())
if err != nil {
klog.Infof("failed to decoded config #2: %v\n%s", err, config)
return nil, err
}
accessor, err := meta.Accessor(obj)
Expand Down Expand Up @@ -606,9 +612,6 @@ func (n *nacosREST) refreshConfigList() {
Search: "blur",
DataId: n.dataIdPrefix + wildcardSuffix,
}, func(item *model.ConfigItem) {
if item.Group == namesGroup && item.DataId == n.namesDataId {
return
}
key := item.Group + "/" + item.DataId
allConfigKeys = append(allConfigKeys, key)
if _, ok := n.configItems[key]; !ok {
Expand Down Expand Up @@ -644,7 +647,7 @@ func (n *nacosREST) refreshConfigList() {
configItem := configItems[key]
obj, err := n.decodeConfig(n.codec, configItem.Content, n.newFunc)
if err != nil {
klog.Errorf("failed to decode config %s: %v", configItem.DataId, err)
klog.Errorf("failed to decode config #5 %s: %v", configItem.DataId, err)
delete(configItems, key)
continue
}
Expand All @@ -659,7 +662,7 @@ func (n *nacosREST) refreshConfigList() {
OnChange: func(namespace, group, dataId, data string) {
obj, err := n.decodeConfig(n.codec, data, n.newFunc)
if err != nil {
klog.Errorf("failed to decode config %s: %v", dataId, err)
klog.Errorf("failed to decode config #6 %s: %v", dataId, err)
return
}
klog.Infof("%s/%s is changed", group, dataId)
Expand All @@ -677,7 +680,7 @@ func (n *nacosREST) refreshConfigList() {
configItem := n.configItems[key]
obj, err := n.decodeConfig(n.codec, configItem.Content, n.newFunc)
if err != nil {
klog.Errorf("failed to decode config %s: %v", configItem.DataId, err)
klog.Errorf("failed to decode config #7 %s: %v", configItem.DataId, err)
continue
}
_ = n.configClient.CancelListenConfig(vo.ConfigParam{
Expand Down
41 changes: 41 additions & 0 deletions src/apiserver/pkg/utils/env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package utils

import (
"os"
"strconv"
)

func GetStringFromEnv(key, defaultValue string) string {
if len(key) == 0 {
return defaultValue
}
value := os.Getenv(key)
if len(value) != 0 {
return value
}
return defaultValue
}

func GetIntFromEnv(key string, defaultValue int) int {
strValue := GetStringFromEnv(key, "")
if len(strValue) == 0 {
return defaultValue
}
if value, err := strconv.Atoi(strValue); err != nil {
return defaultValue
} else {
return value
}
}

func GetBoolFromEnv(key string, defaultValue bool) bool {
strValue := GetStringFromEnv(key, "")
if len(strValue) == 0 {
return defaultValue
}
if value, err := strconv.ParseBool(strValue); err != nil {
return defaultValue
} else {
return value
}
}

0 comments on commit e5ea75f

Please sign in to comment.