Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .cursor/rules/tenant.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# 多租户开发规范

## 基本要求
- 所有功能都要兼容多租户模式
- 如果参数中需要传递租户ID,将租户ID放在第一个参数中(如果有 kit 或 context 则放在它们后面作为第二个参数)
- 示例: `func DoSomething(tenantID string, bizID uint32)`
- 示例: `func DoSomething(ctx context.Context, tenantID string, bizID uint32)`
- 示例: `func DoSomething(kit *kit.Kit, tenantID string, bizID uint32)`

## GORM Hook 机制
- 项目已在 `internal/dal/dao/set_tenant_id.go` 中实现 GORM hook,自动处理多租户过滤
- 查询时自动添加 `WHERE tenant_id = ?` 条件
- 创建/更新时自动设置 `tenant_id` 字段
- **关键**: hook 只在 `kit.TenantID` 非空时生效,因此调用方必须确保 kit 中设置了正确的 TenantID

## 定时任务多租户处理
- 定时任务使用 `kit.New()` 创建的 kit 默认没有 TenantID
- 多租户模式下需要按租户轮询执行:
- 进程管理相关同步(sync_cmdb、cmdb_resource_watcher):使用 `bkuser.ListEnabledTenants()` 获取全量租户
- 业务主机同步(sync_biz_host):使用 `App.GetDistinctTenantIDs()` 从 app 表获取租户
- 示例模式:
```go
if cc.DataService().FeatureFlags.EnableMultiTenantMode {
tenants, _ := bkuser.ListEnabledTenants(kt.Ctx)
for _, tenant := range tenants {
kt.TenantID = tenant.ID
// 执行租户相关操作
}
}
```

## 表结构要求
- 新增表必须包含 `tenant_id` 字段
- `tenant_id` 应作为联合主键或唯一索引的一部分
- 参考 `processes`、`process_instances` 等表的设计

## 外部 API 调用
- 调用蓝鲸组件 API 时需要在请求头中携带租户信息:`X-Bk-Tenant-Id: {tenantID}`
- 参考 `internal/components/bkuser/bkuser.go` 中的实现
52 changes: 42 additions & 10 deletions cmd/data-service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,16 +455,7 @@ func (ds *dataService) startCronTasks() {

// 在启动全量同步之前,先获取事件cursor,避免丢失全量同步期间发生的事件
timeAgo := time.Now().Add(-10 * time.Second).Unix()
if err := crontab.InitHostDetailCursor(ds.daoSet, ds.cmdb, timeAgo); err != nil {
logs.Errorf("init host detail cursor failed, err: %v", err)
// 初始化cursor失败则依赖后续定时任务重新获取,可能存在丢失事件的风险
// PASS
}
if err := crontab.InitBizHostCursor(ds.daoSet, ds.cmdb, timeAgo); err != nil {
logs.Errorf("init biz host cursor failed, err: %v", err)
// 初始化cursor失败则依赖后续定时任务重新获取,可能存在丢失事件的风险
// PASS
}
ds.initBizHostCursors(timeAgo)

crontabConfig := cc.DataService().Crontab
logs.Infof("crontabConfig: %+v", crontabConfig)
Expand Down Expand Up @@ -550,3 +541,44 @@ func (ds *dataService) startCronTasks() {
}

}

