Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 59 additions & 10 deletions dbm-services/k8s-dbs/core/provider/cluster_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
coreconst "k8s-dbs/core/constant"
coreentity "k8s-dbs/core/entity"
coreutil "k8s-dbs/core/util"
corevalidator "k8s-dbs/core/validator"
infrautil "k8s-dbs/infrastructure/util"
metaentity "k8s-dbs/metadata/entity"
metaprovider "k8s-dbs/metadata/provider"
Expand Down Expand Up @@ -68,6 +69,7 @@ type ClusterProvider struct {
clusterHelmRepoProvider metaprovider.AddonClusterHelmRepoProvider
ClusterTagProvider metaprovider.K8sCrdClusterTagProvider
dbmAPIService *thirdapi.DbmAPIService
envValidator *corevalidator.EnvValidator
}

// ClusterProviderOptions ClusterProvider 的函数选项
Expand Down Expand Up @@ -157,6 +159,15 @@ func (c *ClusterProviderBuilder) WithDbmAPIService(
}
}

// WithEnvValidator 设置 EnvValidator
func (c *ClusterProviderBuilder) WithEnvValidator(
validator *corevalidator.EnvValidator,
) ClusterProviderOptions {
return func(c *ClusterProvider) {
c.envValidator = validator
}
}

// validateProvider 验证 ClusterProvider 必要字段
func (c *ClusterProvider) validateProvider() error {
if c.clusterMetaProvider == nil {
Expand Down Expand Up @@ -216,9 +227,14 @@ func InstanceSetGVR() schema.GroupVersionResource {
// CreateCluster 创建集群
func (c *ClusterProvider) CreateCluster(ctx *commentity.DbsContext, request *coreentity.Request) error {
// 检查集群版本
if err := c.checkClusterVersion(request); err != nil {
addonID, err := c.checkClusterVersion(request)
if err != nil {
return err
}
// 验证环境变量参数
if err := c.validateComponentEnv(addonID, request); err != nil {
return dbserrors.NewK8sDbsError(dbserrors.ParameterInvalidError, err)
}
// 检查是否重复创建
k8sClusterConfig, err := c.clusterConfigProvider.FindConfigByName(request.K8sClusterName)
if err != nil {
Expand Down Expand Up @@ -298,35 +314,37 @@ func (c *ClusterProvider) CreateCluster(ctx *commentity.DbsContext, request *cor
//
// 返回值:
//
// uint64 - 匹配到的 addon ID
// error - 检查过程中遇到的错误,如果检查通过则为nil
// bool - 是否发生了错误,true表示有错误发生
func (c *ClusterProvider) checkClusterVersion(request *coreentity.Request) error {
func (c *ClusterProvider) checkClusterVersion(request *coreentity.Request) (uint64, error) {
addonQueryParams := &metaentity.AddonQueryParams{
AddonType: request.StorageAddonType,
AddonVersion: request.StorageAddonVersion,
}
storageAddon, err := c.addonMetaProvider.FindStorageAddonByParams(addonQueryParams)
if err != nil {
return dbserrors.NewK8sDbsError(dbserrors.GetMetaDataError,
return 0, dbserrors.NewK8sDbsError(dbserrors.GetMetaDataError,
fmt.Errorf("查询存储插件元数据失败: %w", err))
}
if len(storageAddon) == 0 {
return dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
return 0, dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
fmt.Errorf("插件类型 '%s' 版本 '%s' 不存在或未配置,请检查插件配置", request.StorageAddonType, request.StorageAddonVersion))
}

addonID := storageAddon[0].ID

// 反序列化支持的版本列表
var supportedVersions []string
if err := json.Unmarshal([]byte(storageAddon[0].SupportedVersions), &supportedVersions); err != nil {
slog.Error("failed to unmarshal supported versions", "error", err)
return dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
return 0, dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
fmt.Errorf("supported versions 反序列化失败"))
}

// 检查组件版本是否在支持的版本列表中
for _, component := range request.ComponentList {
if !lo.Contains(supportedVersions, component.Version) {
return dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
return 0, dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
fmt.Errorf("组件 %s 的版本 %s 不在支持的版本列表中,支持的版本: %v",
component.ComponentName, component.Version, supportedVersions))
}
Expand All @@ -336,16 +354,16 @@ func (c *ClusterProvider) checkClusterVersion(request *coreentity.Request) error
var supportedAcVersions []string
if err := json.Unmarshal([]byte(storageAddon[0].SupportedAcVersions), &supportedAcVersions); err != nil {
slog.Error("failed to unmarshal supported ac versions", "error", err)
return dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
return 0, dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
fmt.Errorf("supported ac versions 反序列化失败"))
}

if !lo.Contains(supportedAcVersions, request.AddonClusterVersion) {
return dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
return 0, dbserrors.NewK8sDbsError(dbserrors.CreateClusterError,
fmt.Errorf("addonClusterVersion 版本 %s 不在支持的版本列表中,支持的版本: %v",
request.AddonClusterVersion, supportedAcVersions))
}
return nil
return addonID, nil
}

// saveClusterReleaseMeta 记录集群 release 元数据
Expand Down Expand Up @@ -452,6 +470,10 @@ func (c *ClusterProvider) UpdateClusterRelease(
if err := c.validateAddonClusterVersion(request, clusterEntity); err != nil {
return err
}
// 验证环境变量参数
if err := c.validateComponentEnv(clusterEntity.AddonID, request); err != nil {
return dbserrors.NewK8sDbsError(dbserrors.ParameterInvalidError, err)
}
// 更新 cluster release
values, err := c.updateClusterRelease(ctx, request, k8sClient, isPartial)
if err != nil {
Expand Down Expand Up @@ -1129,3 +1151,30 @@ func (c *ClusterProvider) validateAddonClusterVersion(
}
return nil
}

// validateComponentEnv 验证组件环境变量参数
func (c *ClusterProvider) validateComponentEnv(addonID uint64, request *coreentity.Request) error {
if c.envValidator == nil {
// 如果没有配置验证器,跳过验证
return nil
}
if request.ComponentList == nil {
return nil
}

for _, component := range request.ComponentList {
if component.Env == nil {
continue
}
// 使用组件的服务版本进行参数验证
if err := c.envValidator.ValidateVMEnv(
addonID,
component.Version,
component.ComponentName,
component.Env,
); err != nil {
return fmt.Errorf("组件 '%s' 环境变量验证失败: %w", component.ComponentName, err)
}
}
return nil
}
168 changes: 168 additions & 0 deletions dbm-services/k8s-dbs/core/validator/env_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
TencentBlueKing is pleased to support the open source community by making
蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available.
Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package validator 提供环境变量参数验证功能
package validator

import (
"fmt"
"strconv"

metaentity "k8s-dbs/metadata/entity"
metaprovider "k8s-dbs/metadata/provider"
)

// EnvValidator 环境变量验证器
type EnvValidator struct {
paramConfigProvider metaprovider.AddonParamConfigProvider
addonProvider metaprovider.K8sCrdStorageAddonProvider
}

// NewEnvValidator 创建验证器实例
func NewEnvValidator(
paramConfigProvider metaprovider.AddonParamConfigProvider,
addonProvider metaprovider.K8sCrdStorageAddonProvider,
) *EnvValidator {
return &EnvValidator{
paramConfigProvider: paramConfigProvider,
addonProvider: addonProvider,
}
}

// ValidateVMEnv 验证 VictoriaMetrics 组件环境变量
// 参数:addonID(存储addon的ID), serviceVersion(服务版本), componentName(组件名), env(环境变量)
// 目前只验证 EXTRA_ARGS 模式
func (v *EnvValidator) ValidateVMEnv(
addonID uint64,
serviceVersion string,
componentName string,
env map[string]interface{},
) error {
// 检查 addon 是否启用参数校验
addon, err := v.addonProvider.FindStorageAddonByID(addonID)
if err != nil {
// 找不到 addon 或查询出错,跳过验证
return nil
}
if !addon.EnableEnvValidation {
// 未启用参数校验,跳过验证
return nil
}

if env == nil {
return nil
}

// 检查是否有 EXTRA_ARGS
extraArgs, ok := env["EXTRA_ARGS"]
if !ok {
return nil
}

extraArgsMap, ok := extraArgs.(map[string]interface{})
if !ok {
return fmt.Errorf("EXTRA_ARGS must be a map")
}

// 如果 EXTRA_ARGS 为空,跳过验证
if len(extraArgsMap) == 0 {
return nil
}

// 从数据库获取该组件支持的参数配置
supportedParams, err := v.paramConfigProvider.FindByVersionAndComponent(addonID, serviceVersion, componentName)
if err != nil {
return err
}

// 如果没有配置任何参数规则,跳过验证
if len(supportedParams) == 0 {
return nil
}

// 构建支持的参数 map
supportedParamsMap := make(map[string]*metaentity.AddonParamConfigEntity)
for _, param := range supportedParams {
supportedParamsMap[param.ParamName] = param
}

// 验证每个 EXTRA_ARGS 参数
for key, value := range extraArgsMap {
paramConfig, exists := supportedParamsMap[key]
if !exists {
return fmt.Errorf("parameter '%s' is not supported for component '%s'", key, componentName)
}

if err := v.validateParamType(key, value, paramConfig.ParamType); err != nil {
return err
}
}

return nil
}

// validateParamType 验证参数类型
func (v *EnvValidator) validateParamType(
paramName string,
value interface{},
paramType metaentity.ParamType,
) error {
switch paramType {
case metaentity.ParamTypeString:
// string 类型不需要额外验证
return nil
case metaentity.ParamTypeInt:
return v.validateInt(paramName, value)
case metaentity.ParamTypeBool:
return v.validateBool(paramName, value)
default:
// 未知类型当作 string 处理
return nil
}
}

// validateInt 验证整数类型
func (v *EnvValidator) validateInt(paramName string, value interface{}) error {
switch val := value.(type) {
case int, int32, int64, float64:
return nil
case string:
if _, err := strconv.ParseInt(val, 10, 64); err != nil {
return fmt.Errorf("parameter '%s' must be an integer, got '%s'", paramName, val)
}
return nil
default:
return fmt.Errorf("parameter '%s' must be an integer, got type %T", paramName, value)
}
}

// validateBool 验证布尔类型
func (v *EnvValidator) validateBool(paramName string, value interface{}) error {
switch val := value.(type) {
case bool:
return nil
case string:
if _, err := strconv.ParseBool(val); err != nil {
return fmt.Errorf("parameter '%s' must be a boolean (true/false), got '%s'", paramName, val)
}
return nil
default:
return fmt.Errorf("parameter '%s' must be a boolean, got type %T", paramName, value)
}
}
Loading