Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/k8sattributes] Operator resource attributes #37114

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions processor/k8sattributesprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ type ExtractConfig struct {
// It is a list of FieldExtractConfig type. See FieldExtractConfig
// documentation for more details.
Labels []FieldExtractConfig `mapstructure:"labels"`

OperatorRules kube.OperatorRules `mapstructure:"operator_rules"`
}

// FieldExtractConfig allows specifying an extraction rule to extract a resource attribute from pod (or namespace)
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func createProcessorOpts(cfg component.Config) []option {
opts = append(opts, withExtractMetadata(oCfg.Extract.Metadata...))
opts = append(opts, withExtractLabels(oCfg.Extract.Labels...))
opts = append(opts, withExtractAnnotations(oCfg.Extract.Annotations...))
opts = append(opts, withOperatorExtractRules(oCfg.Extract.OperatorRules))

// filters
opts = append(opts, withFilterNode(oCfg.Filter.Node, oCfg.Filter.NodeFromEnvVar))
Expand Down
77 changes: 61 additions & 16 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,15 @@ func (c *WatchClient) GetNode(nodeName string) (*Node, bool) {
return nil, false
}

func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) (map[string]string, map[string]string) {
tags := map[string]string{}
serviceNames := map[string]string{}
if c.Rules.PodName {
tags[conventions.AttributeK8SPodName] = pod.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SPodName] = pod.Name
}

if c.Rules.PodHostName {
tags[tagHostName] = pod.Spec.Hostname
Expand Down Expand Up @@ -494,7 +498,7 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
c.Rules.CronJobName || c.Rules.OperatorRules.Enabled {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
Expand All @@ -504,11 +508,19 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.ReplicaSetName {
tags[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.DeploymentName {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
}
name := c.deploymentName(ref)
if name != "" {
tags[conventions.AttributeK8SDeploymentName] = name
}
}
if c.Rules.OperatorRules.Enabled {
name := c.deploymentName(ref)
if name != "" {
serviceNames[conventions.AttributeK8SDeploymentName] = name
}
}
if c.Rules.DeploymentUID {
Expand All @@ -525,18 +537,30 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.DaemonSetName {
tags[conventions.AttributeK8SDaemonSetName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SDaemonSetName] = ref.Name
}
case "StatefulSet":
if c.Rules.StatefulSetUID {
tags[conventions.AttributeK8SStatefulSetUID] = string(ref.UID)
}
if c.Rules.StatefulSetName {
tags[conventions.AttributeK8SStatefulSetName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SStatefulSetName] = ref.Name
}
case "Job":
if c.Rules.CronJobName {
if c.Rules.CronJobName || c.Rules.OperatorRules.Enabled {
parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[conventions.AttributeK8SCronJobName] = parts[1]
name := parts[1]
if c.Rules.CronJobName {
tags[conventions.AttributeK8SCronJobName] = name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SCronJobName] = name
}
}
}
if c.Rules.JobUID {
Expand All @@ -545,6 +569,9 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
if c.Rules.JobName {
tags[conventions.AttributeK8SJobName] = ref.Name
}
if c.Rules.OperatorRules.Enabled {
serviceNames[conventions.AttributeK8SJobName] = ref.Name
}
}
}
}
Expand All @@ -568,7 +595,16 @@ func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
for _, r := range c.Rules.Annotations {
r.extractFromPodMetadata(pod.Annotations, tags, "k8s.pod.annotations.%s")
}
return tags
return tags, serviceNames
}

func (c *WatchClient) deploymentName(ref meta_v1.OwnerReference) string {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
return replicaset.Deployment.Name
}
}
return ""
}

