Skip to content

Commit def6866

Browse files
authored
Merge pull request #208 from codeforjapan/feat/add-skip-logic
refactor: add `skip_topic_detect` and `skip_tweet_lookup` flags acros…
2 parents 047a118 + d529c22 commit def6866

File tree

6 files changed

+153
-81
lines changed

6 files changed

+153
-81
lines changed

etl/seed/settings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"filter": {
33
"languages": ["ja"],
44
"keywords": [],
5-
"start_millis": 1768834800000,
5+
"start_millis": 1762009200000,
66
"end_millis": 1771858740000
77
},
8-
"description": "Note filtering configuration. start_millis is required. end_millis is optional (null = no upper limit). Timestamps in milliseconds UTC. Current: 2025-01-20 00:00:00 JST ~ 2025-02-23 23:59:59 JST"
8+
"description": "Note filtering configuration. start_millis is required. end_millis is optional (null = no upper limit). Timestamps in milliseconds UTC. Current: 2025-11-01 00:00:00 JST ~ 2026-02-23 23:59:00 JST"
99
}

etl/src/birdxplorer_etl/extract_ecs.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ def extract_data(postgresql: Session):
173173

174174
# バッチ処理後にSQSキューイング(新規追加のみ)
175175
for note in rows_to_add.values():
176-
enqueue_notes(note.note_id, note.summary, note.tweet_id)
176+
enqueue_notes(note.note_id, note.summary, note.tweet_id, note.language)
177177

178178
rows_to_add = {}
179179
rows_to_update = []
@@ -185,7 +185,7 @@ def extract_data(postgresql: Session):
185185

186186
# 最後のバッチのSQSキューイング(新規追加のみ)
187187
for note in rows_to_add.values():
188-
enqueue_notes(note.note_id, note.summary, note.tweet_id)
188+
enqueue_notes(note.note_id, note.summary, note.tweet_id, note.language)
189189

