Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bkflow/interface/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
home,
is_admin_or_current_space_superuser,
is_admin_or_space_superuser,
remote_trigger,
user_exit,
)

Expand All @@ -36,6 +37,7 @@
url(r"^is_admin_user/$", is_admin_or_space_superuser),
url(r"^is_current_space_admin/$", is_admin_or_current_space_superuser),
url(r"^callback/(?P<token>.+)/$", callback),
url(r"^remote_trigger/(?P<token>.+)/$", remote_trigger),
url(r"^get_msg_types/$", get_msg_types),
url(r"^itsm_approve/$", itsm_approve),
url(r"", include("bkflow.interface.task.urls")),
Expand Down
70 changes: 67 additions & 3 deletions bkflow/interface/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import logging
import traceback

from bkflow_feel.api import parse_expression
from bkflow_feel.exceptions import ValidationError
from blueapps.account import ConfFixture
from blueapps.account.decorators import login_exempt
from blueapps.account.handlers.response import ResponseHandler
Expand All @@ -37,6 +39,9 @@
from bkflow.contrib.api.collections.task import TaskComponentClient
from bkflow.space.configs import SuperusersConfig
from bkflow.space.models import Space, SpaceConfig
from bkflow.template.models import Template, Trigger
from bkflow.template.serializers.trigger import RemoteTriggerSerializer
from bkflow.template.utils import create_trigger_tasks

logger = logging.getLogger("root")

Expand Down Expand Up @@ -115,9 +120,7 @@ def callback(request, token):
try:
callback_data = json.loads(request.body)
except Exception:
message = _("节点回调失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {msg} | api callback").format(
msg=traceback.format_exc()
)
message = _("节点回调失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {msg} | api callback").format(msg=traceback.format_exc())
logger.error(message)
return JsonResponse({"result": False, "message": message}, status=400)

Expand All @@ -135,3 +138,64 @@ def callback(request, token):
"[callback] resp, space_id={}, task_id={}, node_id={}, resp={}".format(space_id, task_id, node_id, resp)
)
return JsonResponse(resp)


@login_exempt
@csrf_exempt
@require_POST
def remote_trigger(request, token):
try:
trigger_data = json.loads(request.body)
trigger_serializer = RemoteTriggerSerializer(data=trigger_data)
trigger_serializer.is_valid(raise_exception=True)
except Exception:
message = _("触发器调用失败: 无效的请求, 请重试. 如持续失败可联系管理员处理. {msg} | api trigger").format(msg=traceback.format_exc())
logger.error(message)
return JsonResponse({"result": False, "message": message}, status=400)
trigger_data = trigger_serializer.validated_data
space_id, template_id = trigger_data.get("space_id"), trigger_data.get("template_id")
trigger_id = trigger_data.get("trigger_id")
trigger_cond = trigger_data.get("condition")

template_instance = Template.objects.filter(space_id=space_id, id=template_id)

if not Space.exists(space_id=space_id) or not template_instance.exists():
err_msg = f"对应 空间 {space_id} 或 流程 {template_id} 不存在"
logger.error(err_msg)
return JsonResponse({"result": False, "message": err_msg}, status=400)

try:
instance = Trigger.objects.get(
id=trigger_id,
space_id=space_id,
template_id=template_id,
type=Trigger.TYPE_REMOTE,
is_enabled=True,
token=token,
)
except Trigger.DoesNotExist:
err_msg = f"对应 空间 {space_id} 流程 {template_id} 触发器 {trigger_id} 不存在 请检查信息是否正确以及启用状态 token 是否正确"
logger.error(err_msg)
return JsonResponse({"result": False, "message": err_msg}, status=400)

msg = None
try:
if not parse_expression(expression=instance.condition, context=trigger_cond):
msg = f"未达到触发条件 {instance.condition}:{trigger_cond}"
except ValidationError:
msg = f"未达到触发条件 {instance.condition}:{trigger_cond}"
# 不匹配的时候 可能直接出现解析失败 如果失败 也认为不匹配 直接返回
if msg:
logger.info(msg)
return JsonResponse({"result": True, "message": msg}, status=200)

try:
template_instance = template_instance.first()
trigger_data["pipeline_tree"] = template_instance.pipeline_tree
trigger_data["name"] = template_instance.name
task_id = create_trigger_tasks(trigger_data=trigger_data)
except Exception:
message = _("请求创建任务失败, 请重试. 如持续失败可联系管理员处理. {msg} | api trigger").format(msg=traceback.format_exc())
return JsonResponse({"result": False, "message": message}, status=400)

