Skip to content

Commit 2090caa

Browse files
authored
Merge pull request #281 from TencentBlueKing/develop
release V1.11.0
2 parents 6921983 + 5e0e5dd commit 2090caa

File tree

127 files changed

+5658
-140
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

127 files changed

+5658
-140
lines changed

.gitignore

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,9 @@ staticfiles
142142

143143
bkflow/interface/templates
144144
static/bkflow
145-
frontend/static
145+
frontend/static
146+
147+
pre-*-bkcodeai
148+
.vscode/settings.json
149+
150+
.idea

app_desc.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
spec_version: 2
2-
app_version: "1.10.0"
2+
app_version: "1.11.0"
33
app:
44
region: default
55
bk_app_code: &APP_CODE bk_flow_engine

bkcodeai.json

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"crignore_file_patterns": [
3+
"out/",
4+
"dist/",
5+
"assets/",
6+
"images/",
7+
"package.json",
8+
"package-lock.json",
9+
"yarn.lock",
10+
".env.*",
11+
"*.env",
12+
"*.map",
13+
"*.yml",
14+
"*.yaml",
15+
"*.min.js",
16+
"libs/",
17+
"static/",
18+
"doc/",
19+
"docs/",
20+
"*.mo",
21+
"*.po",
22+
"*.ini",
23+
"*.toml",
24+
"*.json",
25+
"*.xml",
26+
"*.lock",
27+
"*.md",
28+
"*.txt"
29+
]
30+
}

bkflow/apigw/serializers/task.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ class CreateMockTaskBaseSerializer(serializers.Serializer):
5353

5454
class CreateMockTaskWithPipelineTreeSerializer(CreateMockTaskBaseSerializer):
5555
pipeline_tree = serializers.JSONField(help_text=_("任务树"), required=True)
56+
include_node_ids = serializers.ListField(
57+
child=serializers.CharField(allow_blank=False), help_text=_("包含的节点ID列表"), required=False
58+
)
5659

5760

5861
class CreateMockTaskWithTemplateIdSerializer(CreateMockTaskBaseSerializer):

bkflow/apigw/views/create_template.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from bkflow.space.utils import build_default_pipeline_tree_with_space_id
3636
from bkflow.template.models import Template, TemplateSnapshot
3737
from bkflow.utils import err_code
38+
from bkflow.utils.canvas import OperateType
39+
from bkflow.utils.pipeline import replace_pipeline_tree_node_ids
3840

3941
logger = logging.getLogger("root")
4042

@@ -70,7 +72,7 @@ def create_template(request, space_id):
7072
# 在序列化器中已经判断了存在,所以不需要处理异常
7173
source_template = Template.objects.get(id=source_template_id)
7274
pipeline_tree = copy.deepcopy(source_template.pipeline_tree)
73-
recursive_replace_id(pipeline_tree)
75+
replace_pipeline_tree_node_ids(pipeline_tree, OperateType.CREATE_TEMPLATE.value)
7476
elif pipeline_tree:
7577
recursive_replace_id(pipeline_tree)
7678
else:

bkflow/contrib/api/collections/task.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ def render_current_constants(self, task_id):
108108
method="get", url=self._get_task_url("task/{}/render_current_constants/".format(task_id)), data=None
109109
)
110110

