Skip to content

Commit 6605804

Browse files
authored
refactor: handle the failed task message (#600)
* refactor: handle the failed task message
* chore: release petercat-utils/0.1.41
1 parent 04e6ea7 commit 6605804

File tree

7 files changed: 91 additions and 70 deletions.

petercat_utils/rag_helper/git_doc_task.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -68,13 +68,15 @@ def __init__(
6868
status=TaskStatus.NOT_STARTED,
6969
from_id=None,
7070
id=None,
71+
retry_count=0,
7172
):
7273
super().__init__(
7374
type=TaskType.GIT_DOC,
7475
from_id=from_id,
7576
id=id,
7677
status=status,
7778
repo_name=repo_name,
79+
retry_count=retry_count,
7880
)
7981
self.commit_id = commit_id
8082
self.node_type = GitDocTaskNodeType(node_type)

petercat_utils/rag_helper/git_issue_task.py

Lines changed: 38 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -11,10 +11,10 @@ def add_rag_git_issue_task(config: RAGGitIssueConfig):
1111
g.get_repo(config.repo_name)
1212

1313
issue_task = GitIssueTask(
14-
issue_id='',
14+
issue_id="",
1515
node_type=GitIssueTaskNodeType.REPO,
1616
bot_id=config.bot_id,
17-
repo_name=config.repo_name
17+
repo_name=config.repo_name,
1818
)
1919
res = issue_task.save()
2020
issue_task.send()
@@ -26,17 +26,26 @@ class GitIssueTask(GitTask):
2626
issue_id: str
2727
node_type: GitIssueTaskNodeType
2828

29-
def __init__(self,
30-
issue_id,
31-
node_type: GitIssueTaskNodeType,
32-
bot_id,
33-
repo_name,
34-
status=TaskStatus.NOT_STARTED,
35-
from_id=None,
36-
id=None
37-
):
38-
super().__init__(bot_id=bot_id, type=TaskType.GIT_ISSUE, from_id=from_id, id=id, status=status,
39-
repo_name=repo_name)
29+
def __init__(
30+
self,
31+
issue_id,
32+
node_type: GitIssueTaskNodeType,
33+
bot_id,
34+
repo_name,
35+
status=TaskStatus.NOT_STARTED,
36+
from_id=None,
37+
id=None,
38+
retry_count=0,
39+
):
40+
super().__init__(
41+
bot_id=bot_id,
42+
type=TaskType.GIT_ISSUE,
43+
from_id=from_id,
44+
id=id,
45+
status=status,
46+
repo_name=repo_name,
47+
retry_count=retry_count,
48+
)
4049
self.issue_id = issue_id
4150
self.node_type = GitIssueTaskNodeType(node_type)
4251

@@ -75,27 +84,28 @@ def handle_repo_node(self):
7584
if len(task_list) > 0:
7685
result = self.get_table().insert(task_list).execute()
7786
for record in result.data:
78-
issue_task = GitIssueTask(id=record["id"],
79-
issue_id=record["issue_id"],
80-
repo_name=record["repo_name"],
81-
node_type=record["node_type"],
82-
bot_id=record["bot_id"],
83-
status=record["status"],
84-
from_id=record["from_task_id"]
85-
)
87+
issue_task = GitIssueTask(
88+
id=record["id"],
89+
issue_id=record["issue_id"],
90+
repo_name=record["repo_name"],
91+
node_type=record["node_type"],
92+
bot_id=record["bot_id"],
93+
status=record["status"],
94+
from_id=record["from_task_id"],
95+
)
8696
issue_task.send()
8797

88-
return (self.get_table().update(
89-
{"status": TaskStatus.COMPLETED.value})
90-
.eq("id", self.id)
91-
.execute())
98+
return (
99+
self.get_table()
100+
.update({"status": TaskStatus.COMPLETED.value})
101+
.eq("id", self.id)
102+
.execute()
103+
)
92104

93105
def handle_issue_node(self):
94106
issue_retrieval.add_knowledge_by_issue(
95107
RAGGitIssueConfig(
96-
repo_name=self.repo_name,
97-
bot_id=self.bot_id,
98-
issue_id=self.issue_id
108+
repo_name=self.repo_name, bot_id=self.bot_id, issue_id=self.issue_id
99109
)
100110
)
101111
return self.update_status(TaskStatus.COMPLETED)

petercat_utils/rag_helper/git_task.py

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -24,12 +24,14 @@ def __init__(
2424
status=TaskStatus.NOT_STARTED,
2525
from_id=None,
2626
id=None,
27+
retry_count=0,
2728
):
2829
self.type = type
2930
self.id = id
3031
self.from_id = from_id
3132
self.status = status
3233
self.repo_name = repo_name
34+
self.retry_count = retry_count
3335

3436
@staticmethod
3537
def get_table_name(type: TaskType):
@@ -82,11 +84,17 @@ def send(self):
8284
QueueUrl=SQS_QUEUE_URL,
8385
DelaySeconds=10,
8486
MessageBody=(
85-
json.dumps({"task_id": self.id, "task_type": self.type.value})
87+
json.dumps(
88+
{
89+
"task_id": self.id,
90+
"task_type": self.type.value,
91+
"retry_count": self.retry_count,
92+
}
93+
)
8694
),
8795
)
8896
message_id = response["MessageId"]
8997
print(
90-
f"task_id={self.id}, task_type={self.type.value}, message_id={message_id}"
98+
f"task_id={self.id}, task_type={self.type.value}, message_id={message_id}, retry_count={self.retry_count}"
9199
)
92100
return message_id

