Skip to content

Commit

Permalink
当工单不在执行态时,记录日志,不报错,避免q-task不断重试
Browse files Browse the repository at this point in the history
  • Loading branch information
peixubin committed Oct 27, 2024
1 parent eb99790 commit 1d5b60f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
30 changes: 26 additions & 4 deletions sql/utils/execute_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,43 @@

def execute(workflow_id, user=None):
"""为延时或异步任务准备的execute, 传入工单ID和执行人信息"""
audit_id = Audit.detail_by_workflow_id(
workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW
).audit_id
# 使用当前读防止重复执行
with transaction.atomic():
workflow_detail = SqlWorkflow.objects.select_for_update().get(id=workflow_id)
# 只有排队中和定时执行的数据才可以继续执行,否则直接抛错
if workflow_detail.status not in ["workflow_queuing", "workflow_timingtask"]:
raise Exception("工单状态不正确,禁止执行!")
logger.error(f"工单号[{workflow_id}] 可能被任务调度器重试")
Audit.add_log(
audit_id=audit_id,
operation_type=5,
operation_type_desc="执行工单发生异常",
operation_info="请检查工单执行情况",
operator=user.username if user else "",
operator_display=user.display if user else "系统",
)
result = ReviewSet(
rows=[
ReviewResult(
id=1,
errlevel=2,
stagestatus="执行发生错误",
errormessage=f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
)
],
)
result.error = (
f"任务[{workflow_id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
)
return result
# 将工单状态修改为执行中
else:
SqlWorkflow(id=workflow_id, status="workflow_executing").save(
update_fields=["status"]
)
# 增加执行日志
audit_id = Audit.detail_by_workflow_id(
workflow_id=workflow_id, workflow_type=WorkflowType.SQL_REVIEW
).audit_id
Audit.add_log(
audit_id=audit_id,
operation_type=5,
Expand Down
31 changes: 31 additions & 0 deletions sql/utils/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,19 @@ def setUp(self):
db_name="some_db",
syntax_type=1,
)
self.wf_executing = SqlWorkflow.objects.create(
workflow_name="some_name",
group_id=1,
group_name="g1",
engineer_display="",
audit_auth_groups="some_group",
create_time=datetime.datetime.now(),
status="workflow_executing",
is_backup=True,
instance=self.ins,
db_name="some_db",
syntax_type=1,
)
SqlWorkflowContent.objects.create(
workflow=self.wf,
sql_content="some_sql",
Expand Down Expand Up @@ -409,6 +422,24 @@ def test_execute(self, _get_engine, _execute_workflow, _audit):
operator_display="系统",
)

@patch("sql.utils.execute_sql.Audit")
@patch("sql.engines.mysql.MysqlEngine.execute_workflow")
@patch("sql.engines.get_engine")
def test_execute_in_executing(self, _get_engine, _execute_workflow, _audit):
_audit.detail_by_workflow_id.return_value.audit_id = 1
result = execute(self.wf_executing.id)
_audit.add_log.assert_called_with(
audit_id=1,
operation_type=5,
operation_type_desc="执行工单发生异常",
operation_info="请检查工单执行情况",
operator="",
operator_display="系统",
)
assert result.error == (
f"任务[{self.wf_executing.id}]被重试。可能是执行时发生超时,请检查数据库会话及执行状态,或联系管理员",
)

@patch("sql.utils.execute_sql.notify_for_execute")
@patch("sql.utils.execute_sql.Audit")
def test_execute_callback_success(self, _audit, _notify):
Expand Down

0 comments on commit 1d5b60f

Please sign in to comment.