// initBizHostCursors 初始化业务主机相关的游标
func (ds *dataService) initBizHostCursors(timeAgo int64) {
kt := kit.New()

// 多租户模式:从 app 表获取租户列表并逐个初始化
if cc.DataService().FeatureFlags.EnableMultiTenantMode {
apps, err := ds.daoSet.App().GetDistinctTenantIDs(kt)
if err != nil {
logs.Errorf("get distinct tenant IDs failed, err: %v", err)
return
}

if len(apps) == 0 {
logs.Warnf("no tenants found in app table for init biz host cursors")
return
}

for _, app := range apps {
if app.Spec.TenantID == "" {
continue
}
tenantID := app.Spec.TenantID
if err := crontab.InitHostDetailCursor(tenantID, ds.daoSet, ds.cmdb, timeAgo); err != nil {
logs.Errorf("init host detail cursor failed for tenant %s, err: %v", tenantID, err)
}
if err := crontab.InitBizHostCursor(tenantID, ds.daoSet, ds.cmdb, timeAgo); err != nil {
logs.Errorf("init biz host cursor failed for tenant %s, err: %v", tenantID, err)
}
}
return
}

// 单租户模式
if err := crontab.InitHostDetailCursor("", ds.daoSet, ds.cmdb, timeAgo); err != nil {
logs.Errorf("init host detail cursor failed, err: %v", err)
}
if err := crontab.InitBizHostCursor("", ds.daoSet, ds.cmdb, timeAgo); err != nil {
logs.Errorf("init biz host cursor failed, err: %v", err)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Tencent is pleased to support the open source community by making Blueking Container Service available.
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
* Licensed under the MIT License (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
* http://opensource.org/licenses/MIT
* Unless required by applicable law or agreed to in writing, software distributed under
* the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package migrations

import (
"gorm.io/gorm"

"github.com/TencentBlueKing/bk-bscp/cmd/data-service/db-migration/migrator"
)

func init() {
migrator.GetMigrator().AddMigration(&migrator.Migration{
Version: "20260122100000",
Name: "20260122100000_add_tenant_id_to_biz_hosts",
Mode: migrator.GormMode,
Up: mig20260122100000Up,
Down: mig20260122100000Down,
})
}

// mig20260122100000Up for up migration
func mig20260122100000Up(tx *gorm.DB) error {
// 1. 添加 tenant_id 列
if err := tx.Exec("ALTER TABLE biz_hosts ADD COLUMN tenant_id varchar(64) NOT NULL DEFAULT 'default' FIRST").Error; err != nil {
return err
}

// 2. 删除原有主键
if err := tx.Exec("ALTER TABLE biz_hosts DROP PRIMARY KEY").Error; err != nil {
return err
}

// 3. 创建新的复合主键 (tenant_id, bk_biz_id, bk_host_id)
if err := tx.Exec("ALTER TABLE biz_hosts ADD PRIMARY KEY (tenant_id, bk_biz_id, bk_host_id)").Error; err != nil {
return err
}

return nil
}

// mig20260122100000Down for down migration
func mig20260122100000Down(tx *gorm.DB) error {
// 1. 删除主键
if err := tx.Exec("ALTER TABLE biz_hosts DROP PRIMARY KEY").Error; err != nil {
return err
}

// 2. 恢复原有主键
if err := tx.Exec("ALTER TABLE biz_hosts ADD PRIMARY KEY (bk_biz_id, bk_host_id)").Error; err != nil {
return err
}

// 3. 删除 tenant_id 列
if err := tx.Exec("ALTER TABLE biz_hosts DROP COLUMN tenant_id").Error; err != nil {
return err
}

return nil
}
32 changes: 31 additions & 1 deletion cmd/data-service/service/crontab/cleanup_biz_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/TencentBlueKing/bk-bscp/internal/dal/dao"
"github.com/TencentBlueKing/bk-bscp/internal/runtime/shutdown"
"github.com/TencentBlueKing/bk-bscp/internal/serviced"
"github.com/TencentBlueKing/bk-bscp/pkg/cc"
"github.com/TencentBlueKing/bk-bscp/pkg/dal/table"
"github.com/TencentBlueKing/bk-bscp/pkg/kit"
"github.com/TencentBlueKing/bk-bscp/pkg/logs"
Expand Down Expand Up @@ -92,12 +93,41 @@ func (c *CleanupBizHost) Run() {
continue
}
logs.Infof("starts to cleanup invalid biz host relationships")
c.cleanupBizHost(kt)
c.cleanupBizHostByTenant(kt)
}
}
}()
}

// cleanupBizHostByTenant 按租户清理无效的业务主机关系
func (c *CleanupBizHost) cleanupBizHostByTenant(kt *kit.Kit) {
// 多租户模式:从 app 表获取租户列表并逐个清理
if cc.DataService().FeatureFlags.EnableMultiTenantMode {
apps, err := c.set.App().GetDistinctTenantIDs(kt)
if err != nil {
logs.Errorf("get distinct tenant IDs failed, err: %v", err)
return
}

if len(apps) == 0 {
logs.Warnf("no tenants found in app table for cleanup biz host")
return
}

for _, app := range apps {
if app.Spec.TenantID == "" {
continue
}
kt.TenantID = app.Spec.TenantID
c.cleanupBizHost(kt)
}
return
}

// 单租户模式
c.cleanupBizHost(kt)
}

// cleanupBizHost cleanup invalid biz host relationships
func (c *CleanupBizHost) cleanupBizHost(kt *kit.Kit) {
c.mutex.Lock()
Expand Down
68 changes: 52 additions & 16 deletions cmd/data-service/service/crontab/cmdb_resource_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (

"github.com/TencentBlueKing/bk-bscp/cmd/data-service/service"
"github.com/TencentBlueKing/bk-bscp/internal/components/bkcmdb"
"github.com/TencentBlueKing/bk-bscp/internal/components/bkuser"
gsecomponents "github.com/TencentBlueKing/bk-bscp/internal/components/gse"
"github.com/TencentBlueKing/bk-bscp/internal/dal/dao"
"github.com/TencentBlueKing/bk-bscp/internal/dal/gen"
"github.com/TencentBlueKing/bk-bscp/internal/processor/cmdb"
"github.com/TencentBlueKing/bk-bscp/internal/processor/gse"
"github.com/TencentBlueKing/bk-bscp/internal/runtime/shutdown"
"github.com/TencentBlueKing/bk-bscp/internal/serviced"
"github.com/TencentBlueKing/bk-bscp/pkg/cc"
"github.com/TencentBlueKing/bk-bscp/pkg/dal/table"
"github.com/TencentBlueKing/bk-bscp/pkg/kit"
"github.com/TencentBlueKing/bk-bscp/pkg/logs"
Expand Down Expand Up @@ -83,27 +85,61 @@ func (c *cmdbResourceWatcher) Run() {
logs.Infof("current service instance is slave, skip sync cmdb")
continue
}
// 顺序监听每种资源类型
for _, res := range []bkcmdb.ResourceType{
bkcmdb.ResourceSet,
bkcmdb.ResourceModule,
bkcmdb.ResourceProcess,
} {
if err := c.watchCMDBResources(kt, res); err != nil {
logs.Errorf("[CMDB Watch] watch %s resource failed: %v", res.String(), err)
}
}

c.watchCMDBResourcesByTenant(kt)
}

}
}()
}

