Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions etl/src/birdxplorer_etl/extract_ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,7 @@ def _validate_rating_row(row: dict, existing_row_note_ids: set) -> bool:
row[field] = "0"
elif value not in ["0", "1"]:
logging.warning(
f"Unexpected value '{value}' for field '{field}' in rating "
f"(note_id={note_id}). Setting to '0'."
f"Unexpected value '{value}' for field '{field}' in rating " f"(note_id={note_id}). Setting to '0'."
)
row[field] = "0"

Expand Down Expand Up @@ -715,12 +714,15 @@ def extract_ratings(postgresql: Session, dateString: str, existing_row_note_ids:
staging_count = total_loaded - dedup_deleted
# 最低行数: 現在テーブルの推定行数の50%(COUNT(*)はタイムアウトするのでreltuples使用)
# reltuples はANALYZE未実行時に-1を返すため、その場合はstaging_countの50%をフォールバックとして使用
current_count = postgresql.execute(
text(
"SELECT reltuples::bigint FROM pg_class "
"WHERE relname='row_note_ratings' AND relnamespace = current_schema()::regnamespace"
)
).scalar() or 0
current_count = (
postgresql.execute(
text(
"SELECT reltuples::bigint FROM pg_class "
"WHERE relname='row_note_ratings' AND relnamespace = current_schema()::regnamespace"
)
).scalar()
or 0
)
if current_count <= 0:
current_count = staging_count
min_rows = max(int(current_count * 0.5), 1)
Expand Down Expand Up @@ -826,19 +828,15 @@ def _create_staging_table(postgresql: Session) -> None:
postgresql.execute(text(f"DROP TABLE IF EXISTS {_OLD_TABLE}"))
# INCLUDING ALL でNOT NULL等の制約をコピーし、PKとインデックスだけ除外
postgresql.execute(
text(
f"CREATE UNLOGGED TABLE {_STAGING_TABLE} "
f"(LIKE row_note_ratings INCLUDING ALL EXCLUDING INDEXES)"
)
text(f"CREATE UNLOGGED TABLE {_STAGING_TABLE} " f"(LIKE row_note_ratings INCLUDING ALL EXCLUDING INDEXES)")
)
postgresql.commit()
logging.info("Created staging table for ratings bulk load")


def _deduplicate_staging_table(postgresql: Session) -> int:
"""staging table内の重複PKを除去し、created_at_millisが最新の行を残す。"""
result = postgresql.execute(
text(f"""
result = postgresql.execute(text(f"""
DELETE FROM {_STAGING_TABLE} a USING (
SELECT ctid, ROW_NUMBER() OVER (
PARTITION BY note_id, rater_participant_id
Expand All @@ -847,8 +845,7 @@ def _deduplicate_staging_table(postgresql: Session) -> int:
FROM {_STAGING_TABLE}
) b
WHERE a.ctid = b.ctid AND b.rn > 1
""")
)
"""))
deleted = result.rowcount
postgresql.commit()
logging.info(f"Deduplicated staging table: removed {deleted} duplicate rows")
Expand All @@ -866,6 +863,19 @@ def _swap_ratings_table(postgresql: Session, min_rows: int, staging_count: int)
logging.info(f"Staging table row count: {staging_count} (minimum: {min_rows})")

# PK構築(シーケンシャルビルド — ランダムI/Oなし)
# 過去のswapでPKリネームが失敗した場合、同名の制約が本番テーブルに残っている可能性があるため
# 事前にインデックスの存在をチェックし、存在すればリネームして名前衝突を回避する
existing_owner = postgresql.execute(
text("SELECT tablename FROM pg_indexes " f"WHERE indexname = '{_STAGING_TABLE}_pkey'")
).scalar()
if existing_owner and existing_owner != _STAGING_TABLE:
logging.warning(
f"PK index '{_STAGING_TABLE}_pkey' already exists on table '{existing_owner}', "
"renaming to avoid conflict"
)
postgresql.execute(text(f'ALTER INDEX "{_STAGING_TABLE}_pkey" RENAME TO "{_STAGING_TABLE}_pkey_old"'))
postgresql.commit()

pk_start = time.time()
postgresql.execute(
text(
Expand All @@ -892,9 +902,7 @@ def _swap_ratings_table(postgresql: Session, min_rows: int, staging_count: int)
# 旧テーブルのPKインデックスをリネーム(名前衝突回避)
old_pk_name = postgresql.execute(
text(
"SELECT indexname FROM pg_indexes "
f"WHERE tablename = '{_OLD_TABLE}' "
"AND indexdef LIKE '%PRIMARY KEY%'"
"SELECT indexname FROM pg_indexes " f"WHERE tablename = '{_OLD_TABLE}' " "AND indexdef LIKE '%PRIMARY KEY%'"
)
).scalar()
if old_pk_name and old_pk_name != f"{_OLD_TABLE}_pkey":
Expand Down Expand Up @@ -923,6 +931,8 @@ def _swap_ratings_table(postgresql: Session, min_rows: int, staging_count: int)
def _cleanup_staging_table(postgresql: Session) -> None:
"""障害時のクリーンアップ: staging/old tableの残骸を削除。"""
try:
# abortedトランザクション状態の場合に備えてrollbackしてからDROP
postgresql.rollback()
postgresql.execute(text(f"DROP TABLE IF EXISTS {_STAGING_TABLE}"))
postgresql.execute(text(f"DROP TABLE IF EXISTS {_OLD_TABLE}"))
postgresql.commit()
Expand Down
9 changes: 6 additions & 3 deletions etl/tests/test_extract_ratings_swap.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,9 @@ def test_aborts_when_below_min_rows(self) -> None:

def test_succeeds_when_above_min_rows(self) -> None:
mock_session = MagicMock()
# pg_indexesクエリ: scalar()は旧PK名、新PK名の順に呼ばれる
# scalar()呼び出し順: PK衝突チェック(None=衝突なし), 旧PK名, 新PK名
mock_session.execute.return_value.scalar.side_effect = [
None, # PK衝突チェック: 同名インデックスは存在しない
"row_note_ratings_pkey", # 旧テーブルのPK名
"row_note_ratings_new_pkey", # 新テーブルのPK名
]
Expand All @@ -190,8 +191,9 @@ def test_succeeds_when_above_min_rows(self) -> None:

def test_swap_sql_sequence(self) -> None:
mock_session = MagicMock()
# pg_indexesクエリ: scalar()は旧PK名、新PK名の順に呼ばれる
# scalar()呼び出し順: PK衝突チェック(None=衝突なし), 旧PK名, 新PK名
mock_session.execute.return_value.scalar.side_effect = [
None, # PK衝突チェック: 同名インデックスは存在しない
"row_note_ratings_pkey", # 旧テーブルのPK名
"row_note_ratings_new_pkey", # 新テーブルのPK名
]
Expand Down Expand Up @@ -226,7 +228,8 @@ def test_catches_exception_and_rolls_back(self) -> None:

# 例外を投げずに正常終了する
_cleanup_staging_table(mock_session)
mock_session.rollback.assert_called_once()
# rollback()は2回呼ばれる: 1回目はabortedトランザクション解消用、2回目はexceptブロック内
assert mock_session.rollback.call_count == 2


class TestProcessRatingRows:
Expand Down