petercat_utils/rag_helper/task.py

Lines changed: 5 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -23,15 +23,6 @@
2323
SQS_QUEUE_URL = get_env_variable("SQS_QUEUE_URL")
2424

2525

26-
def send_task_message(task_id: str):
27-
response = sqs.send_message(
28-
QueueUrl=SQS_QUEUE_URL,
29-
DelaySeconds=10,
30-
MessageBody=(json.dumps({"task_id": task_id})),
31-
)
32-
return response["MessageId"]
33-
34-
3526
def get_oldest_task():
3627
supabase = get_client()
3728

@@ -54,10 +45,7 @@ def get_task_by_id(task_id):
5445
return response.data[0] if (len(response.data) > 0) else None
5546

5647

57-
def get_task(
58-
task_type: TaskType,
59-
task_id: str,
60-
) -> GitTask:
48+
def get_task(task_type: TaskType, task_id: str, retry_count=0) -> GitTask:
6149
supabase = get_client()
6250
response = (
6351
supabase.table(GitTask.get_table_name(task_type))
@@ -77,6 +65,7 @@ def get_task(
7765
path=data["path"],
7866
status=data["status"],
7967
from_id=data["from_task_id"],
68+
retry_count=retry_count,
8069
)
8170
if task_type == TaskType.GIT_ISSUE:
8271
return GitIssueTask(
@@ -87,11 +76,12 @@ def get_task(
8776
bot_id=data["bot_id"],
8877
status=data["status"],
8978
from_id=data["from_task_id"],
79+
retry_count=retry_count,
9080
)
9181

9282

93-
def trigger_task(task_type: TaskType, task_id: Optional[str]):
94-
task = get_task(task_type, task_id) if task_id else get_oldest_task()
83+
def trigger_task(task_type: TaskType, task_id: Optional[str], retry_count: int = 0):
84+
task = get_task(task_type, task_id, retry_count) if task_id else get_oldest_task()
9585
if task is None:
9686
return task
9787
return task.handle()

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "petercat_utils"
3-
version = "0.1.40"
3+
version = "0.1.41"
44
description = ""
55
authors = ["raoha.rh <[email protected]>"]
66
readme = "README.md"

server/aws/service.py

Lines changed: 16 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -14,30 +14,32 @@
1414
STATIC_SECRET_NAME = get_env_variable("STATIC_SECRET_NAME")
1515
STATIC_KEYPAIR_ID = get_env_variable("STATIC_KEYPAIR_ID")
1616

17+
1718
def rsa_signer(message):
1819
private_key_str = get_private_key(STATIC_SECRET_NAME)
19-
private_key = rsa.PrivateKey.load_pkcs1(private_key_str.encode('utf-8'))
20-
return rsa.sign(message, private_key, 'SHA-1')
20+
private_key = rsa.PrivateKey.load_pkcs1(private_key_str.encode("utf-8"))
21+
return rsa.sign(message, private_key, "SHA-1")
22+
2123

2224
def create_signed_url(url, expire_minutes=60) -> str:
2325
cloudfront_signer = CloudFrontSigner(STATIC_KEYPAIR_ID, rsa_signer)
24-
26+
2527
# 设置过期时间
2628
expire_date = datetime.now() + timedelta(minutes=expire_minutes)
27-
29+
2830
# 创建签名 URL
2931
signed_url = cloudfront_signer.generate_presigned_url(
30-
url=url,
31-
date_less_than=expire_date
32+
url=url, date_less_than=expire_date
3233
)
33-
34+
3435
return signed_url
3536

37+
3638
def upload_image_to_s3(file, metadata: ImageMetaData, s3_client):
3739
try:
3840
file_content = file.file.read()
3941
md5_hash = hashlib.md5()
40-
md5_hash.update(file.filename.encode('utf-8'))
42+
md5_hash.update(file.filename.encode("utf-8"))
4143
s3_key = md5_hash.hexdigest()
4244
encoded_filename = (
4345
base64.b64encode(metadata.title.encode("utf-8")).decode("utf-8")
@@ -62,11 +64,12 @@ def upload_image_to_s3(file, metadata: ImageMetaData, s3_client):
6264
ContentType=file.content_type,
6365
Metadata=custom_metadata,
6466
)
65-
# you need to redirect your static domain to your s3 bucket domain
6667
s3_url = f"{STATIC_URL}/{s3_key}"
67-
signed_url = create_signed_url(url=s3_url, expire_minutes=60) \
68-
if (STATIC_SECRET_NAME and STATIC_KEYPAIR_ID) \
69-
else s3_url
70-
return {"message": "File uploaded successfully", "url": signed_url }
68+
signed_url = (
69+
create_signed_url(url=s3_url, expire_minutes=60)
70+
if (STATIC_SECRET_NAME and STATIC_KEYPAIR_ID)
71+
else s3_url
72+
)
73+
return {"message": "File uploaded successfully", "url": signed_url}
7174
except Exception as e:
7275
raise UploadError(detail=str(e))

subscriber/handler.py

Lines changed: 19 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -3,30 +3,38 @@
33
from petercat_utils import task as task_helper
44
from petercat_utils.data_class import TaskType
55

6+
MAX_RETRY_COUNT = 5
7+
68

79
def lambda_handler(event, context):
810
if event:
911
batch_item_failures = []
1012
sqs_batch_response = {}
1113

1214
for record in event["Records"]:
13-
try:
14-
body = record["body"]
15-
print(f"receive message here: {body}")
15+
body = record["body"]
16+
print(f"receive message here: {body}")
1617

17-
message_dict = json.loads(body)
18-
task_id = message_dict["task_id"]
19-
task_type = message_dict["task_type"]
20-
task = task_helper.get_task(TaskType(task_type), task_id)
18+
message_dict = json.loads(body)
19+
task_id = message_dict["task_id"]
20+
task_type = message_dict["task_type"]
21+
retry_count = message_dict["retry_count"]
22+
task = task_helper.get_task(TaskType(task_type), task_id)
23+
try:
2124
if task is None:
2225
return task
2326
task.handle()
24-
2527
# process message
26-
print(f"message content: message={message_dict}, task_id={task_id}, task={task}")
28+
print(
29+
f"message content: message={message_dict}, task_id={task_id}, task={task}, retry_count={retry_count}"
30+
)
2731
except Exception as e:
28-
print(f"message handle error: ${e}")
29-
batch_item_failures.append({"itemIdentifier": record['messageId']})
32+
if retry_count < MAX_RETRY_COUNT:
33+
retry_count += 1
34+
task_helper.trigger_task(task_type, task_id, retry_count)
35+
else:
36+
print(f"message handle error: ${e}")
37+
batch_item_failures.append({"itemIdentifier": record["messageId"]})
3038

3139
sqs_batch_response["batchItemFailures"] = batch_item_failures
3240
return sqs_batch_response

0 commit comments

Comments
 (0)