// watchCMDBResourcesByTenant 按租户监听 CMDB 资源变化
func (c *cmdbResourceWatcher) watchCMDBResourcesByTenant(kt *kit.Kit) {
// 多租户模式:获取所有启用的租户并逐个监听
if cc.DataService().FeatureFlags.EnableMultiTenantMode {
tenants, err := bkuser.ListEnabledTenants(kt.Ctx)
if err != nil {
logs.Errorf("[CMDB Watch] failed to list tenants: %v", err)
return
}

if len(tenants) == 0 {
logs.Warnf("[CMDB Watch] no enabled tenants found")
return
}

for _, tenant := range tenants {
kt.TenantID = tenant.ID
c.watchResourcesForTenant(kt)
}
return
}

// 单租户模式
c.watchResourcesForTenant(kt)
}

// watchResourcesForTenant 为单个租户监听所有资源类型
func (c *cmdbResourceWatcher) watchResourcesForTenant(kt *kit.Kit) {
// 顺序监听每种资源类型
for _, res := range []bkcmdb.ResourceType{
bkcmdb.ResourceSet,
bkcmdb.ResourceModule,
bkcmdb.ResourceProcess,
} {
if err := c.watchCMDBResources(kt, res); err != nil {
logs.Errorf("[CMDB Watch] tenant=%s watch %s resource failed: %v", kt.TenantID, res.String(), err)
}
}
}

// watchCMDBResources 监听并处理指定资源类型
func (c *cmdbResourceWatcher) watchCMDBResources(kt *kit.Kit, resource bkcmdb.ResourceType) error {
fields := []string{}
// 生成带租户前缀的游标 key
cursorKey := fmt.Sprintf("resource:%s:cursor", resource.String())
if kt.TenantID != "" {
cursorKey = fmt.Sprintf("%s-%s", kt.TenantID, cursorKey)
}
var cursor string
switch resource {
case bkcmdb.ResourceSet:
Expand Down Expand Up @@ -321,15 +357,15 @@ func (c *cmdbResourceWatcher) handleProcessEvent(kt *kit.Kit, resource bkcmdb.Bk

// 如果是空全量同步
if procs == nil {
if err := c.svc.SynchronizeCmdbData(kt.Ctx, []int{p.BkBizID}); err != nil {
if err := c.svc.SynchronizeCmdbData(kt.Ctx, kt.TenantID, []int{p.BkBizID}); err != nil {
logs.Errorf("sync cmdb data failed: %v", err)
}
return
}

switch resource.BkEventType {
case bkcmdb.EventCreate:
if err := c.svc.SynchronizeCmdbData(kt.Ctx, []int{p.BkBizID}); err != nil {
if err := c.svc.SynchronizeCmdbData(kt.Ctx, procs.Attachment.TenantID, []int{p.BkBizID}); err != nil {
logs.Errorf("%s sync cmdb data failed: %v", logPrefix, err)
return
}
Expand Down Expand Up @@ -494,11 +530,11 @@ func (c *cmdbResourceWatcher) handleProcessUpdate(kt *kit.Kit, tx *gen.QueryTx,
// 6. 插入新增实例扩容
if len(addInsts) > 0 {
// 更新gse状态可以忽略错误
g := gse.NewSyncGESService(int(newP.Attachment.BizID), c.gse, c.dao)
g := gse.NewSyncGESService(newP.Attachment.TenantID, int(newP.Attachment.BizID), c.gse, c.dao)
insts, err := g.SyncSingleProcessStatus(kt.Ctx, newP, addInsts)
if err != nil {
logs.Errorf("[ProcessSync] biz=%d: sync gse process status failed: %v",
old.Attachment.BizID, err)
logs.Errorf("[ProcessSync] tenant=%s biz=%d: sync gse process status failed: %v",
newP.Attachment.TenantID, old.Attachment.BizID, err)
}
if err := c.dao.ProcessInstance().BatchCreateWithTx(kt, tx, insts); err != nil {
return fmt.Errorf(
Expand Down
Loading
Loading