111+
def render_context_with_node_outputs(self, task_id, data=None):
112+
return self._request(
113+
method="post",
114+
url=self._get_task_url("task/{}/render_context_with_node_outputs/".format(task_id)),
115+
data=data,
116+
)
117+
111118
def get_task_operation_record(self, task_id, data=None):
112119
return self._request(
113120
method="get", url=self._get_task_url("task/{}/get_task_operation_record/".format(task_id)), data=data
@@ -130,3 +137,12 @@ def trigger_engine_admin_action(self, instance_id, action, data=None):
130137

131138
def batch_delete_tasks(self, data):
132139
return self._request(method="post", url=self._get_task_url("task/batch_delete_tasks/"), data=data)
140+
141+
def get_engine_config(self, data):
142+
return self._request(method="get", url=self._get_task_url("task/get_engine_config/"), data=data)
143+
144+
def upsert_engine_config(self, data):
145+
return self._request(method="post", url=self._get_task_url("task/upsert_engine_config/"), data=data)
146+
147+
def delete_engine_config(self, data):
148+
return self._request(method="delete", url=self._get_task_url("task/delete_engine_config/"), data=data)

bkflow/contrib/openapi/serializers.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,8 @@ def validate(self, attrs):
7474
raise serializers.ValidationError("is_mock must exist when delete all tasks")
7575

7676
return attrs
77+
78+
79+
class RenderConstantsBodySerializer(serializers.Serializer):
80+
node_ids = serializers.ListField(required=True, child=serializers.CharField())
81+
to_render_constants = serializers.ListField(required=True, child=serializers.DictField())

bkflow/interface/task/utils.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
TencentBlueKing is pleased to support the open source community by making
4+
蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available.
5+
Copyright (C) 2024 THL A29 Limited,
6+
a Tencent company. All rights reserved.
7+
Licensed under the MIT License (the "License");
8+
you may not use this file except in compliance with the License.
9+
You may obtain a copy of the License at http://opensource.org/licenses/MIT
10+
Unless required by applicable law or agreed to in writing,
11+
software distributed under the License is distributed on
12+
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
13+
either express or implied. See the License for the
14+
specific language governing permissions and limitations under the License.
15+
16+
We undertake not to change the open source license (MIT license) applicable
17+
18+
to the current version of the project delivered to anyone in the future.
19+
"""
20+
import logging
21+
from typing import Dict
22+
23+
from bkflow.contrib.api.collections.task import TaskComponentClient
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
class StageJobStateHandler:
29+
"""处理Stage和Job状态的业务逻辑处理器"""
30+
31+
def __init__(self, space_id: int, is_superuser: bool = False):
32+
self.space_id = space_id
33+
self.is_superuser = is_superuser
34+
self.client = TaskComponentClient(space_id=space_id, from_superuser=is_superuser)
35+
36+
def get_task_data(self, task_id: str) -> dict:
37+
"""获取任务相关的基础数据
38+
39+
Args: task_id: 任务ID
40+
Returns: dict: 包含任务相关数据的字典,如果获取失败则包含默认值
41+
"""
42+
try:
43+
task_detail = self.client.get_task_detail(task_id)
44+
45+
if "data" not in task_detail:
46+
logger.warning(f"Task {task_id} response data missing: detail={task_detail}")
47+
48+
pipeline_tree = task_detail.get("data", {}).get("pipeline_tree", {})
49+
return {
50+
"task_detail": task_detail,
51+
"pipeline_tree": pipeline_tree,
52+
}
53+
except Exception as e:
54+
logger.exception(f"Failed to get task data for task {task_id}: {str(e)}")
55+
raise
56+
57+
def get_node_states(self, task_id: str) -> Dict:
58+
"""获取节点状态信息
59+
60+
Args: task_id: 任务ID
61+
Returns: 节点状态信息字典
62+
"""
63+
data = {"space_id": self.space_id}
64+
65+
try:
66+
response = self.client.get_task_states(task_id, data=data)
67+
if "data" not in response:
68+
logger.warning(f"Task {task_id} states response data missing: {response}")
69+
70+
return response.get("data", {}).get("children", {})
71+
except Exception as e:
72+
logger.exception(f"Failed to get node states for task {task_id}: {str(e)}")
73+
return {}
74+
75+
def build_template_task_mapping(self, activities: dict) -> dict:
76+
"""构建模板节点ID到任务节点ID的映射"""
77+
return {
78+
activity.get("template_node_id"): task_node_id
79+
for task_node_id, activity in activities.items()
80+
if activity.get("template_node_id")
81+
}
82+
83+
def build_node_info_map(self, template_to_task_id: dict, node_states: dict) -> dict:
84+
"""构建节点信息映射"""
85+
return {
86+
template_id: {
87+
"state": node_states[task_id].get("state", "READY"),
88+
"start_time": node_states[task_id].get("start_time", ""),
89+
"finish_time": node_states[task_id].get("finish_time", ""),
90+
"loop": node_states[task_id].get("loop", 1),
91+
"retry": node_states[task_id].get("retry", 0),
92+
"skip": node_states[task_id].get("skip", False),
93+
"error_ignorable": node_states[task_id].get("error_ignorable", False),
94+
"error_ignored": node_states[task_id].get("error_ignored", False),
95+
}
96+
for template_id, task_id in template_to_task_id.items()
97+
if task_id in node_states
98+
}
99+
100+
@staticmethod
101+
def calculate_job_state(node_states: list) -> str:
102+
"""计算Job状态"""
103+
if not node_states:
104+
return "READY"
105+
106+
if "FAILED" in node_states:
107+
return "FAILED"
108+
109+
if "RUNNING" in node_states:
110+
return "RUNNING"
111+
112+
if all(state == "READY" for state in node_states):
113+
return "READY"
114+
115+
if all(state == "FINISHED" for state in node_states):
116+
return "FINISHED"
117+
118+
return "RUNNING"
119+
120+
@staticmethod
121+
def calculate_stage_state(job_states: list) -> str:
122+
"""计算Stage状态"""
123+
if not job_states:
124+
return "READY"
125+
126+
if "FAILED" in job_states:
127+
return "FAILED"
128+
129+
if "RUNNING" in job_states:
130+
return "RUNNING"
131+
132+
if all(state == "READY" for state in job_states):
133+
return "READY"
134+
135+
if all(state == "FINISHED" for state in job_states):
136+
return "FINISHED"
137+
138+
return "RUNNING"
139+
140+
def update_states(self, stage_struct: list, node_info_map: dict) -> None:
141+
"""更新状态信息"""
142+
# 构建job到nodes的映射,避免深层嵌套
143+
job_nodes_map = {}
144+
stage_jobs_map = {}
145+
146+
# 第一次遍历:构建映射关系
147+
for stage in stage_struct:
148+
stage_jobs = []
149+
for job in stage["jobs"]:
150+
job_id = job["id"]
151+
job_nodes_map[job_id] = job["nodes"]
152+
stage_jobs.append(job_id)
153+
stage_jobs_map[stage["id"]] = stage_jobs
154+
155+
# 第二次遍历:更新节点状态
156+
job_states_map = {}
157+
for job_id, nodes in job_nodes_map.items():
158+
node_states = []
159+
for node in nodes:
160+
template_node_id = node["id"]
161+
info = node_info_map.get(
162+
template_node_id,
163+
{
164+
"state": "READY",
165+
"start_time": "",
166+
"finish_time": "",
167+
"loop": 1,
168+
"retry": 0,
169+
"skip": False,
170+
"error_ignorable": False,
171+
"error_ignored": False,
172+
},
173+
)
174+
node.update(info)
175+
node_states.append(info["state"])
176+
177+
# 计算job状态
178+
job_state = self.calculate_job_state(node_states)
179+
job_states_map[job_id] = job_state
180+
181+
# 第三次遍历:更新stage和job状态
182+
for stage in stage_struct:
183+
stage_jobs = stage_jobs_map[stage["id"]]
184+
job_states = [job_states_map[job_id] for job_id in stage_jobs]
185+
186+
# 更新job状态
187+
for job in stage["jobs"]:
188+
job["state"] = job_states_map[job["id"]]
189+
190+
# 更新stage状态
191+
stage["state"] = self.calculate_stage_state(job_states)
192+
193+
def process(self, task_id: str) -> list:
194+
"""处理完整的状态更新流程"""
195+
# 1. 获取基础数据
196+
task_data = self.get_task_data(task_id)
197+
activities = task_data.get("pipeline_tree", {}).get("activities", {})
198+
stage_struct = task_data.get("pipeline_tree", {}).get("stage_canvas_data", [])
199+
200+
# 2. 获取节点状态
201+
node_states = self.get_node_states(task_id)
202+
203+
# 3. 构建映射关系
204+
template_to_task_id = self.build_template_task_mapping(activities)
205+
node_info_map = self.build_node_info_map(template_to_task_id, node_states)
206+
207+
# 4. 更新状态
208+
self.update_states(stage_struct, node_info_map)
209+
210+
return stage_struct
211+
212+
213+
class StageConstantHandler:
214+
def __init__(self, space_id: int, is_superuser: bool = False):
215+
self.space_id = space_id
216+
self.is_superuser = is_superuser
217+
self.client = TaskComponentClient(space_id=space_id, from_superuser=is_superuser)
218+
219+
def process(self, task_id: str, node_ids: list, stage_constants: list) -> dict:
220+
to_render_constants = [constant["key"] for constant in stage_constants]
221+
rendered_constants = self.client.render_context_with_node_outputs(
222+
task_id, data={"node_ids": node_ids, "to_render_constants": to_render_constants}
223+
)
224+
225+
return rendered_constants

bkflow/interface/task/view.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from bkflow.contrib.api.collections.task import TaskComponentClient
3030
from bkflow.contrib.openapi.serializers import (
3131
GetTasksStatesBodySerializer,
32+
RenderConstantsBodySerializer,
3233
TaskBatchDeleteSerializer,
3334
TaskEngineAdminSerializer,
3435
)
@@ -37,6 +38,7 @@
3738
TaskMockTokenPermission,
3839
TaskTokenPermission,
3940
)
41+
from bkflow.interface.task.utils import StageConstantHandler, StageJobStateHandler
4042
from bkflow.permission.models import TASK_PERMISSION_TYPE, Token
4143
from bkflow.space.configs import SuperusersConfig
4244
from bkflow.space.models import SpaceConfig
@@ -229,3 +231,24 @@ def get_node_snapshot_config(self, request, task_id, node_id, *args, **kwargs):
229231
data = {"node_id": node_id}
230232
result = client.get_node_snapshot_config(task_id, data)
231233
return Response(result)
234+
235+
@action(methods=["GET"], detail=False, url_path="get_stage_job_states/(?P<task_id>\\d+)")
236+
def get_stage_and_job_states(self, request, task_id, *args, **kwargs):
237+
"""获取stage和job状态的视图函数"""
238+
space_id = self.get_space_id(request)
239+
handler = StageJobStateHandler(space_id, request.user.is_superuser)
240+
result = handler.process(task_id)
241+
return Response(result)
242+
243+
@action(methods=["POST"], detail=False, url_path="rendered_stage_constants/(?P<task_id>\\d+)")
244+
@swagger_auto_schema(operation_description="渲染stage画布变量", request_body=RenderConstantsBodySerializer)
245+
def render_stage_constants(self, request, task_id, *args, **kwargs):
246+
"""渲染stage画布变量"""
247+
space_id = self.get_space_id(request)
248+
serializer = RenderConstantsBodySerializer(data=request.data)
249+
serializer.is_valid(raise_exception=True)
250+
node_ids = serializer.validated_data.get("node_ids", [])
251+
stage_constants = serializer.validated_data.get("to_render_constants", {})
252+
handler = StageConstantHandler(space_id, request.user.is_superuser)
253+
result = handler.process(task_id, node_ids, stage_constants)
254+
return Response(result)

0 commit comments

Comments
 (0)