return JsonResponse({"result": True, "message": "success", "task_id": task_id}, status=200)
44 changes: 44 additions & 0 deletions bkflow/template/migrations/0005_auto_20250508_1434.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Generated by Django 3.2.15 on 2025-05-08 06:34

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("template", "0004_auto_20240823_1544"),
]

operations = [
migrations.CreateModel(
name="Trigger",
fields=[
("id", models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("creator", models.CharField(blank=True, max_length=32, null=True, verbose_name="创建人")),
("create_at", models.DateTimeField(auto_now_add=True, verbose_name="创建时间")),
("update_at", models.DateTimeField(auto_now=True, verbose_name="更新时间")),
("updated_by", models.CharField(blank=True, max_length=32, null=True, verbose_name="修改人")),
("is_deleted", models.BooleanField(db_index=True, default=False, verbose_name="是否软删除")),
("space_id", models.IntegerField(help_text="Space ID")),
("template_id", models.IntegerField(db_index=True, help_text="Related template ID")),
("is_enabled", models.BooleanField(default=True, help_text="Indicates whether the trigger is enabled")),
("name", models.CharField(max_length=100)),
("condition", models.TextField(help_text="Condition for the trigger")),
("config", models.JSONField(help_text="Configuration for the trigger")),
("token", models.CharField(help_text="Token for remote authentication", max_length=255)),
(
"type",
models.CharField(
choices=[("interval", "定时"), ("manual", "手动"), ("remote", "远程")],
default="manual",
help_text="Type of the trigger",
max_length=20,
),
),
],
),
migrations.AddIndex(
model_name="trigger",
index=models.Index(fields=["space_id", "template_id"], name="template_tr_space_i_0cf8a3_idx"),
),
]
27 changes: 27 additions & 0 deletions bkflow/template/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,30 @@ class Meta:
verbose_name_plural = "Template Mock Scheme"
ordering = ["-id"]
index_together = ["space_id", "template_id"]


class Trigger(CommonModel):
# 定义触发器类型选项
TYPE_INTERVAL = "interval"
TYPE_MANUAL = "manual"
TYPE_REMOTE = "remote"

TYPE_CHOICES = [
(TYPE_INTERVAL, "定时"), # 定时
(TYPE_REMOTE, "远程"), # 远程
]
space_id = models.IntegerField(help_text="Space ID")
template_id = models.IntegerField(help_text="Related template ID", db_index=True)
is_enabled = models.BooleanField(default=True, help_text="Indicates whether the trigger is enabled")
name = models.CharField(max_length=100)
condition = models.TextField(help_text="Condition for the trigger")
config = models.JSONField(help_text="Configuration for the trigger")
token = models.CharField(max_length=255, help_text="Token for remote authentication")
type = models.CharField(
max_length=20, choices=TYPE_CHOICES, default=TYPE_MANUAL, help_text="Type of the trigger" # 设置默认触发类型
)

class Meta:
indexes = [
models.Index(fields=["space_id", "template_id"]),
]
84 changes: 84 additions & 0 deletions bkflow/template/serializers/trigger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making
蓝鲸流程引擎服务 (BlueKing Flow Engine Service) available.
Copyright (C) 2024 THL A29 Limited,
a Tencent company. All rights reserved.
Licensed under the MIT License (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the License for the
specific language governing permissions and limitations under the License.

We undertake not to change the open source license (MIT license) applicable

to the current version of the project delivered to anyone in the future.
"""

from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers

from bkflow.space.models import Space
from bkflow.template.models import Template, Trigger


class TriggerSerializer(serializers.ModelSerializer):
create_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")
update_at = serializers.DateTimeField(format="%Y-%m-%d %H:%M:%S")

class Meta:
model = Trigger
fields = "__all__"


class ListTriggerSerializer(serializers.Serializer):
space_id = serializers.IntegerField(help_text=_("空间ID"), required=True)
template_id = serializers.IntegerField(help_text=_("模板ID"), required=False)


class CreateTriggerSerializer(serializers.Serializer):
space_id = serializers.IntegerField(help_text="空间ID", required=True)
template_id = serializers.IntegerField(help_text="模板ID", required=True)
is_enabled = serializers.BooleanField(help_text="是否启用", required=False, default=True)
name = serializers.CharField(max_length=100, help_text="名称", required=True)
condition = serializers.CharField(help_text="条件", required=True)
config = serializers.JSONField(help_text="配置", required=False)
type = serializers.CharField(max_length=20, help_text="触发类型", required=True)
token = serializers.CharField(help_text="远程密钥", required=False)

def validate_type(self, value):
valid_types = {choice[0] for choice in Trigger.TYPE_CHOICES}
if value not in valid_types:
raise serializers.ValidationError(f"Invalid type. Expected one of: {valid_types}")
return value

def validate(self, data):
# remote 类型必须有 token
type_value = data.get("type")
token_value = data.get("token")

space_id = data.get("space_id")
template_id = data.get("template_id")

if type_value == Trigger.TYPE_REMOTE and not token_value:
raise serializers.ValidationError({"token": "Token is required when type is remote"})

if not Space.exists(space_id) or not Template.objects.filter(id=template_id, space_id=space_id).exists():
raise serializers.ValidationError(f"对应 空间 {space_id} 或 流程 {template_id} 不存在")
return data


class RemoteTriggerSerializer(serializers.Serializer):
space_id = serializers.IntegerField(help_text="空间ID", required=True)
template_id = serializers.IntegerField(help_text="模板ID", required=True)
condition = serializers.JSONField(help_text="触发条件", required=True)
trigger_id = serializers.IntegerField(help_text="触发器ID", required=True)
creator = serializers.CharField(help_text="执行人", required=True)

def validate_condition(self, value):
if not isinstance(value, dict):
raise serializers.ValidationError("Condition must be a JSON object.")
return value
4 changes: 3 additions & 1 deletion bkflow/template/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
AdminTemplateViewSet,
TemplateMockDataViewSet,
TemplateMockSchemeViewSet,
TemplateViewSet,
TemplateMockTaskViewSet,
TemplateViewSet,
)
from bkflow.template.views.trigger import TriggerViewSet
from bkflow.template.views.variable import VariableViewSet

router = DefaultRouter()
Expand All @@ -36,6 +37,7 @@
router.register(r"^template_mock_data", TemplateMockDataViewSet, basename="template_mock_data")
router.register(r"^template_mock_scheme", TemplateMockSchemeViewSet, basename="template_mock_scheme")
router.register(r"^template_mock_task", TemplateMockTaskViewSet, basename="template_mock_task")
router.register(r"trigger", TriggerViewSet, basename="trigger")
router.register(r"", TemplateViewSet, basename="template")

urlpatterns = [
Expand Down
32 changes: 32 additions & 0 deletions bkflow/template/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
to the current version of the project delivered to anyone in the future.
"""
import logging
from datetime import datetime

from pipeline.core.data.expression import ConstantTemplate

from bkflow.contrib.api.collections.task import TaskComponentClient
from bkflow.exceptions import APIResponseError
from bkflow.space.configs import CallbackHooksConfig
from bkflow.space.models import SpaceConfig
from bkflow.utils.api_client import ApiGwClient
Expand Down Expand Up @@ -130,3 +133,32 @@ def send_callback(space_id, callback_type, data):
"[send_callback] send_callback error, callback_type={}, data={}, err={}".format(callback_type, data, e)
)
return


def create_trigger_tasks(trigger_data):
"""
提交创建触发器任务
"""
space_id, template_id = trigger_data.get("space_id"), trigger_data.get("template_id")
pipeline_tree = trigger_data.get("pipeline_tree")
client = TaskComponentClient(space_id=space_id)
name = trigger_data.get("name")
formatted_time = datetime.now().strftime("%Y%m%d%H%M")
task_name = f"{name}_{formatted_time}_trigger"
task_data = {
"template_id": template_id,
"space_id": space_id,
"pipeline_tree": pipeline_tree,
"creator": trigger_data["creator"],
"name": task_name,
}
resp = client.create_task(task_data)
if not resp["result"]:
raise APIResponseError(resp["message"])
task_id = resp["data"]["id"]
trigger_data["operator"] = trigger_data["creator"]
resp = client.operate_task(task_id=task_id, operate="start", data=trigger_data)
if not resp["result"]:
raise APIResponseError(resp["message"])
# 创建任务失败或请求失败
return task_id
Loading
Loading