diff --git a/pkg/agent/qrm-plugins/network/state/consts.go b/pkg/agent/qrm-plugins/network/state/consts.go new file mode 100644 index 000000000..9c360090a --- /dev/null +++ b/pkg/agent/qrm-plugins/network/state/consts.go @@ -0,0 +1,5 @@ +package state + +const ( + NetBandwidthImplicitAnnotationKey = "resource.katalyst.kubewharf.io/net_bandwidth_implicit" +) diff --git a/pkg/agent/qrm-plugins/network/state/state.go b/pkg/agent/qrm-plugins/network/state/state.go index 27f31150b..a5fd8fb0c 100644 --- a/pkg/agent/qrm-plugins/network/state/state.go +++ b/pkg/agent/qrm-plugins/network/state/state.go @@ -19,6 +19,7 @@ package state import ( "encoding/json" "fmt" + "strconv" info "github.com/google/cadvisor/info/v1" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" @@ -39,8 +40,11 @@ type AllocationInfo struct { PodType string `json:"pod_type,omitempty"` Egress uint32 `json:"egress"` Ingress uint32 `json:"ingress"` - IfName string `json:"if_name"` // we do not support cross-nic bandwidth + IfName string `json:"if_name"` // we do not support cross-nic bandwidth + NSName string `json:"ns_name"` NumaNodes machine.CPUSet `json:"numa_node"` // associated numa nodes of the socket connecting to the selected NIC + QoSLevel string `json:"qosLevel"` + NetClassID string `json:"net_class_id"` Labels map[string]string `json:"labels"` Annotations map[string]string `json:"annotations"` @@ -109,6 +113,8 @@ func (ai *AllocationInfo) Clone() *AllocationInfo { Ingress: ai.Ingress, IfName: ai.IfName, NumaNodes: ai.NumaNodes.Clone(), + QoSLevel: ai.QoSLevel, + NetClassID: ai.NetClassID, Labels: general.DeepCopyMap(ai.Labels), Annotations: general.DeepCopyMap(ai.Annotations), } @@ -126,6 +132,29 @@ func (ai *AllocationInfo) CheckSideCar() bool { return ai.ContainerType == pluginapi.ContainerType_SIDECAR.String() } +func (ai *AllocationInfo) GetRequestedEgress() (uint32, error) { + if ai == nil { + return 0, fmt.Errorf("nil AllocationInfo") + } + + if ai.Egress > 0 && ai.Annotations[NetBandwidthImplicitAnnotationKey] != "" { + return 0, fmt.Errorf("ambiguous ai.Egress: %d, %s: %s", + ai.Egress, NetBandwidthImplicitAnnotationKey, ai.Annotations[NetBandwidthImplicitAnnotationKey]) + } else if ai.Egress > 0 { + return ai.Egress, nil + } else if ai.Annotations[NetBandwidthImplicitAnnotationKey] != "" { + ret, err := strconv.Atoi(ai.Annotations[NetBandwidthImplicitAnnotationKey]) + if err != nil { + return 0, fmt.Errorf("parse %s: %s failed with error: %v", + NetBandwidthImplicitAnnotationKey, ai.Annotations[NetBandwidthImplicitAnnotationKey], err) + } + + return uint32(ret), nil + } + + return 0, nil +} + func (pe PodEntries) Clone() PodEntries { clone := make(PodEntries) for podUID, containerEntries := range pe { diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go index fb6a1deeb..8c4e4969e 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/policy.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/policy.go @@ -47,6 +47,7 @@ import ( "github.com/kubewharf/katalyst-core/pkg/util/machine" "github.com/kubewharf/katalyst-core/pkg/util/native" "github.com/kubewharf/katalyst-core/pkg/util/qos" + qrmgeneral "github.com/kubewharf/katalyst-core/pkg/util/qrm" ) const ( @@ -82,6 +83,7 @@ type StaticPolicy struct { CgroupV2Env bool qosLevelToNetClassMap map[string]uint32 applyNetClassFunc func(podUID, containerID string, data *common.NetClsData) error + applyNetworkGroupsFunc func(map[string]*qrmgeneral.NetworkGroup) error podLevelNetClassAnnoKey string podLevelNetAttributesAnnoKeys []string ipv4ResourceAllocationAnnotationKey string @@ -94,6 +96,8 @@ type StaticPolicy struct { podAnnotationKeptKeys []string podLabelKeptKeys []string + lowPriorityGroups map[string]*qrmgeneral.NetworkGroup + // aliveCgroupID is used to record the alive cgroupIDs and their last alive time aliveCgroupID map[uint64]time.Time } @@ -157,8 +161,15 @@ func NewStaticPolicy(agentCtx *agent.GenericContext, conf *config.Configuration, policyImplement.applyNetClassFunc = cgroupcmutils.ApplyNetClsForContainer } + policyImplement.applyNetworkGroupsFunc = agentCtx.MetaServer.ExternalManager.ApplyNetworkGroups + policyImplement.ApplyConfig(conf.StaticAgentConfiguration) + err = policyImplement.generateAndApplyGroups() + if err != nil { + return false, agent.ComponentStub{}, fmt.Errorf("generateAndApplyGroups failed with error: %v", err) + } + pluginWrapper, err := skeleton.NewRegistrationPluginWrapper(policyImplement, conf.QRMPluginSocketDirs, func(key string, value int64) { _ = wrappedEmitter.StoreInt64(key, value, metrics.MetricTypeNameRaw) @@ -514,6 +525,14 @@ func (p *StaticPolicy) Allocate(_ context.Context, return nil, err } + netClassID, err := p.getNetClassID(podAnnotations, p.podLevelNetClassAnnoKey, qosLevel) + if err != nil { + err = fmt.Errorf("getNetClassID for pod: %s/%s, container: %s failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, err) + general.Errorf("%s", err.Error()) + return nil, err + } + reqInt, _, err := util.GetQuantityFromResourceReq(req) if err != nil { return nil, fmt.Errorf("getReqQuantityFromResourceReq failed with error: %v", err) @@ -525,7 +544,8 @@ func (p *StaticPolicy) Allocate(_ context.Context, "containerName", req.ContainerName, "qosLevel", qosLevel, "reqAnnotations", req.Annotations, - "netBandwidthReq(Mbps)", reqInt) + "netBandwidthReq(Mbps)", reqInt, + "netClassID", netClassID) p.Lock() defer func() { @@ -570,7 +590,7 @@ func (p *StaticPolicy) Allocate(_ context.Context, "bandwidthReq(Mbps)", reqInt, "currentResult(Mbps)", allocationInfo.Egress) - resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, allocationInfo) + resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, allocationInfo, netClassID) if err != nil { err = fmt.Errorf("getResourceAllocationAnnotations for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) @@ -660,12 +680,23 @@ func (p *StaticPolicy) Allocate(_ context.Context, Egress: uint32(reqInt), Ingress: uint32(reqInt), IfName: selectedNIC.Iface, + NSName: selectedNIC.NSName, NumaNodes: siblingNUMAs, Labels: general.DeepCopyMap(req.Labels), Annotations: general.DeepCopyMap(req.Annotations), + QoSLevel: qosLevel, + NetClassID: fmt.Sprintf("%d", netClassID), } - resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, newAllocation) + err = applyImplicitReq(req, newAllocation) + if err != nil { + err = fmt.Errorf("p.applyImplicitReq for pod: %s/%s, container: %s failed with error: %v", + req.PodNamespace, req.PodName, req.ContainerName, err) + general.Errorf("%s", err.Error()) + return nil, err + } + + resourceAllocationAnnotations, err := p.getResourceAllocationAnnotations(podAnnotations, newAllocation, netClassID) if err != nil { err = fmt.Errorf("getResourceAllocationAnnotations for pod: %s/%s, container: %s failed with error: %v", req.PodNamespace, req.PodName, req.ContainerName, err) @@ -690,6 +721,12 @@ func (p *StaticPolicy) Allocate(_ context.Context, // update state cache p.state.SetMachineState(machineState) + err = p.generateAndApplyGroups() + + if err != nil { + general.Errorf("generateAndApplyGroups failed with error: %v", err) + } + return packAllocationResponse(req, newAllocation, respHint, resourceAllocationAnnotations) } @@ -730,7 +767,13 @@ func (p *StaticPolicy) applyNetClass() { continue } - classID, err := p.getNetClassID(pod.GetAnnotations(), p.podLevelNetClassAnnoKey) + qosLevel, err := p.qosConfig.GetQoSLevel(nil, pod.Annotations) + if err != nil { + general.Errorf("get qos level for pod: %s/%s failed with err", pod.Namespace, pod.Name) + continue + } + + classID, err := p.getNetClassID(pod.GetAnnotations(), p.podLevelNetClassAnnoKey, qosLevel) if err != nil { general.Errorf("get net class id failed, pod: %s, err: %s", native.GenerateUniqObjectNameKey(pod), err) continue @@ -944,12 +987,8 @@ func (p *StaticPolicy) selectNICsByReq(req *pluginapi.ResourceRequest) ([]machin return candidateNICs, nil } -func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[string]string, allocation *state.AllocationInfo) (map[string]string, error) { - netClsID, err := p.getNetClassID(podAnnotations, p.podLevelNetClassAnnoKey) - if err != nil { - return nil, fmt.Errorf("getNetClassID failed with error: %v", err) - } - +func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[string]string, + allocation *state.AllocationInfo, netClsID uint32) (map[string]string, error) { selectedNIC := p.getNICByName(allocation.IfName) resourceAllocationAnnotations := map[string]string{ @@ -957,8 +996,11 @@ func (p *StaticPolicy) getResourceAllocationAnnotations(podAnnotations map[strin p.ipv6ResourceAllocationAnnotationKey: strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV6), IPsSeparator), p.netInterfaceNameResourceAllocationAnnotationKey: selectedNIC.Iface, p.netClassIDResourceAllocationAnnotationKey: fmt.Sprintf("%d", netClsID), + } + + if !isImplicitReq(podAnnotations) { // TODO: support differentiated Egress/Ingress bandwidth later - p.netBandwidthResourceAllocationAnnotationKey: strconv.Itoa(int(allocation.Egress)), + resourceAllocationAnnotations[p.netBandwidthResourceAllocationAnnotationKey] = strconv.Itoa(int(allocation.Egress)) } if len(selectedNIC.NSAbsolutePath) > 0 { @@ -1002,10 +1044,12 @@ func (p *StaticPolicy) removePod(podUID string) error { p.state.SetPodEntries(podEntries) p.state.SetMachineState(machineState) + p.generateAndApplyGroups() + return nil } -func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelNetClassAnnoKey string) (uint32, error) { +func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelNetClassAnnoKey, qosLevel string) (uint32, error) { isPodLevelNetClassExist, classID, err := qos.GetPodNetClassID(podAnnotations, podLevelNetClassAnnoKey) if err != nil { return 0, err @@ -1014,10 +1058,6 @@ func (p *StaticPolicy) getNetClassID(podAnnotations map[string]string, podLevelN return classID, nil } - qosLevel, err := p.qosConfig.GetQoSLevel(nil, podAnnotations) - if err != nil { - return 0, err - } return p.getNetClassIDByQoSLevel(qosLevel) } @@ -1092,3 +1132,78 @@ func (p *StaticPolicy) clearResidualNetClass(activeNetClsData map[uint64]*common } } } + +func (p *StaticPolicy) generateLowPriorityGroup() error { + + lowPriorityGroups := make(map[string]*qrmgeneral.NetworkGroup) + machineState := p.state.GetMachineState() + + for nicName, nicState := range machineState { + groupName := getGroupName(nicName, LowPriorityGroupNameSuffix) + // [TODO] since getNICByName has alreday been used, + // we also assume nic name is unique here. + // But if the assumption is broken, we should reconsider logic here. + lowPriorityGroups[groupName] = &qrmgeneral.NetworkGroup{ + Egress: nicState.EgressState.Allocatable, + } + + negtive := false + for podUID, containerEntries := range nicState.PodEntries { + for containerName, allocationInfo := range containerEntries { + if allocationInfo == nil { + general.Warningf("nil allocationInfo") + continue + } + + if allocationInfo.QoSLevel == apiconsts.PodAnnotationQoSLevelReclaimedCores { + if allocationInfo.NetClassID != "" { + lowPriorityGroups[groupName].NetClassIDs = append(lowPriorityGroups[groupName].NetClassIDs, allocationInfo.NetClassID) + } + } else { + requestedEgress, err := allocationInfo.GetRequestedEgress() + + if err != nil { + return fmt.Errorf("GetRequestedEgress for pod: %s, container: %s failed with error: %v", podUID, containerName, err) + } + + if !negtive && lowPriorityGroups[groupName].Egress > requestedEgress { + lowPriorityGroups[groupName].Egress -= requestedEgress + } else { + negtive = true + } + } + } + } + + // [TODO] make 0.05 as option + if negtive { + lowPriorityGroups[groupName].Egress = uint32(float64(nicState.EgressState.Allocatable) * 0.05) + } else { + lowPriorityGroups[groupName].Egress = general.MaxUInt32(uint32(float64(nicState.EgressState.Allocatable)*0.05), lowPriorityGroups[groupName].Egress) + } + selectedNIC := p.getNICByName(nicName) + lowPriorityGroups[groupName].MergedIPv4 = strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV4), IPsSeparator) + lowPriorityGroups[groupName].MergedIPv6 = strings.Join(selectedNIC.GetNICIPs(machine.IPVersionV6), IPsSeparator) + } + + general.Infof("old lowPriorityGroups: %+v, new lowPriorityGroups: %+v", p.lowPriorityGroups, lowPriorityGroups) + + p.lowPriorityGroups = lowPriorityGroups + return nil +} + +func (p *StaticPolicy) generateAndApplyGroups() error { + err := p.generateLowPriorityGroup() + + if err != nil { + return fmt.Errorf("generateLowPriorityGroup failed with error: %v", err) + } else { + err = p.applyNetworkGroupsFunc(p.lowPriorityGroups) + + if err != nil { + return fmt.Errorf("applyGroups failed with error: %v", err) + } + } + + return nil +} diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/types.go b/pkg/agent/qrm-plugins/network/staticpolicy/types.go new file mode 100644 index 000000000..e80f71807 --- /dev/null +++ b/pkg/agent/qrm-plugins/network/staticpolicy/types.go @@ -0,0 +1 @@ +package staticpolicy diff --git a/pkg/agent/qrm-plugins/network/staticpolicy/util.go b/pkg/agent/qrm-plugins/network/staticpolicy/util.go index e4040f8e5..3c16bdc29 100644 --- a/pkg/agent/qrm-plugins/network/staticpolicy/util.go +++ b/pkg/agent/qrm-plugins/network/staticpolicy/util.go @@ -18,14 +18,17 @@ package staticpolicy import ( "fmt" + "math" "math/rand" "time" pluginapi "k8s.io/kubelet/pkg/apis/resourceplugin/v1alpha1" "github.com/kubewharf/katalyst-api/pkg/consts" + apiconsts "github.com/kubewharf/katalyst-api/pkg/consts" "github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent" "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/network/state" + qrmutil "github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util" "github.com/kubewharf/katalyst-core/pkg/util/general" "github.com/kubewharf/katalyst-core/pkg/util/machine" ) @@ -42,6 +45,8 @@ const ( RandomOne NICSelectionPoligy = "random" FirstOne NICSelectionPoligy = "first" LastOne NICSelectionPoligy = "last" + + LowPriorityGroupNameSuffix = "low_priority" ) type NICFilter func(nics []machine.InterfaceInfo, req *pluginapi.ResourceRequest, agentCtx *agent.GenericContext) []machine.InterfaceInfo @@ -325,3 +330,26 @@ func getResourceIdentifier(ifaceNS, ifaceName string) string { return ifaceName } + +func applyImplicitReq(req *pluginapi.ResourceRequest, allocationInfo *state.AllocationInfo) error { + if req == nil || allocationInfo == nil { + return fmt.Errorf("nil req or allocationInfo") + } + + if !isImplicitReq(req.Annotations) { + return nil + } + + allocationInfo.Annotations[state.NetBandwidthImplicitAnnotationKey] = + fmt.Sprintf("%d", + general.Max(int(math.Ceil(req.ResourceRequests[string(apiconsts.ResourceNetBandwidth)])), 0)) + return nil +} + +func isImplicitReq(annotations map[string]string) bool { + return annotations[qrmutil.PodAnnotationQRMDeclarationKey] == qrmutil.PodAnnotationQRMDeclarationTrue +} + +func getGroupName(nicName, groupSuffix string) string { + return fmt.Sprintf("%s_%s", nicName, groupSuffix) +} diff --git a/pkg/agent/qrm-plugins/util/consts.go b/pkg/agent/qrm-plugins/util/consts.go index 687cd0931..9d3a5b023 100644 --- a/pkg/agent/qrm-plugins/util/consts.go +++ b/pkg/agent/qrm-plugins/util/consts.go @@ -63,6 +63,12 @@ const ( OCIPropertyNameMemoryLimitInBytes = "MemoryLimitInBytes" ) +const ( + //[TODO]: move them to apiserver + PodAnnotationQRMDeclarationKey = "katalyst.kubewharf.io/qrm-declaration" + PodAnnotationQRMDeclarationTrue = "true" +) + const QRMTimeFormat = "2006-01-02 15:04:05.999999999 -0700 MST" const QRMPluginPolicyTagName = "policy" diff --git a/pkg/agent/qrm-plugins/util/util.go b/pkg/agent/qrm-plugins/util/util.go index fb6552ac8..ee9a0a2f7 100644 --- a/pkg/agent/qrm-plugins/util/util.go +++ b/pkg/agent/qrm-plugins/util/util.go @@ -49,7 +49,14 @@ func GetQuantityFromResourceReq(req *pluginapi.ResourceRequest) (int, float64, e return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), req.ResourceRequests[key], nil case string(apiconsts.ReclaimedResourceMilliCPU): return general.Max(int(math.Ceil(req.ResourceRequests[key]/1000.0)), 0), req.ResourceRequests[key] / 1000.0, nil - case string(v1.ResourceMemory), string(apiconsts.ReclaimedResourceMemory), string(apiconsts.ResourceNetBandwidth): + case string(v1.ResourceMemory), string(apiconsts.ReclaimedResourceMemory): + return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), req.ResourceRequests[key], nil + case string(apiconsts.ResourceNetBandwidth): + if req.Annotations[PodAnnotationQRMDeclarationKey] == PodAnnotationQRMDeclarationTrue { + general.Infof("detect %s: %s, return %s: 0 instead of %s: %.2f", + PodAnnotationQRMDeclarationKey, PodAnnotationQRMDeclarationTrue, key, key, req.ResourceRequests[key]) + return 0, 0, nil + } return general.Max(int(math.Ceil(req.ResourceRequests[key])), 0), req.ResourceRequests[key], nil default: return 0, 0, fmt.Errorf("invalid request resource name: %s", key) diff --git a/pkg/util/external/network/manager.go b/pkg/util/external/network/manager.go index ac414a4e4..b49ea91cb 100644 --- a/pkg/util/external/network/manager.go +++ b/pkg/util/external/network/manager.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + qrmgeneral "github.com/kubewharf/katalyst-core/pkg/util/qrm" ) // NetworkManager provides methods that control network resources. @@ -30,6 +31,8 @@ type NetworkManager interface { ListNetClass() ([]*common.NetClsData, error) // ClearNetClass clears the net class config for a container. ClearNetClass(cgroupID uint64) error + // ApplyNetworkGroups apply parameters for network groups. + ApplyNetworkGroups(map[string]*qrmgeneral.NetworkGroup) error } type NetworkManagerStub struct { @@ -74,3 +77,7 @@ func (n *NetworkManagerStub) ClearNetClass(cgroupID uint64) error { } return nil } + +func (n *NetworkManagerStub) ApplyNetworkGroups(map[string]*qrmgeneral.NetworkGroup) error { + return nil +} diff --git a/pkg/util/external/network/manager_linux.go b/pkg/util/external/network/manager_linux.go index ec859e338..8805f6332 100644 --- a/pkg/util/external/network/manager_linux.go +++ b/pkg/util/external/network/manager_linux.go @@ -23,6 +23,7 @@ import ( "errors" "github.com/kubewharf/katalyst-core/pkg/util/cgroup/common" + qrmgeneral "github.com/kubewharf/katalyst-core/pkg/util/qrm" ) type defaultNetworkManager struct{} @@ -49,3 +50,8 @@ func (*defaultNetworkManager) ClearNetClass(cgroupID uint64) error { // TODO: clear the eBPF map when a pod is removed return errors.New("not implemented yet") } + +// ApplyNetworkGroups apply parameters for network groups. +func (n *defaultNetworkManager) ApplyNetworkGroups(map[string]*qrmgeneral.NetworkGroup) error { + return nil +} diff --git a/pkg/util/general/common.go b/pkg/util/general/common.go index 3ad1837f8..696ef6d68 100644 --- a/pkg/util/general/common.go +++ b/pkg/util/general/common.go @@ -111,6 +111,14 @@ func MinUInt32(a, b uint32) uint32 { } } +func MaxUInt32(a, b uint32) uint32 { + if a >= b { + return a + } else { + return b + } +} + func MinFloat64(a, b float64) float64 { if a >= b { return b diff --git a/pkg/util/qrm/types.go b/pkg/util/qrm/types.go new file mode 100644 index 000000000..d0ba2dffa --- /dev/null +++ b/pkg/util/qrm/types.go @@ -0,0 +1,9 @@ +package qrm + +type NetworkGroup struct { + NetClassIDs []string `json:"net_class_ids"` + Egress uint32 `json:"egress"` + Ingress uint32 `json:"ingress"` + MergedIPv4 string `json:"merged_ipv4"` + MergedIPv6 string `json:"merged_ipv6"` +}