190190
status_url = (
191191
f"https://ton.twimg.com/birdwatch-public-data/{dateString}/"
@@ -280,17 +280,18 @@ def extract_data(postgresql: Session):
280280
return
281281

282282

283-
def enqueue_notes(note_id: str, summary: str, post_id: str = None):
283+
def enqueue_notes(note_id: str, summary: str, post_id: str = None, language: str = None):
284284
"""
285285
ノート処理用のSQSキューにメッセージを送信
286286
lang-detect-queueに送信(summaryとpost_idも含める)
287287
"""
288288
sqs_client = boto3.client("sqs", region_name=os.environ.get("AWS_REGION", "ap-northeast-1"))
289289

290290
# lang-detect-queue用のメッセージ(summaryとpost_idを含める)
291-
lang_detect_message = json.dumps(
292-
{"note_id": note_id, "summary": summary, "post_id": post_id, "processing_type": "language_detect"}
293-
)
291+
message = {"note_id": note_id, "summary": summary, "post_id": post_id, "processing_type": "language_detect"}
292+
if language:
293+
message["language"] = language
294+
lang_detect_message = json.dumps(message)
294295

295296
# lang-detect-queueに送信
296297
try:
@@ -324,4 +325,3 @@ def enqueue_note_status_update(note_id: str):
324325
logging.info(f"Enqueued note {note_id} to note-status-update queue, messageId={response.get('MessageId')}")
325326
except Exception as e:
326327
logging.error(f"Failed to enqueue note {note_id} to note-status-update queue: {e}")
327-

etl/src/birdxplorer_etl/lib/lambda_handler/language_detect_lambda.py

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
from birdxplorer_etl import settings
66
from birdxplorer_etl.lib.ai_model.ai_model_interface import get_ai_service
7-
from birdxplorer_etl.lib.lambda_handler.common.retry_handler import call_ai_api_with_retry
7+
from birdxplorer_etl.lib.lambda_handler.common.retry_handler import (
8+
call_ai_api_with_retry,
9+
)
810
from birdxplorer_etl.lib.lambda_handler.common.sqs_handler import SQSHandler
911

1012
# Lambda用のロガー設定
@@ -37,6 +39,7 @@ def lambda_handler(event, context):
3739
try:
3840
note_id = None
3941
summary = None
42+
existing_language = None
4043

4144
# SQSイベントの場合
4245
if "Records" in event:
@@ -53,6 +56,7 @@ def lambda_handler(event, context):
5356
if processing_type == "language_detect":
5457
note_id = message_body.get("note_id")
5558
summary = message_body.get("summary")
59+
existing_language = message_body.get("language")
5660
logger.info(f"Found language_detect message for note_id: {note_id}")
5761
break
5862
else:
@@ -71,16 +75,20 @@ def lambda_handler(event, context):
7175
if note_id and summary:
7276
logger.info(f"[START] Detecting language for note: {note_id}")
7377

74-
ai_service = get_ai_service()
75-
76-
# 言語判定を実行(リトライ付き)
77-
logger.info(f"[PROCESSING] Calling AI service for language detection...")
78-
detected_language = call_ai_api_with_retry(
79-
ai_service.detect_language,
80-
summary,
81-
max_retries=3,
82-
initial_delay=1.0,
83-
)
78+
if existing_language:
79+
logger.info(f"[SKIP] Language already known for note {note_id}: {existing_language}")
80+
detected_language = existing_language
81+
else:
82+
ai_service = get_ai_service()
83+
84+
# 言語判定を実行(リトライ付き)
85+
logger.info(f"[PROCESSING] Calling AI service for language detection...")
86+
detected_language = call_ai_api_with_retry(
87+
ai_service.detect_language,
88+
summary,
89+
max_retries=3,
90+
initial_delay=1.0,
91+
)
8492

8593
logger.info(f"[SUCCESS] Language detected for note {note_id}: {detected_language}")
8694

etl/src/birdxplorer_etl/lib/lambda_handler/note_transform_lambda.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99

1010
from birdxplorer_common.storage import (
1111
NoteRecord,
12+
NoteTopicAssociation,
1213
RowNoteRecord,
1314
RowNoteStatusRecord,
15+
RowPostRecord,
1416
TopicRecord,
1517
)
1618
from birdxplorer_etl import settings
@@ -204,8 +206,31 @@ def process_single_message(message: dict, postgresql, sqs_handler, topics_cache:
204206
existing_note = postgresql.query(NoteRecord).filter(NoteRecord.note_id == note_id).first()
205207

206208
if existing_note:
207-
logger.info(f"Note already exists in notes table: {note_id}")
208-
return {"note_id": note_id, "status": "skipped", "message": "Note already exists"}
209+
logger.info(f"Note already exists in notes table: {note_id}, checking downstream status")
210+
211+
# note_topicテーブルでトピック設定済みかチェック
212+
has_topics = (
213+
postgresql.query(NoteTopicAssociation).filter(NoteTopicAssociation.note_id == note_id).first() is not None
214+
)
215+
216+
# row_postsテーブルで投稿取得済みかチェック
217+
post_id = existing_note.post_id
218+
has_post = False
219+
if post_id:
220+
has_post = postgresql.query(RowPostRecord).filter(RowPostRecord.post_id == post_id).first() is not None
221+
222+
logger.info(f"Note {note_id}: has_topics={has_topics}, has_post={has_post}")
223+
224+
return {
225+
"note_id": note_id,
226+
"status": "existing",
227+
"detected_language": str(existing_note.language) if existing_note.language else "",
228+
"summary": existing_note.summary,
229+
"post_id": str(post_id) if post_id else None,
230+
"created_at_millis": int(existing_note.created_at) if existing_note.created_at else None,
231+
"skip_topic_detect": has_topics,
232+
"skip_tweet_lookup": has_post,
233+
}
209234

210235
# 言語を取得(優先順位: メッセージ > DB)
211236
detected_language = message_language or note_row.language
@@ -233,8 +258,7 @@ def process_single_message(message: dict, postgresql, sqs_handler, topics_cache:
233258
)
234259
if sent_message_id:
235260
logger.info(
236-
f"Language not available for note {note_id}, "
237-
f"requeued with retry_count={retry_count + 1}, delay=30s"
261+
f"Language not available for note {note_id}, " f"requeued with retry_count={retry_count + 1}, delay=30s"
238262
)
239263
return {"note_id": note_id, "status": "requeued", "retry_count": retry_count + 1}
240264
else:
@@ -311,11 +335,7 @@ def lambda_handler(event, context):
311335
logger.error(f"Commit error: {e}")
312336
postgresql.rollback()
313337
# コミット失敗時は全メッセージを失敗扱い
314-
return {
315-
"batchItemFailures": [
316-
{"itemIdentifier": record.get("messageId")} for record in records
317-
]
318-
}
338+
return {"batchItemFailures": [{"itemIdentifier": record.get("messageId")} for record in records]}
319339

320340
# 成功したノートに対してtopic-detect-queueに送信
321341
settings_config = load_settings()
@@ -327,7 +347,7 @@ def lambda_handler(event, context):
327347

328348
topic_detect_queued = 0
329349
for result in results:
330-
if result.get("status") != "success":
350+
if result.get("status") not in ("success", "existing"):
331351
continue
332352

333353
note_id = result["note_id"]
@@ -356,6 +376,8 @@ def lambda_handler(event, context):
356376
"post_id": post_id,
357377
"topics": lambda_handler._topics_cache,
358378
"processing_type": "topic_detect",
379+
"skip_topic_detect": result.get("skip_topic_detect", False),
380+
"skip_tweet_lookup": result.get("skip_tweet_lookup", False),
359381
}
360382

361383
if sqs_handler.send_message(queue_url=settings.TOPIC_DETECT_QUEUE_URL, message_body=topic_detect_message):
@@ -376,9 +398,7 @@ def lambda_handler(event, context):
376398
logger.error(f"Lambda execution error: {str(e)}")
377399
# 全体エラー時は全メッセージを失敗扱い
378400
return {
379-
"batchItemFailures": [
380-
{"itemIdentifier": record.get("messageId")} for record in event.get("Records", [])
381-
]
401+
"batchItemFailures": [{"itemIdentifier": record.get("messageId")} for record in event.get("Records", [])]
382402
}
383403
finally:
384404
postgresql.close()

etl/src/birdxplorer_etl/lib/lambda_handler/postlookup_lambda.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,16 @@ def lambda_handler(event: dict, context: Any) -> dict:
150150
try:
151151
tweet_id = None
152152

153+
skip_tweet_lookup = False
154+
153155
# SQSイベントの場合
154156
if "Records" in event:
155157
for record in event["Records"]:
156158
try:
157159
message_body = json.loads(record["body"])
158160
if message_body.get("processing_type") == "tweet_lookup":
159161
tweet_id = message_body.get("tweet_id")
162+
skip_tweet_lookup = message_body.get("skip_tweet_lookup", False)
160163
break
161164
except json.JSONDecodeError as e:
162165
logger.error(f"Failed to parse SQS message body: {e}")
@@ -166,6 +169,24 @@ def lambda_handler(event: dict, context: Any) -> dict:
166169
elif "tweet_id" in event:
167170
tweet_id = event["tweet_id"]
168171

172+
if tweet_id and skip_tweet_lookup:
173+
logger.info(f"[SKIP] Post already fetched for tweet: {tweet_id}")
174+
# POST_TRANSFORM_QUEUEへの転送のみ実行
175+
post_transform_queue_url = os.environ.get("POST_TRANSFORM_QUEUE_URL")
176+
if post_transform_queue_url:
177+
transform_message = {
178+
"operation": "transform_post",
179+
"post_id": tweet_id,
180+
"retry_count": 0,
181+
}
182+
sqs_handler.send_message(
183+
queue_url=post_transform_queue_url,
184+
message_body=transform_message,
185+
delay_seconds=5,
186+
)
187+
logger.info(f"[SQS_SUCCESS] Sent transform request for skipped tweet {tweet_id}")
188+
return {"statusCode": 200, "body": json.dumps({"skipped": True, "tweet_id": tweet_id})}
189+
169190
if tweet_id:
170191
logger.info(f"Looking up tweet: {tweet_id}")
171192

@@ -217,11 +238,13 @@ def lambda_handler(event: dict, context: Any) -> dict:
217238

218239
return {
219240
"statusCode": 200,
220-
"body": json.dumps({
221-
"rate_limited": True,
222-
"tweet_id": tweet_id,
223-
"delay_seconds": delay_seconds,
224-
}),
241+
"body": json.dumps(
242+
{
243+
"rate_limited": True,
244+
"tweet_id": tweet_id,
245+
"delay_seconds": delay_seconds,
246+
}
247+
),
225248
}
226249

227250
# 削除/非公開/その他エラーの場合はスキップ
@@ -236,12 +259,14 @@ def lambda_handler(event: dict, context: Any) -> dict:
236259
# 正常終了として返す(DLQに送らない)
237260
return {
238261
"statusCode": 200,
239-
"body": json.dumps({
240-
"skipped": True,
241-
"tweet_id": tweet_id,
242-
"reason": status,
243-
"detail": detail,
244-
}),
262+
"body": json.dumps(
263+
{
264+
"skipped": True,
265+
"tweet_id": tweet_id,
266+
"reason": status,
267+
"detail": detail,
268+
}
269+
),
245270
}
246271

247272
# dataがない場合は予期しないエラー

0 commit comments

Comments
 (0)