Skip to content

Commit 17c0964

Browse files
authored
refactor: 支持创建任务时自定义插件的span属性 --story=130216170 (#595)
1 parent e20afa4 commit 17c0964

File tree

8 files changed

+535
-1
lines changed

8 files changed

+535
-1
lines changed

bkflow/apigw/serializers/task.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ class CreateTaskSerializer(CredentialsValidationMixin, serializers.Serializer):
7878
creator = serializers.CharField(help_text=_("创建者"), max_length=USER_NAME_MAX_LENGTH, required=True)
7979
description = serializers.CharField(help_text=_("任务描述"), required=False, allow_blank=True)
8080
constants = serializers.JSONField(help_text=_("任务启动参数"), required=False, default={})
81+
custom_span_attributes = serializers.DictField(
82+
help_text=_("自定义 Span 属性,会添加到所有节点上报的 Span 中"), required=False, default={}
83+
)
8184

8285

8386
class CreateTaskByAppSerializer(serializers.Serializer):
@@ -87,6 +90,9 @@ class CreateTaskByAppSerializer(serializers.Serializer):
8790
name = serializers.CharField(help_text=_("任务名"), max_length=MAX_LEN_OF_TASK_NAME, required=False)
8891
description = serializers.CharField(help_text=_("任务描述"), required=False, allow_blank=True)
8992
constants = serializers.JSONField(help_text=_("任务启动参数"), required=False, default={})
93+
custom_span_attributes = serializers.DictField(
94+
help_text=_("自定义 Span 属性,会添加到所有节点上报的 Span 中"), required=False, default={}
95+
)
9096

9197

9298
class TaskMockDataSerializer(serializers.Serializer):
@@ -103,6 +109,9 @@ class CreateMockTaskBaseSerializer(CredentialsValidationMixin, serializers.Seria
103109
mock_data = TaskMockDataSerializer(help_text=_("Mock 数据"), default=TaskMockDataSerializer())
104110
description = serializers.CharField(help_text=_("任务描述"), required=False, allow_blank=True)
105111
constants = serializers.JSONField(help_text=_("任务启动参数"), default={})
112+
custom_span_attributes = serializers.DictField(
113+
help_text=_("自定义 Span 属性,会添加到所有节点上报的 Span 中"), required=False, default={}
114+
)
106115

107116

108117
class CreateMockTaskWithPipelineTreeSerializer(CreateMockTaskBaseSerializer):
@@ -143,6 +152,9 @@ class CreateTaskWithoutTemplateSerializer(CredentialsValidationMixin, serializer
143152
constants = serializers.JSONField(help_text=_("任务启动参数"), required=False, default={})
144153
pipeline_tree = serializers.JSONField(help_text=_("任务树"), required=True)
145154
notify_config = serializers.JSONField(help_text=_("通知配置"), required=False, default={})
155+
custom_span_attributes = serializers.DictField(
156+
help_text=_("自定义 Span 属性,会添加到所有节点上报的 Span 中"), required=False, default={}
157+
)
146158

147159

148160
class PipelineTreeSerializer(serializers.Serializer):

bkflow/apigw/views/create_mock_task.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ def create_mock_task(request, space_id):
105105
if credentials:
106106
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})["credentials"] = credentials
107107

108+
# 将custom_span_attributes放入extra_info的custom_context中,以便通过TaskContext和parent_data.inputs获取
109+
# 用于传递到节点Span中
110+
custom_span_attributes = ser.data.get("custom_span_attributes", {})
111+
if custom_span_attributes:
112+
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})[
113+
"custom_span_attributes"
114+
] = custom_span_attributes
115+
108116
client = TaskComponentClient(space_id=space_id)
109117
result = client.create_task(create_task_data)
110118
return result

bkflow/apigw/views/create_task.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ def create_task(request, space_id):
7474
if credentials:
7575
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})["credentials"] = credentials
7676

77+
# 将custom_span_attributes放入extra_info的custom_context中,以便通过TaskContext和parent_data.inputs获取
78+
# 用于传递到节点Span中
79+
custom_span_attributes = ser.data.get("custom_span_attributes", {})
80+
if custom_span_attributes:
81+
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})[
82+
"custom_span_attributes"
83+
] = custom_span_attributes
84+
7785
client = TaskComponentClient(space_id=space_id)
7886
result = client.create_task(create_task_data)
7987

