Skip to content

Commit

Permalink
[ISSUE #522]Fix issue 522 (#523)
Browse files Browse the repository at this point in the history
* feat: feat issue #514 #516

* feat: none

* none

* refactor: update install.sh

* refactor: add unzip in docker-image

* refactor: update docker copy filename

* fix: fix Dockerfile arg position

* feat: 支持单机安装自定义端口 & 单机版本docker镜像支持

* docs: update readme

* fix: fix return empty slice

* fix: fix issue #522
  • Loading branch information
chuntaojun authored Jul 14, 2022
1 parent 434a40b commit 29d3816
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 16 deletions.
13 changes: 11 additions & 2 deletions apiserver/xdsserverv3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,13 +703,22 @@ func (x *XDSServer) getRegistryInfoWithCache(ctx context.Context, registryInfo m
registryInfo[value.Namespace] = []*ServiceInfo{}
}

registryInfo[value.Namespace] = append(registryInfo[value.Namespace], &ServiceInfo{
info := &ServiceInfo{
ID: value.ID,
Name: value.Name,
Namespace: value.Namespace,
Instances: []*api.Instance{},
Ports: value.Ports,
})
}

if info.Ports == "" {
ports := x.namingServer.Cache().Instance().GetServicePorts(value.ID)
if len(ports) != 0 {
info.Ports = strings.Join(ports, ",")
}
}

registryInfo[value.Namespace] = append(registryInfo[value.Namespace], info)

return true, nil
}
Expand Down
92 changes: 78 additions & 14 deletions cache/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package cache

import (
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -55,26 +56,29 @@ type InstanceCache interface {
GetInstancesCount() int
// GetInstancesCountByServiceID 根据服务ID获取实例数
GetInstancesCountByServiceID(serviceID string) model.InstanceCount
// GetServicePorts
GetServicePorts(serviceID string) []string
}

// instanceCache 实例缓存的类
type instanceCache struct {
*baseCache

storage store.Store
lastMtime int64
lastMtimeLogged int64
firstUpdate bool
ids *sync.Map // instanceid -> instance
services *sync.Map // service id -> [instanceid ->instance]
instanceCounts *sync.Map // service id -> [instanceCount]
revisionCh chan *revisionNotify
disableBusiness bool
needMeta bool
systemServiceID []string
singleFlight *singleflight.Group
instanceCount int64
lastCheckAllTime int64
storage store.Store
lastMtime int64
lastMtimeLogged int64
firstUpdate bool
ids *sync.Map // instanceid -> instance
services *sync.Map // service id -> [instanceid ->instance]
instanceCounts *sync.Map // service id -> [instanceCount]
servicePortsBucket *servicePortsBucket
revisionCh chan *revisionNotify
disableBusiness bool
needMeta bool
systemServiceID []string
singleFlight *singleflight.Group
instanceCount int64
lastCheckAllTime int64
}

func init() {
Expand All @@ -96,6 +100,10 @@ func (ic *instanceCache) initialize(opt map[string]interface{}) error {
ic.ids = new(sync.Map)
ic.services = new(sync.Map)
ic.instanceCounts = new(sync.Map)
ic.servicePortsBucket = &servicePortsBucket{
lock: sync.RWMutex{},
servicePorts: make(map[string]map[string]struct{}),
}
ic.lastMtime = 0
ic.firstUpdate = true
if opt == nil {
Expand Down Expand Up @@ -183,6 +191,7 @@ func (ic *instanceCache) clear() error {
ic.ids = new(sync.Map)
ic.services = new(sync.Map)
ic.instanceCounts = new(sync.Map)
ic.servicePortsBucket.reset()
ic.instanceCount = 0
ic.lastMtime = 0
return nil
Expand Down Expand Up @@ -274,6 +283,9 @@ func (ic *instanceCache) setInstances(ins map[string]*model.Instance) (int, int)
value = new(sync.Map)
ic.services.Store(item.ServiceID, value)
}

ic.servicePortsBucket.appendPort(item.ServiceID, int(item.Port()))

value.(*sync.Map).Store(item.ID(), item)
}

Expand Down Expand Up @@ -403,6 +415,10 @@ func (ic *instanceCache) GetInstancesCount() int {
return count
}

func (ic *instanceCache) GetServicePorts(serviceID string) []string {
return ic.servicePortsBucket.listPort(serviceID)
}

// iteratorInstancesProc 迭代指定的instance数据,id->instance
func iteratorInstancesProc(data *sync.Map, iterProc InstanceIterProc) error {
var cont bool
Expand All @@ -418,3 +434,51 @@ func iteratorInstancesProc(data *sync.Map, iterProc InstanceIterProc) error {
data.Range(proc)
return err
}

type servicePortsBucket struct {
lock sync.RWMutex
//servicePorts service-id -> []port
servicePorts map[string]map[string]struct{}
}

func (b *servicePortsBucket) reset() {
b.lock.Lock()
defer b.lock.Unlock()

b.servicePorts = make(map[string]map[string]struct{})
}

func (b *servicePortsBucket) appendPort(serviceID string, port int) {
if serviceID == "" || port == 0 {
return
}

b.lock.Lock()
defer b.lock.Unlock()

if _, ok := b.servicePorts[serviceID]; !ok {
b.servicePorts[serviceID] = map[string]struct{}{}
}

ports := b.servicePorts[serviceID]
ports[strconv.FormatInt(int64(port), 10)] = struct{}{}
}

func (b *servicePortsBucket) listPort(serviceID string) []string {
b.lock.RLock()
defer b.lock.RUnlock()

ret := make([]string, 0, 4)

val, ok := b.servicePorts[serviceID]

if !ok {
return ret
}

for k := range val {
ret = append(ret, k)
}

return ret
}
54 changes: 54 additions & 0 deletions cache/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

v1 "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/model"
Expand Down Expand Up @@ -222,6 +223,59 @@ func TestInstanceCache_GetInstance(t *testing.T) {
})
}

func TestInstanceCache_GetServicePorts(t *testing.T) {
ctl, storage, ic := newTestInstanceCache(t)
defer ctl.Finish()
t.Run("缓存有数据,可以正常获取到服务的端口列表", func(t *testing.T) {
_ = ic.clear()
instances := genModelInstances("my-services", 10)

ports := make(map[string][]string)

for i := range instances {
ins := instances[i]
if _, ok := ports[ins.ServiceID]; !ok {
ports[ins.ServiceID] = make([]string, 0, 4)
}

values := ports[ins.ServiceID]
find := false

for j := range values {
if values[j] == fmt.Sprintf("%d", ins.Port()) {
find = true
break
}
}

if !find {
values = append(values, fmt.Sprintf("%d", ins.Port()))
}

ports[ins.ServiceID] = values
}

gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), ic.firstUpdate, ic.needMeta, ic.systemServiceID).
Return(instances, nil))
gomock.InOrder(storage.EXPECT().GetInstancesCount().Return(uint32(10), nil))
if err := ic.update(0); err != nil {
t.Fatalf("error: %s", err.Error())
}

for i := range instances {
ins := instances[i]

expectVal := ports[ins.ServiceID]
targetVal := ic.GetServicePorts(ins.ServiceID)

t.Logf("service-ports expectVal : %v, targetVal : %v", expectVal, targetVal)

assert.ElementsMatch(t, expectVal, targetVal)
}
})
}

const (
instances1Count = 50
instances2Count = 30
Expand Down

0 comments on commit 29d3816

Please sign in to comment.