Skip to content

Commit

Permalink
my2sql失败和archive失败都抛出错误通知
Browse files Browse the repository at this point in the history
  • Loading branch information
woshiyanghai committed Oct 31, 2024
1 parent 86302bd commit dcb6a42
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 84 deletions.
147 changes: 74 additions & 73 deletions sql/archiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,83 +401,84 @@ def archive(archive_id):
select_cnt = 0
insert_cnt = 0
delete_cnt = 0
with FuncTimer() as t:
p = pt_archiver.execute_cmd(cmd_args)
stdout = ""
for line in iter(p.stdout.readline, ""):
if re.match(r"^SELECT\s(\d+)$", line, re.I):
select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
elif re.match(r"^INSERT\s(\d+)$", line, re.I):
insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
elif re.match(r"^DELETE\s(\d+)$", line, re.I):
delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
stdout += f"{line}\n"
statistics = stdout
# 获取异常信息
stderr = p.stderr.read()
if stderr:
statistics = stdout + stderr

# 判断归档结果
select_cnt = int(select_cnt[0]) if select_cnt else 0
insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
error_info = ""
success = True
if stderr:
error_info = f"命令执行报错:{stderr}"
success = False
if mode == "dest":
# 删除源数据,判断删除数量和写入数量
if not no_delete and (insert_cnt != delete_cnt):
error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
success = False
elif mode == "file":
# 删除源数据,判断查询数量和删除数量
if not no_delete and (select_cnt != delete_cnt):
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False
elif mode == "purge":
# 直接删除。判断查询数量和删除数量
if select_cnt != delete_cnt:
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False

# 执行信息保存到数据库
if connection.connection and not connection.is_usable():
close_old_connections()
# 更新最后归档时间
ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
update_fields=["last_archive_time"]
)
# 替换密码信息后保存
shell_cmd = " ".join(cmd_args)
ArchiveLog.objects.create(
archive=archive_info,
cmd=(
shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
if mode == "dest"
else shell_cmd.replace(s_ins.password, "***")
),
condition=condition,
mode=mode,
no_delete=no_delete,
sleep=sleep,
select_cnt=select_cnt,
insert_cnt=insert_cnt,
delete_cnt=delete_cnt,
statistics=statistics,
success=success,
error_info=error_info,
start_time=t.start,
end_time=t.end,
)
try:
with FuncTimer() as t:
p = pt_archiver.execute_cmd(cmd_args)
stdout = ""
for line in iter(p.stdout.readline, ""):
if re.match(r"^SELECT\s(\d+)$", line, re.I):
select_cnt = re.findall(r"^SELECT\s(\d+)$", line)
elif re.match(r"^INSERT\s(\d+)$", line, re.I):
insert_cnt = re.findall(r"^INSERT\s(\d+)$", line)
elif re.match(r"^DELETE\s(\d+)$", line, re.I):
delete_cnt = re.findall(r"^DELETE\s(\d+)$", line)
stdout += f"{line}\n"
statistics = stdout
# 获取异常信息
stderr = p.stderr.read()
if stderr:
statistics = stdout + stderr

# 判断归档结果
select_cnt = int(select_cnt[0]) if select_cnt else 0
insert_cnt = int(insert_cnt[0]) if insert_cnt else 0
delete_cnt = int(delete_cnt[0]) if delete_cnt else 0
error_info = ""
success = True
if stderr:
error_info = f"命令执行报错:{stderr}"
success = False
if mode == "dest":
# 删除源数据,判断删除数量和写入数量
if not no_delete and (insert_cnt != delete_cnt):
error_info = f"删除和写入数量不一致:{insert_cnt}!={delete_cnt}"
success = False
elif mode == "file":
# 删除源数据,判断查询数量和删除数量
if not no_delete and (select_cnt != delete_cnt):
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False
elif mode == "purge":
# 直接删除。判断查询数量和删除数量
if select_cnt != delete_cnt:
error_info = f"查询和删除数量不一致:{select_cnt}!={delete_cnt}"
success = False

# 执行信息保存到数据库
if connection.connection and not connection.is_usable():
close_old_connections()
# 更新最后归档时间
ArchiveConfig(id=archive_id, last_archive_time=t.end).save(
update_fields=["last_archive_time"]
)
# 替换密码信息后保存
shell_cmd = " ".join(cmd_args)
ArchiveLog.objects.create(
archive=archive_info,
cmd=(
shell_cmd.replace(s_ins.password, "***").replace(d_ins.password, "***")
if mode == "dest"
else shell_cmd.replace(s_ins.password, "***")
),
condition=condition,
mode=mode,
no_delete=no_delete,
sleep=sleep,
select_cnt=select_cnt,
insert_cnt=insert_cnt,
delete_cnt=delete_cnt,
statistics=statistics,
success=success,
error_info=error_info,
start_time=t.start,
end_time=t.end,
)
if not success:
raise Exception(f"{error_info}\n{statistics}")
return src_db_name, src_table_name
return src_db_name, src_table_name,error_info
except Exception as e:
return src_db_name, src_table_name, error_info
return src_db_name, src_table_name, str(e)



@permission_required("sql.menu_archive", raise_exception=True)
Expand Down
8 changes: 6 additions & 2 deletions sql/binlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,5 +248,9 @@ def my2sql_file(args, user):
args["output-dir"] = path
cmd_args = my2sql.generate_args2cmd(args)
# 使用output-dir参数执行命令保存sql
my2sql.execute_cmd(cmd_args)
return user, path
try:
my2sql.execute_cmd(cmd_args)
return user, path
except Exception as e:
# 捕获所有异常并返回错误信息
return user, str(e)
15 changes: 6 additions & 9 deletions sql/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,15 +600,12 @@ def notify_for_archive(task):
:param task:
:return:
"""
if task.success:
result = My2SqlResult(
success=True,
src_db_name=task.result[0],
src_table_name=task.result[1],
error=task.result[2],
)
else:
result = My2SqlResult(success=False, error=task.result)
result = ArchiveResult(
success=task.success,
src_db_name=task.result[0],
src_table_name=task.result[1],
error=task.result[2],
)
# 发送
sys_config = SysConfig()
auto_notify(workflow=result, sys_config=sys_config, event_type=EventType.ARCHIVE)

0 comments on commit dcb6a42

Please sign in to comment.