bkflow/apigw/views/create_task_by_app.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ def create_task_by_app(request, template_id):
7676
if credentials:
7777
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})["credentials"] = credentials
7878

79+
# 将custom_span_attributes放入extra_info的custom_context中,以便通过TaskContext和parent_data.inputs获取
80+
# 用于传递到节点Span中
81+
custom_span_attributes = ser.data.get("custom_span_attributes", {})
82+
if custom_span_attributes:
83+
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})[
84+
"custom_span_attributes"
85+
] = custom_span_attributes
86+
7987
client = TaskComponentClient(space_id=space_id)
8088
result = client.create_task(create_task_data)
8189

bkflow/apigw/views/create_task_without_template.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ def create_task_without_template(request, space_id):
5656
if credentials:
5757
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})["credentials"] = credentials
5858

59+
# 将custom_span_attributes放入extra_info的custom_context中,以便通过TaskContext和parent_data.inputs获取
60+
# 用于传递到节点Span中
61+
custom_span_attributes = ser.data.get("custom_span_attributes", {})
62+
if custom_span_attributes:
63+
create_task_data.setdefault("extra_info", {}).setdefault("custom_context", {})[
64+
"custom_span_attributes"
65+
] = custom_span_attributes
66+
5967
client = TaskComponentClient(space_id=space_id)
6068
result = client.create_task(create_task_data)
6169
return result

bkflow/pipeline_plugins/components/collections/base.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,21 @@ def _get_span_name(self):
8484

