diff --git a/apiserver/eurekaserver/write_test.go b/apiserver/eurekaserver/write_test.go index 255cb7d17..1a52548a9 100644 --- a/apiserver/eurekaserver/write_test.go +++ b/apiserver/eurekaserver/write_test.go @@ -109,6 +109,10 @@ func TestEurekaServer_renew(t *testing.T) { insId: ins, disableBeatInsId: disableBeatIns, }, nil) + mockStore.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + Return(nil, nil) mockStore.EXPECT().StartReadTx().Return(mockTx, nil).AnyTimes() mockStore.EXPECT(). GetMoreServices(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). diff --git a/cache/service/instance.go b/cache/service/instance.go index 80c60a5f0..cd6001597 100644 --- a/cache/service/instance.go +++ b/cache/service/instance.go @@ -51,7 +51,9 @@ type instanceCache struct { // service id -> [instanceid ->instance] services *utils.SyncMap[string, *model.ServiceInstances] // service id -> [instanceCount] - instanceCounts *utils.SyncMap[string, *model.InstanceCount] + instanceCounts *utils.SyncMap[string, *model.InstanceCount] + // service id -> [instanceConsole] + instanceConsoles *utils.SyncMap[string, *model.InstanceConsole] instancePorts *instancePorts disableBusiness bool needMeta bool @@ -78,6 +80,7 @@ func (ic *instanceCache) Initialize(opt map[string]interface{}) error { ic.ids = utils.NewSyncMap[string, *model.Instance]() ic.services = utils.NewSyncMap[string, *model.ServiceInstances]() ic.instanceCounts = utils.NewSyncMap[string, *model.InstanceCount]() + ic.instanceConsoles = utils.NewSyncMap[string, *model.InstanceConsole]() ic.instancePorts = newInstancePorts() if opt == nil { return nil @@ -192,6 +195,17 @@ func (ic *instanceCache) handleUpdate(start time.Time, tx store.Tx) ([]*eventhub return nil, nil, -1, err } + instanceConsoles, err := ic.storage.GetMoreInstanceConsoles(tx, ic.LastFetchTime(), ic.IsFirstUpdate(), + ic.needMeta, ic.systemServiceID) + if err != nil { + log.Error("[Cache][InstanceConsole] update get storage more", zap.Error(err)) + return nil, nil, -1, err + } + for _, item := range instanceConsoles { + //Todo: check validation + ic.instanceConsoles.Store(item.Id, item) + } + events, lastMtimes, update, del := ic.setInstances(instances) log.Info("[Cache][Instance] get more instances", zap.Int("pull-from-store", len(instances)), zap.Int("update", update), zap.Int("delete", del), @@ -205,6 +219,7 @@ func (ic *instanceCache) Clear() error { ic.ids = utils.NewSyncMap[string, *model.Instance]() ic.services = utils.NewSyncMap[string, *model.ServiceInstances]() ic.instanceCounts = utils.NewSyncMap[string, *model.InstanceCount]() + ic.instanceConsoles = utils.NewSyncMap[string, *model.InstanceConsole]() ic.instancePorts.reset() ic.instanceCount = 0 return nil @@ -434,6 +449,20 @@ func (ic *instanceCache) GetInstance(instanceID string) *model.Instance { return value } +// GetInstanceConsole 根据实例ID获取实例数据 +func (ic *instanceCache) GetInstanceConsole(instanceConsoleID string) *model.InstanceConsole { + if instanceConsoleID == "" { + return nil + } + + value, ok := ic.instanceConsoles.Load(instanceConsoleID) + if !ok { + return nil + } + + return value +} + // GetInstances 根据服务名获取实例,先查找服务名对应的服务ID,再找实例列表 func (ic *instanceCache) GetInstances(serviceID string) *model.ServiceInstances { if serviceID == "" { diff --git a/cache/service/instance_test.go b/cache/service/instance_test.go index 0b6ecf8b5..e03a2533b 100644 --- a/cache/service/instance_test.go +++ b/cache/service/instance_test.go @@ -94,6 +94,21 @@ func genModelInstances(label string, total int) map[string]*model.Instance { return out } +func genModelInstancesConsole(label string, total int) map[string]*model.InstanceConsole { + out := make(map[string]*model.InstanceConsole) + for i := 0; i < total; i++ { + entry := &model.InstanceConsole{ + Id: fmt.Sprintf("InstanceConsole-%s-%d", label, i), + Isolate: false, + Weight: 100, + Metadata: "Metadata", + } + out[entry.Id] = entry + } + + return out +} + // 对instanceCache的缓存数据进行计数统计 func iteratorInstances(ic *instanceCache) (int, int) { instancesCount := 0 @@ -118,6 +133,7 @@ func TestInstanceCache_Update(t *testing.T) { ret := make(map[string]*model.Instance) instances1 := genModelInstances("service1", 10) // 每次gen为一个服务的 instances2 := genModelInstances("service2", 5) + instanceConsoles := genModelInstancesConsole("console", 3) for id, instance := range instances1 { ret[id] = instance @@ -129,13 +145,17 @@ func TestInstanceCache_Update(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(ret, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(15), nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) } servicesCount, instancesCount := iteratorInstances(ic) - if servicesCount == 2 && instancesCount == 10+5 { // gen两次,有两个不同服务 + instanceConsoleCounts := ic.instanceConsoles.Len() + if servicesCount == 2 && instancesCount == 10+5 && instanceConsoleCounts == 3 { // gen两次,有两个不同服务 t.Logf("pass") } else { t.Fatalf("error: %d, %d", servicesCount, instancesCount) @@ -147,12 +167,16 @@ func TestInstanceCache_Update(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(nil, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(nil, nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) } servicesCount, instancesCount := iteratorInstances(ic) - if servicesCount != 0 || instancesCount != 0 { + instanceConsoleCounts := ic.instanceConsoles.Len() + if servicesCount != 0 || instancesCount != 0 || instanceConsoleCounts != 0 { t.Fatalf("error: %d %d", servicesCount, instancesCount) } }) @@ -160,6 +184,7 @@ func TestInstanceCache_Update(t *testing.T) { t.Run("lastMtime可以正常更新", func(t *testing.T) { _ = ic.Clear() instances := genModelInstances("services", 10) + instanceConsoles := genModelInstancesConsole("console", 3) maxMtime := time.Now() instances[fmt.Sprintf("instanceID-%s-%d", "services", 5)].ModifyTime = maxMtime @@ -167,6 +192,9 @@ func TestInstanceCache_Update(t *testing.T) { storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), ic.needMeta, ic.systemServiceID). Return(instances, nil), + storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil), storage.EXPECT().GetUnixSecond(gomock.Any()).Return(maxMtime.Unix(), nil).AnyTimes(), ) if err := ic.Update(); err != nil { @@ -188,6 +216,9 @@ func TestInstanceCache_Update2(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(nil, fmt.Errorf("storage get error"))) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(nil, nil)) gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(0), fmt.Errorf("storage get error"))) if err := ic.Update(); err != nil { t.Logf("pass: %s", err.Error()) @@ -199,9 +230,13 @@ func TestInstanceCache_Update2(t *testing.T) { t.Run("更新数据,再删除部分数据,缓存正常", func(t *testing.T) { _ = ic.Clear() instances := genModelInstances("service-a", 20) + instanceConsoles := genModelInstancesConsole("console", 3) gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(instances, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) } @@ -218,6 +253,9 @@ func TestInstanceCache_Update2(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(instances, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) } @@ -231,6 +269,7 @@ func TestInstanceCache_Update2(t *testing.T) { t.Run("对账发现缓存数据数量和存储层不一致", func(t *testing.T) { _ = ic.Clear() instances := genModelInstances("service-a", 20) + queryCount := int32(0) storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(0), nil).AnyTimes() storage.EXPECT(). @@ -256,9 +295,14 @@ func TestInstanceCache_GetInstance(t *testing.T) { t.Run("缓存有数据,可以正常获取到数据", func(t *testing.T) { _ = ic.Clear() instances := genModelInstances("my-services", 10) + instanceConsoles := genModelInstancesConsole("console", 3) + gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(instances, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(10), nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) @@ -271,6 +315,14 @@ func TestInstanceCache_GetInstance(t *testing.T) { if instance := ic.GetInstance("test-instance-xx"); instance != nil { t.Fatalf("error") } + + if instanceConsole := ic.GetInstanceConsole(instanceConsoles[fmt.Sprintf("InstanceConsole-%s-%d", "console", 2)].Id); instanceConsole == nil { + t.Fatalf("error") + } + + if instanceConsole := ic.GetInstance("test-instanceConsole-xx"); instanceConsole != nil { + t.Fatalf("error") + } }) } @@ -280,6 +332,7 @@ func TestInstanceCache_GetServicePorts(t *testing.T) { t.Run("缓存有数据,可以正常获取到服务的端口列表", func(t *testing.T) { _ = ic.Clear() instances := genModelInstances("my-services", 10) + instanceConsoles := genModelInstancesConsole("console", 3) ports := make(map[string][]*model.ServicePort) @@ -311,6 +364,9 @@ func TestInstanceCache_GetServicePorts(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(instances, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(10), nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) @@ -333,6 +389,7 @@ func TestInstanceCache_fillIntrnalLabels(t *testing.T) { t.Run("向实例Metadata中自动注入北极星默认label信息", func(t *testing.T) { _ = ic.Clear() instances := genModelInstances("inject-internal-label", 10) + instanceConsoles := genModelInstancesConsole("console", 3) ports := make(map[string][]string) @@ -362,6 +419,9 @@ func TestInstanceCache_fillIntrnalLabels(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(instances, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(10), nil)) if err := ic.Update(); err != nil { t.Fatalf("error: %s", err.Error()) @@ -393,6 +453,7 @@ func TestGetInstancesByServiceID(t *testing.T) { instances1 := genModelInstances("my-services", instances1Count) instances2 := genModelInstances("my-services-a", instances2Count) // instances2 = append(instances2, instances1...) + instanceConsoles := genModelInstancesConsole("console", 3) ret := make(map[string]*model.Instance) for id, instance := range instances1 { @@ -405,6 +466,9 @@ func TestGetInstancesByServiceID(t *testing.T) { gomock.InOrder(storage.EXPECT(). GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). Return(ret, nil)) + gomock.InOrder(storage.EXPECT(). + GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID). + Return(instanceConsoles, nil)) gomock.InOrder(storage.EXPECT(). GetInstancesCountTx(gomock.Any()). Return(uint32(instances1Count+instances2Count), nil)) diff --git a/cache/service/service_test.go b/cache/service/service_test.go index b9015e3ec..dfc3f20bd 100644 --- a/cache/service/service_test.go +++ b/cache/service/service_test.go @@ -406,6 +406,7 @@ func TestServiceCache_GetServicesByFilter(t *testing.T) { mockStore.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(len(instances)), nil).AnyTimes() mockStore.EXPECT().GetMoreServices(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(services, nil).AnyTimes() mockStore.EXPECT().GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(instances, nil).AnyTimes() + mockStore.EXPECT().GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() ic.setInstances(instances) hostToService := make(map[string]string) diff --git a/common/model/naming.go b/common/model/naming.go index 9394ca3f9..ff5a2add1 100644 --- a/common/model/naming.go +++ b/common/model/naming.go @@ -928,6 +928,14 @@ type InstanceEvent struct { MetaData map[string]string } +// instance info from console +type InstanceConsole struct { + Id string + Isolate bool + Weight int8 + Metadata string +} + // InjectMetadata 从context中获取metadata并注入到事件对象 func (i *InstanceEvent) InjectMetadata(ctx context.Context) { value := ctx.Value(CtxEventKeyMetadata) diff --git a/store/boltdb/instance.go b/store/boltdb/instance.go index e97ecbeb2..a92fde950 100644 --- a/store/boltdb/instance.go +++ b/store/boltdb/instance.go @@ -572,6 +572,31 @@ func (i *instanceStore) GetMoreInstances(tx store.Tx, mtime time.Time, firstUpda return toInstance(instances), nil } +// GetMoreInstancesConsoles 获取增量修改数据 +func (ins *instanceStore) GetMoreInstanceConsoles(tx store.Tx, mtime time.Time, firstUpdate, needMeta bool, + serviceID []string) (map[string]*model.InstanceConsole, error) { + //Todo: get from boltdb + return nil, nil +} + +// DeleteInstanceConsole 逻辑删除instanceConsole +func (ins *instanceStore) DeleteInstanceConsole(instanceConsoleID string) error { + //Todo: delete from boltdb + return nil +} + +// CleanInstanceConsole 物理删除instanceConsole +func (ins *instanceStore) CleanInstanceConsole(instanceConsoleID string) error { + //Todo: clean from boltdb + return nil +} + +// UpdateInstanceConsole 更新instanceConsole +func (ins *instanceStore) UpdateInstanceConsole(instanceConsole *model.InstanceConsole) error { + //Todo: update from boltdb + return nil +} + // BatchSetInstanceHealthStatus 批量设置实例的健康状态 func (i *instanceStore) BatchSetInstanceHealthStatus(ids []interface{}, healthy int, revision string) error { for _, id := range ids { diff --git a/store/discover_api.go b/store/discover_api.go index 20a702309..2ac810559 100644 --- a/store/discover_api.go +++ b/store/discover_api.go @@ -94,12 +94,15 @@ type InstanceStore interface { BatchAddInstances(instances []*model.Instance) error // UpdateInstance 更新实例 UpdateInstance(instance *model.Instance) error + UpdateInstanceConsole(instanceConsole *model.InstanceConsole) error // DeleteInstance 删除一个实例,实际是把valid置为false DeleteInstance(instanceID string) error + DeleteInstanceConsole(instanceConsoleID string) error // BatchDeleteInstances 批量删除实例,flag=1 BatchDeleteInstances(ids []interface{}) error // CleanInstance 清空一个实例,真正删除 CleanInstance(instanceID string) error + CleanInstanceConsole(instanceConsoleID string) error // BatchGetInstanceIsolate 检查ID是否存在,并且返回存在的ID,以及ID的隔离状态 BatchGetInstanceIsolate(ids map[string]bool) (map[string]bool, error) // GetInstancesBrief 获取实例关联的token @@ -118,6 +121,7 @@ type InstanceStore interface { // GetMoreInstances 根据mtime获取增量instances,返回所有store的变更信息 // 此方法用于 cache 增量更新,需要注意 mtime 应为数据库时间戳 GetMoreInstances(tx Tx, mtime time.Time, firstUpdate, needMeta bool, serviceID []string) (map[string]*model.Instance, error) + GetMoreInstanceConsoles(tx Tx, mtime time.Time, firstUpdate, needMeta bool, serviceID []string) (map[string]*model.InstanceConsole, error) // SetInstanceHealthStatus 设置实例的健康状态 SetInstanceHealthStatus(instanceID string, flag int, revision string) error // BatchSetInstanceHealthStatus 批量设置实例的健康状态 diff --git a/store/mock/api_mock.go b/store/mock/api_mock.go index 57ffee0af..ad9f0c98a 100644 --- a/store/mock/api_mock.go +++ b/store/mock/api_mock.go @@ -451,6 +451,20 @@ func (mr *MockStoreMockRecorder) CleanInstance(instanceID interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanInstance", reflect.TypeOf((*MockStore)(nil).CleanInstance), instanceID) } +// CleanInstanceConsole mocks base method. +func (m *MockStore) CleanInstanceConsole(instanceConsoleID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanInstanceConsole", instanceConsoleID) + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanInstanceConsole indicates an expected call of CleanInstanceConsole. +func (mr *MockStoreMockRecorder) CleanInstanceConsole(instanceConsoleID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanInstanceConsole", reflect.TypeOf((*MockStore)(nil).CleanInstanceConsole), instanceConsoleID) +} + // CleanPrincipalPolicies mocks base method. func (m *MockStore) CleanPrincipalPolicies(tx store.Tx, p auth.Principal) error { m.ctrl.T.Helper() @@ -836,6 +850,20 @@ func (mr *MockStoreMockRecorder) DeleteInstance(instanceID interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInstance", reflect.TypeOf((*MockStore)(nil).DeleteInstance), instanceID) } +// DeleteInstanceConsole mocks base method. +func (m *MockStore) DeleteInstanceConsole(instanceConsoleID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteInstanceConsole", instanceConsoleID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteInstanceConsole indicates an expected call of DeleteInstanceConsole. +func (mr *MockStoreMockRecorder) DeleteInstanceConsole(instanceConsoleID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInstanceConsole", reflect.TypeOf((*MockStore)(nil).DeleteInstanceConsole), instanceConsoleID) +} + // DeleteLaneGroup mocks base method. func (m *MockStore) DeleteLaneGroup(id string) error { m.ctrl.T.Helper() @@ -1577,6 +1605,21 @@ func (mr *MockStoreMockRecorder) GetMoreConfigGroup(firstUpdate, mtime interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMoreConfigGroup", reflect.TypeOf((*MockStore)(nil).GetMoreConfigGroup), firstUpdate, mtime) } +// GetMoreInstanceConsoles mocks base method. +func (m *MockStore) GetMoreInstanceConsoles(tx store.Tx, mtime time.Time, firstUpdate, needMeta bool, serviceID []string) (map[string]*model.InstanceConsole, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMoreInstanceConsoles", tx, mtime, firstUpdate, needMeta, serviceID) + ret0, _ := ret[0].(map[string]*model.InstanceConsole) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMoreInstanceConsoles indicates an expected call of GetMoreInstanceConsoles. +func (mr *MockStoreMockRecorder) GetMoreInstanceConsoles(tx, mtime, firstUpdate, needMeta, serviceID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMoreInstanceConsoles", reflect.TypeOf((*MockStore)(nil).GetMoreInstanceConsoles), tx, mtime, firstUpdate, needMeta, serviceID) +} + // GetMoreGrayResouces mocks base method. func (m *MockStore) GetMoreGrayResouces(firstUpdate bool, mtime time.Time) ([]*model.GrayResource, error) { m.ctrl.T.Helper() @@ -2741,6 +2784,20 @@ func (mr *MockStoreMockRecorder) UpdateInstance(instance interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInstance", reflect.TypeOf((*MockStore)(nil).UpdateInstance), instance) } +// UpdateInstanceConsole mocks base method. +func (m *MockStore) UpdateInstanceConsole(instanceConsole *model.InstanceConsole) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateInstanceConsole", instanceConsole) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateInstanceConsole indicates an expected call of UpdateInstanceConsole. +func (mr *MockStoreMockRecorder) UpdateInstanceConsole(instanceConsole interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInstanceConsole", reflect.TypeOf((*MockStore)(nil).UpdateInstanceConsole), instanceConsole) +} + // UpdateLaneGroup mocks base method. func (m *MockStore) UpdateLaneGroup(tx store.Tx, item *model.LaneGroup) error { m.ctrl.T.Helper() diff --git a/store/mysql/instance.go b/store/mysql/instance.go index b55221d97..0baf7488f 100644 --- a/store/mysql/instance.go +++ b/store/mysql/instance.go @@ -508,6 +508,51 @@ func (ins *instanceStore) GetMoreInstances(tx store.Tx, mtime time.Time, firstUp return instances, nil } +// GetMoreInstancesConsoles 获取增量修改数据 +func (ins *instanceStore) GetMoreInstanceConsoles(tx store.Tx, mtime time.Time, firstUpdate, needMeta bool, + serviceID []string) (map[string]*model.InstanceConsole, error) { + + dbTx, _ := tx.GetDelegateTx().(*BaseTx) + + str := `select id, isolate, weight, metadata from instance_console where instance_console.mtime >= FROM_UNIXTIME(?) ` + + if firstUpdate { + str += " and flag != 1" + } + + rows, err := dbTx.Query(str, timeToTimestamp(mtime)) + if err != nil { + log.Errorf("[Store][database] get more instanceConsoles query err: %s", err.Error()) + return nil, err + } + + out := make(map[string]*model.InstanceConsole) + + if rows == nil { + return out, err + } + + defer rows.Close() + var item model.InstanceConsole + + for rows.Next() { + err := rows.Scan(&item.Id, &item.Isolate, &item.Metadata, &item.Weight) + if err != nil { + log.Errorf("[Store][database] fetch instanceConsole rows err: %s", err.Error()) + return out, err + } + + out[item.Id] = &item + } + + if err := rows.Err(); err != nil { + log.Errorf("[Store][database] instanceConsole rows catch err: %s", err.Error()) + return out, err + } + + return out, err +} + // GetInstanceMeta 根据实例ID获取实例的metadata func (ins *instanceStore) GetInstanceMeta(instanceID string) (map[string]string, error) { str := "select `mkey`, `mvalue` from instance_metadata where id = ?" @@ -1014,6 +1059,82 @@ func batchAddInstanceCheck(tx *BaseTx, instances []*model.Instance) error { return err } +// addInstanceConsole +func addInstanceConsole(tx *BaseTx, instanceConsole *model.InstanceConsole) error { + + str := "insert into instance_console(`id`, `isolate`, `weight`, `metadata`, `ctime`, `mtime`) values " + str += "(?, ?, ?, ?, sysdate(), sysdate())" + + _, err := tx.Exec(str, instanceConsole.Id, instanceConsole.Isolate, instanceConsole.Metadata, instanceConsole.Weight) + + return err +} + +// DeleteInstanceConsole 逻辑删除instanceConsole +func (ins *instanceStore) DeleteInstanceConsole(instanceConsoleID string) error { + if instanceConsoleID == "" { + return errors.New("delete InstanceConsole Missing instanceConsole id") + } + return RetryTransaction("deleteInstanceConsole", func() error { + return ins.master.processWithTransaction("deleteInstance", func(tx *BaseTx) error { + str := "update instance_console set flag = 1, mtime = sysdate() where `id` = ?" + if _, err := tx.Exec(str, instanceConsoleID); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] delete instanceConsole commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) +} + +// CleanInstanceConsole 物理删除instanceConsole +func (ins *instanceStore) CleanInstanceConsole(instanceConsoleID string) error { + if instanceConsoleID == "" { + return errors.New("clean InstanceConsole Missing instanceConsole id") + } + return RetryTransaction("cleanInstanceConsole", func() error { + return ins.master.processWithTransaction("cleanInstanceConsole", func(tx *BaseTx) error { + str := "delete from instance_console where id = ? and flag = 1" + if _, err := tx.Exec(str, instanceConsoleID); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] clean instanceConsole commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) +} + +// UpdateInstanceConsole 更新instanceConsole +func (ins *instanceStore) UpdateInstanceConsole(instanceConsole *model.InstanceConsole) error { + + return RetryTransaction("UpdateInstanceConsole", func() error { + return ins.master.processWithTransaction("UpdateInstanceConsole", func(tx *BaseTx) error { + str := `update instance_console set isolate = ?, weight = ?, metadata = ?, mtime = sysdate() where id = ?` + + if _, err := tx.Exec(str, instanceConsole.Isolate, instanceConsole.Weight, instanceConsole.Metadata, instanceConsole.Id); err != nil { + return store.Error(err) + } + + if err := tx.Commit(); err != nil { + log.Errorf("[Store][database] update instanceConsole commit tx err: %s", err.Error()) + return err + } + + return nil + }) + }) +} + // addInstanceMeta 往表中加入instance meta数据 func addInstanceMeta(tx *BaseTx, id string, meta map[string]string) error { if len(meta) == 0 { diff --git a/store/mysql/scripts/polaris_server.sql b/store/mysql/scripts/polaris_server.sql index f7a470a58..ab5818fd7 100644 --- a/store/mysql/scripts/polaris_server.sql +++ b/store/mysql/scripts/polaris_server.sql @@ -61,6 +61,23 @@ CREATE TABLE KEY `host` (`host`) ) ENGINE = InnoDB; +-- -------------------------------------------------------- +-- +-- Table structure `instance_console` +-- +CREATE TABLE `instance_console` +( + `id` VARCHAR(128) NOT NULL COMMENT 'Unique ID', + `isolate` TINYINT(4) DEFAULT NULL COMMENT 'Example isolation status flag, 0 is not isolated, 1 is isolated', + `weight` SMALLINT(6) DEFAULT NULL COMMENT 'The weight of the instance is mainly used for LoadBalance, default is 100', + `metadata` LONGTEXT COLLATE utf8_bin DEFAULT NULL COMMENT 'Instance metadata', + `flag` TINYINT(4) NOT NULL DEFAULT '0' COMMENT 'Logic delete flag, 0 means visible, 1 means that it has been logically deleted', + `ctime` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', + `mtime` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last updated time', + PRIMARY KEY (`id`), + KEY `mtime` (`mtime`) +) ENGINE = InnoDB; + -- -------------------------------------------------------- -- -- Table structure `health_check`