// This function removes all data from the Pod except what is required by extraction rules and pod association
Expand Down Expand Up @@ -633,7 +669,7 @@ func removeUnnecessaryPodData(pod *api_v1.Pod, rules ExtractionRules) *api_v1.Po
removeUnnecessaryContainerData := func(c api_v1.Container) api_v1.Container {
transformedContainer := api_v1.Container{}
transformedContainer.Name = c.Name // we always need the name, it's used for identification
if rules.ContainerImageName || rules.ContainerImageTag {
if rules.ContainerImageName || rules.ContainerImageTag || rules.OperatorRules.Enabled {
transformedContainer.Image = c.Image
}
return transformedContainer
Expand Down Expand Up @@ -698,7 +734,7 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if !needContainerAttributes(c.Rules) {
return containers
}
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag {
if c.Rules.ContainerImageName || c.Rules.ContainerImageTag || c.Rules.OperatorRules.Enabled {
for _, spec := range append(pod.Spec.Containers, pod.Spec.InitContainers...) {
container := &Container{}
name, tag, err := parseNameAndTagFromImage(spec.Image)
Expand All @@ -709,18 +745,26 @@ func (c *WatchClient) extractPodContainersAttributes(pod *api_v1.Pod) PodContain
if c.Rules.ContainerImageTag {
container.ImageTag = tag
}
if c.Rules.OperatorRules.Enabled {
container.ServiceVersion = tag
}
}
containers.ByName[spec.Name] = container
}
}
for _, apiStatus := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
container, ok := containers.ByName[apiStatus.Name]
containerName := apiStatus.Name
container, ok := containers.ByName[containerName]
if !ok {
container = &Container{}
containers.ByName[apiStatus.Name] = container
containers.ByName[containerName] = container
}
if c.Rules.ContainerName {
container.Name = apiStatus.Name
container.Name = containerName
}
if c.Rules.OperatorRules.Enabled {
container.ServiceInstanceID = operatorServiceInstanceID(pod, containerName)
container.ServiceName = containerName
}
containerID := apiStatus.ContainerID
// Remove container runtime prefix
Expand Down Expand Up @@ -796,7 +840,7 @@ func (c *WatchClient) podFromAPI(pod *api_v1.Pod) *Pod {
if c.shouldIgnorePod(pod) {
newPod.Ignore = true
} else {
newPod.Attributes = c.extractPodAttributes(pod)
newPod.Attributes, newPod.ServiceNames = c.extractPodAttributes(pod)
if needContainerAttributes(c.Rules) {
newPod.Containers = c.extractPodContainersAttributes(pod)
}
Expand Down Expand Up @@ -1040,7 +1084,8 @@ func needContainerAttributes(rules ExtractionRules) bool {
rules.ContainerName ||
rules.ContainerImageTag ||
rules.ContainerImageRepoDigests ||
rules.ContainerID
rules.ContainerID ||
rules.OperatorRules.Enabled
}

func (c *WatchClient) handleReplicaSetAdd(obj any) {
Expand Down
107 changes: 91 additions & 16 deletions processor/k8sattributesprocessor/internal/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,15 +702,27 @@ func TestExtractionRules(t *testing.T) {
},
}

operatorRules := ExtractionRules{
OperatorRules: OperatorRules{
Enabled: true,
Labels: true,
},
Annotations: []FieldExtractionRule{OperatorAnnotationRule},
Labels: OperatorLabelRules,
}

testCases := []struct {
name string
rules ExtractionRules
attributes map[string]string
name string
rules ExtractionRules
additionalAnnotations map[string]string
additionalLabels map[string]string
attributes map[string]string
serviceName string
}{
{
name: "no-rules",
rules: ExtractionRules{},
attributes: nil,
attributes: map[string]string{},
},
{
name: "deployment",
Expand Down Expand Up @@ -962,25 +974,68 @@ func TestExtractionRules(t *testing.T) {
"prefix-annotation1": "av1",
},
},
{
name: "operator-rules-builtin",
rules: operatorRules,
attributes: map[string]string{
// tested in operator-container-level-attributes below
},
serviceName: "auth-service",
},
{
name: "operator-rules-label-values",
rules: operatorRules,
additionalLabels: map[string]string{
"app.kubernetes.io/name": "label-service",
"app.kubernetes.io/version": "label-version",
"app.kubernetes.io/part-of": "label-namespace",
},
attributes: map[string]string{
"service.name": "label-service",
"service.version": "label-version",
"service.namespace": "label-namespace",
},
},
{
name: "operator-rules-annotation-override",
rules: operatorRules,
additionalAnnotations: map[string]string{
"resource.opentelemetry.io/service.instance.id": "annotation-id",
"resource.opentelemetry.io/service.version": "annotation-version",
"resource.opentelemetry.io/service.name": "annotation-service",
"resource.opentelemetry.io/service.namespace": "annotation-namespace",
},
attributes: map[string]string{
"service.instance.id": "annotation-id",
"service.name": "annotation-service",
"service.version": "annotation-version",
"service.namespace": "annotation-namespace",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c.Rules = tc.rules

// manually call the data removal functions here
// normally the informer does this, but fully emulating the informer in this test is annoying
pod := pod.DeepCopy()
for k, v := range tc.additionalAnnotations {
pod.Annotations[k] = v
}
for k, v := range tc.additionalLabels {
pod.Labels[k] = v
}
transformedPod := removeUnnecessaryPodData(pod, c.Rules)
transformedReplicaset := removeUnnecessaryReplicaSetData(replicaset)
c.handleReplicaSetAdd(transformedReplicaset)
c.handlePodAdd(transformedPod)
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
require.True(t, ok)

assert.Equal(t, len(tc.attributes), len(p.Attributes))
for k, v := range tc.attributes {
got, ok := p.Attributes[k]
assert.True(t, ok)
assert.Equal(t, v, got)
assert.Equal(t, tc.attributes, p.Attributes)
if tc.serviceName != "" {
assert.Equal(t, tc.serviceName, OperatorServiceName("containerName", p.ServiceNames))
}
})
}
Expand Down Expand Up @@ -1040,7 +1095,7 @@ func TestReplicaSetExtractionRules(t *testing.T) {
{
name: "no-rules",
rules: ExtractionRules{},
attributes: nil,
attributes: map[string]string{},
}, {
name: "one_deployment_is_controller",
ownerReferences: []meta_v1.OwnerReference{
Expand Down Expand Up @@ -1132,12 +1187,7 @@ func TestReplicaSetExtractionRules(t *testing.T) {
p, ok := c.GetPod(newPodIdentifier("connection", "", pod.Status.PodIP))
require.True(t, ok)

assert.Equal(t, len(tc.attributes), len(p.Attributes))
for k, v := range tc.attributes {
got, ok := p.Attributes[k]
assert.True(t, ok)
assert.Equal(t, v, got)
}
assert.Equal(t, tc.attributes, p.Attributes)
})
}
}
Expand Down Expand Up @@ -1489,6 +1539,10 @@ func TestPodIgnorePatterns(t *testing.T) {

func Test_extractPodContainersAttributes(t *testing.T) {
pod := api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "test-pod",
Namespace: "test-namespace",
},
Spec: api_v1.PodSpec{
Containers: []api_v1.Container{
{
Expand Down Expand Up @@ -1564,6 +1618,27 @@ func Test_extractPodContainersAttributes(t *testing.T) {
pod: &pod,
want: PodContainers{ByID: map[string]*Container{}, ByName: map[string]*Container{}},
},
{
name: "operator-container-level-attributes",
rules: ExtractionRules{
OperatorRules: OperatorRules{Enabled: true},
},
pod: &pod,
want: PodContainers{
ByID: map[string]*Container{
"container1-id-123": {ServiceName: "container1", ServiceInstanceID: "test-namespace.test-pod.container1", ServiceVersion: "0.1.0"},
"container2-id-456": {ServiceName: "container2", ServiceInstanceID: "test-namespace.test-pod.container2"},
"container3-id-abc": {ServiceName: "container3", ServiceInstanceID: "test-namespace.test-pod.container3", ServiceVersion: "1.0"},
"init-container-id-789": {ServiceName: "init_container", ServiceInstanceID: "test-namespace.test-pod.init_container", ServiceVersion: "latest"},
},
ByName: map[string]*Container{
"container1": {ServiceName: "container1", ServiceInstanceID: "test-namespace.test-pod.container1", ServiceVersion: "0.1.0"},
"container2": {ServiceName: "container2", ServiceInstanceID: "test-namespace.test-pod.container2"},
"container3": {ServiceName: "container3", ServiceInstanceID: "test-namespace.test-pod.container3", ServiceVersion: "1.0"},
"init_container": {ServiceName: "init_container", ServiceInstanceID: "test-namespace.test-pod.init_container", ServiceVersion: "latest"},
},
},
},
{
name: "image-name-only",
rules: ExtractionRules{
Expand Down
Loading
Loading