8585
def _get_span_attributes(self, data, parent_data):
8686
"""获取 Span 属性,子类可以覆盖此方法来添加自定义属性"""
87-
return {
87+
attributes = {
8888
"space_id": parent_data.get_one_of_inputs("task_space_id"),
8989
"task_id": parent_data.get_one_of_inputs("task_id"),
9090
"node_id": self.id,
9191
}
9292

93+
# 从 parent_data 中获取 custom_span_attributes,并合并到 Span 属性中
94+
# custom_span_attributes 通过 TaskContext 从 extra_info.custom_context 传递过来
95+
custom_span_attributes = parent_data.get_one_of_inputs("custom_span_attributes")
96+
if custom_span_attributes and isinstance(custom_span_attributes, dict):
97+
# 将自定义属性合并到基础属性中,自定义属性优先级更高
98+
attributes.update(custom_span_attributes)
99+
100+
return attributes
101+
93102
def _get_trace_context(self, parent_data):
94103
"""从 parent_data 中获取 trace context"""
95104
return {
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
"""
2+
TencentBlueKing is pleased to support the open source community by making
3+
蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available.
4+
Copyright (C) 2024 THL A29 Limited,
5+
a Tencent company. All rights reserved.
6+
Licensed under the MIT License (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at http://opensource.org/licenses/MIT
9+
Unless required by applicable law or agreed to in writing,
10+
software distributed under the License is distributed on
11+
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
12+
either express or implied. See the License for the
13+
specific language governing permissions and limitations under the License.
14+
15+
We undertake not to change the open source license (MIT license) applicable
16+
17+
to the current version of the project delivered to anyone in the future.
18+
"""
19+
import json
20+
from unittest import mock
21+
22+
from bamboo_engine.builder import (
23+
EmptyEndEvent,
24+
EmptyStartEvent,
25+
ServiceActivity,
26+
build_tree,
27+
)
28+
from django.test import TestCase, override_settings
29+
30+
from bkflow.space.models import Space
31+
from bkflow.template.models import Template, TemplateSnapshot
32+
33+
34+
def build_pipeline_tree():
35+
"""构建测试用的 pipeline tree"""
36+
start = EmptyStartEvent()
37+
act_1 = ServiceActivity(component_code="example_component")
38+
end = EmptyEndEvent()
39+
start.extend(act_1).extend(end)
40+
return build_tree(start, data={"test": "test"})
41+
42+
43+
class TestCreateTask(TestCase):
44+
"""Test create_task apigw views"""
45+
46+
def setUp(self):
47+
self.space = Space.objects.create(app_code="test", platform_url="http://test.com", name="test_space")
48+
49+
@override_settings(
50+
BK_APIGW_REQUIRE_EXEMPT=True, MIDDLEWARE=("tests.interface.apigw.middlewares.OverrideMiddleware",)
51+
)
52+
@mock.patch("bkflow.apigw.views.create_task.TaskComponentClient")
53+
def test_create_task_with_custom_span_attributes(self, mock_client_class):
54+
"""Test create_task with custom_span_attributes parameter"""
55+
pipeline_tree = build_pipeline_tree()
56+
snapshot = TemplateSnapshot.create_snapshot(pipeline_tree=pipeline_tree, username="test_user", version="1.0.0")
57+
template = Template.objects.create(
58+
name="测试流程", space_id=self.space.id, snapshot_id=snapshot.id, creator="test_user"
59+
)
60+
snapshot.template_id = template.id
61+
snapshot.save()
62+
63+
mock_client = mock.Mock()
64+
mock_client.create_task.return_value = {
65+
"result": True,
66+
"data": {"id": 1, "name": "测试任务", "template_id": template.id, "parameters": {}},
67+
}
68+
mock_client_class.return_value = mock_client
69+
70+
custom_span_attributes = {"business_id": "12345", "request_id": "req-abc-123"}
71+
data = {
72+
"template_id": template.id,
73+
"name": "测试任务",
74+
"creator": "test_user",
75+
"custom_span_attributes": custom_span_attributes,
76+
}
77+
78+
url = "/apigw/space/{}/create_task/".format(self.space.id)
79+
resp = self.client.post(path=url, data=json.dumps(data), content_type="application/json")
80+
81+
resp_data = json.loads(resp.content)
82+
self.assertEqual(resp.status_code, 200)
83+
self.assertEqual(resp_data["result"], True)
84+
85+
# 验证 custom_span_attributes 被传递到 create_task_data 中
86+
call_args = mock_client.create_task.call_args[0][0]
87+
self.assertIn("extra_info", call_args)
88+
self.assertIn("custom_context", call_args["extra_info"])
89+
self.assertEqual(call_args["extra_info"]["custom_context"]["custom_span_attributes"], custom_span_attributes)
90+
91+
@override_settings(
92+
BK_APIGW_REQUIRE_EXEMPT=True, MIDDLEWARE=("tests.interface.apigw.middlewares.OverrideMiddleware",)
93+
)
94+
@mock.patch("bkflow.apigw.views.create_task_by_app.TaskComponentClient")
95+
def test_create_task_by_app_with_custom_span_attributes(self, mock_client_class):
96+
"""Test create_task_by_app with custom_span_attributes parameter"""
97+
pipeline_tree = build_pipeline_tree()
98+
snapshot = TemplateSnapshot.create_snapshot(pipeline_tree=pipeline_tree, username="test_user", version="1.0.0")
99+
template = Template.objects.create(
100+
name="测试流程",
101+
space_id=self.space.id,
102+
snapshot_id=snapshot.id,
103+
creator="test_user",
104+
bk_app_code="test",
105+
)
106+
snapshot.template_id = template.id
107+
snapshot.save()
108+
109+
mock_client = mock.Mock()
110+
mock_client.create_task.return_value = {
111+
"result": True,
112+
"data": {"id": 1, "name": "测试任务", "template_id": template.id, "parameters": {}},
113+
}
114+
mock_client_class.return_value = mock_client
115+
116+
custom_span_attributes = {"user_type": "vip", "source": "api"}
117+
data = {
118+
"template_id": template.id,
119+
"name": "测试任务",
120+
"custom_span_attributes": custom_span_attributes,
121+
}
122+
123+
url = "/apigw/template/{}/create_task_by_app/".format(template.id)
124+
resp = self.client.post(path=url, data=json.dumps(data), content_type="application/json")
125+
126+
resp_data = json.loads(resp.content)
127+
self.assertEqual(resp.status_code, 200)
128+
self.assertEqual(resp_data["result"], True)
129+
130+
# 验证 custom_span_attributes 被传递到 create_task_data 中
131+
call_args = mock_client.create_task.call_args[0][0]
132+
self.assertIn("extra_info", call_args)
133+
self.assertIn("custom_context", call_args["extra_info"])
134+
self.assertEqual(call_args["extra_info"]["custom_context"]["custom_span_attributes"], custom_span_attributes)
135+
136+
@override_settings(
137+
BK_APIGW_REQUIRE_EXEMPT=True, MIDDLEWARE=("tests.interface.apigw.middlewares.OverrideMiddleware",)
138+
)
139+
@mock.patch("bkflow.apigw.views.create_task_without_template.TaskComponentClient")
140+
def test_create_task_without_template_with_custom_span_attributes(self, mock_client_class):
141+
"""Test create_task_without_template with custom_span_attributes parameter"""
142+
pipeline_tree = build_pipeline_tree()
143+
144+
mock_client = mock.Mock()
145+
mock_client.create_task.return_value = {
146+
"result": True,
147+
"data": {"id": 1, "name": "测试任务", "parameters": {}},
148+
}
149+
mock_client_class.return_value = mock_client
150+
151+
custom_span_attributes = {"env": "prod", "region": "us-east-1"}
152+
data = {
153+
"name": "测试任务",
154+
"creator": "test_user",
155+
"pipeline_tree": pipeline_tree,
156+
"custom_span_attributes": custom_span_attributes,
157+
}
158+
159+
url = "/apigw/space/{}/create_task_without_template/".format(self.space.id)
160+
resp = self.client.post(path=url, data=json.dumps(data), content_type="application/json")
161+
162+
resp_data = json.loads(resp.content)
163+
self.assertEqual(resp.status_code, 200)
164+
self.assertEqual(resp_data["result"], True)
165+
166+
# 验证 custom_span_attributes 被传递到 create_task_data 中
167+
call_args = mock_client.create_task.call_args[0][0]
168+
self.assertIn("extra_info", call_args)
169+
self.assertIn("custom_context", call_args["extra_info"])
170+
self.assertEqual(call_args["extra_info"]["custom_context"]["custom_span_attributes"], custom_span_attributes)
171+
172+
@override_settings(
173+
BK_APIGW_REQUIRE_EXEMPT=True, MIDDLEWARE=("tests.interface.apigw.middlewares.OverrideMiddleware",)
174+
)
175+
@mock.patch("bkflow.apigw.views.create_mock_task.TaskComponentClient")
176+
def test_create_mock_task_with_custom_span_attributes(self, mock_client_class):
177+
"""Test create_mock_task with custom_span_attributes parameter"""
178+
pipeline_tree = build_pipeline_tree()
179+
snapshot = TemplateSnapshot.create_snapshot(pipeline_tree=pipeline_tree, username="test_user", version="1.0.0")
180+
template = Template.objects.create(
181+
name="测试流程", space_id=self.space.id, snapshot_id=snapshot.id, creator="test_user"
182+
)
183+
snapshot.template_id = template.id
184+
snapshot.save()
185+
186+
mock_client = mock.Mock()
187+
mock_client.create_task.return_value = {
188+
"result": True,
189+
"data": {"id": 1, "name": "Mock任务", "template_id": template.id, "parameters": {}},
190+
}
191+
mock_client_class.return_value = mock_client
192+
193+
custom_span_attributes = {"test_mode": "mock", "debug": "true"}
194+
data = {
195+
"template_id": template.id,
196+
"name": "Mock任务",
197+
"creator": "test_user",
198+
"mock_data": {"nodes": [], "outputs": {}, "mock_data_ids": {}},
199+
"custom_span_attributes": custom_span_attributes,
200+
}
201+
202+
url = "/apigw/space/{}/create_mock_task/".format(self.space.id)
203+
resp = self.client.post(path=url, data=json.dumps(data), content_type="application/json")
204+
205+
resp_data = json.loads(resp.content)
206+
self.assertEqual(resp.status_code, 200)
207+
self.assertEqual(resp_data["result"], True)
208+
209+
# 验证 custom_span_attributes 被传递到 create_task_data 中
210+
call_args = mock_client.create_task.call_args[0][0]
211+
self.assertIn("extra_info", call_args)
212+
self.assertIn("custom_context", call_args["extra_info"])
213+
self.assertEqual(call_args["extra_info"]["custom_context"]["custom_span_attributes"], custom_span_attributes)

0 commit comments

Comments
 (0)