11import csv
22import logging
3- from datetime import datetime , timedelta
3+ from datetime import datetime , timedelta , timezone
44import requests
55import stringcase
6+ import zipfile
7+ import io
68from sqlalchemy .orm import Session
9+ import sqlalchemy
710from lib .x .postlookup import lookup
811from birdxplorer_common .storage import (
912 RowNoteRecord ,
1215 RowUserRecord ,
1316 RowNoteStatusRecord ,
1417 RowPostEmbedURLRecord ,
18+ RowNoteRatingRecord ,
1519)
1620import settings
21+ from constants import TARGET_KEYWORDS
1722
1823
1924def extract_data (sqlite : Session , postgresql : Session ):
@@ -36,7 +41,7 @@ def extract_data(sqlite: Session, postgresql: Session):
3641 break
3742
3843 dateString = date .strftime ("%Y/%m/%d" )
39- note_url = f"https://ton.twimg.com/birdwatch-public-data/{ dateString } /notes/notes-00000.tsv "
44+ note_url = f"https://ton.twimg.com/birdwatch-public-data/{ dateString } /notes/notes-00000.zip "
4045 if settings .USE_DUMMY_DATA :
4146 note_url = (
4247 "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/notes_sample.tsv"
@@ -46,48 +51,125 @@ def extract_data(sqlite: Session, postgresql: Session):
4651 res = requests .get (note_url )
4752
4853 if res .status_code == 200 :
49- # res.contentをsqliteのNoteテーブル
50- tsv_data = res .content .decode ("utf-8" ).splitlines ()
51- reader = csv .DictReader (tsv_data , delimiter = "\t " )
52- reader .fieldnames = [stringcase .snakecase (field ) for field in reader .fieldnames ]
53-
54- rows_to_add = []
55- for index , row in enumerate (reader ):
56- if sqlite .query (RowNoteRecord ).filter (RowNoteRecord .note_id == row ["note_id" ]).first ():
57- continue
58- rows_to_add .append (RowNoteRecord (** row ))
59- if index % 1000 == 0 :
60- sqlite .bulk_save_objects (rows_to_add )
61- rows_to_add = []
62- sqlite .bulk_save_objects (rows_to_add )
63-
64- status_url = f"https://ton.twimg.com/birdwatch-public-data/{ dateString } /noteStatusHistory/noteStatusHistory-00000.tsv"
6554 if settings .USE_DUMMY_DATA :
66- status_url = "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/noteStatus_sample.tsv"
67-
68- logging .info (status_url )
69- res = requests .get (status_url )
70-
71- if res .status_code == 200 :
55+ # Handle dummy data as TSV
7256 tsv_data = res .content .decode ("utf-8" ).splitlines ()
7357 reader = csv .DictReader (tsv_data , delimiter = "\t " )
7458 reader .fieldnames = [stringcase .snakecase (field ) for field in reader .fieldnames ]
7559
7660 rows_to_add = []
7761 for index , row in enumerate (reader ):
78- for key , value in list (row .items ()):
79- if value == "" :
80- row [key ] = None
81- status = (
82- sqlite .query (RowNoteStatusRecord ).filter (RowNoteStatusRecord .note_id == row ["note_id" ]).first ()
83- )
84- if status is None or status .created_at_millis > int (datetime .now ().timestamp () * 1000 ):
85- sqlite .query (RowNoteStatusRecord ).filter (RowNoteStatusRecord .note_id == row ["note_id" ]).delete ()
86- rows_to_add .append (RowNoteStatusRecord (** row ))
62+ if sqlite .query (RowNoteRecord ).filter (RowNoteRecord .note_id == row ["note_id" ]).first ():
63+ continue
64+ rows_to_add .append (RowNoteRecord (** row ))
8765 if index % 1000 == 0 :
8866 sqlite .bulk_save_objects (rows_to_add )
8967 rows_to_add = []
9068 sqlite .bulk_save_objects (rows_to_add )
69+ else :
70+ # Handle real data as zip file
71+ try :
72+ with zipfile .ZipFile (io .BytesIO (res .content )) as zip_file :
73+ file_names = zip_file .namelist ()
74+ if file_names :
75+ tsv_file_name = file_names [0 ]
76+ with zip_file .open (tsv_file_name ) as tsv_file :
77+ tsv_data = tsv_file .read ().decode ("utf-8" ).splitlines ()
78+ reader = csv .DictReader (tsv_data , delimiter = "\t " )
79+ reader .fieldnames = [stringcase .snakecase (field ) for field in reader .fieldnames ]
80+
81+ rows_to_add = []
82+ for index , row in enumerate (reader ):
83+ if (
84+ sqlite .query (RowNoteRecord )
85+ .filter (RowNoteRecord .note_id == row ["note_id" ])
86+ .first ()
87+ ):
88+ continue
89+ rows_to_add .append (RowNoteRecord (** row ))
90+ if index % 1000 == 0 :
91+ sqlite .bulk_save_objects (rows_to_add )
92+ rows_to_add = []
93+ sqlite .bulk_save_objects (rows_to_add )
94+ except zipfile .BadZipFile :
95+ logging .error (f"Invalid zip file from { note_url } " )
96+ continue
97+ except Exception as e :
98+ logging .error (f"Error processing note data from { note_url } : { e } " )
99+ continue
100+
101+ status_url = f"https://ton.twimg.com/birdwatch-public-data/{ dateString } /noteStatusHistory/noteStatusHistory-00000.zip"
102+ if settings .USE_DUMMY_DATA :
103+ status_url = "https://raw.githubusercontent.com/codeforjapan/BirdXplorer/refs/heads/main/etl/data/noteStatus_sample.tsv"
104+
105+ logging .info (status_url )
106+ res = requests .get (status_url )
107+
108+ if res .status_code == 200 :
109+ if settings .USE_DUMMY_DATA :
110+ # Handle dummy data as TSV
111+ tsv_data = res .content .decode ("utf-8" ).splitlines ()
112+ reader = csv .DictReader (tsv_data , delimiter = "\t " )
113+ reader .fieldnames = [stringcase .snakecase (field ) for field in reader .fieldnames ]
114+
115+ rows_to_add = []
116+ for index , row in enumerate (reader ):
117+ for key , value in list (row .items ()):
118+ if value == "" :
119+ row [key ] = None
120+ status = (
121+ sqlite .query (RowNoteStatusRecord )
122+ .filter (RowNoteStatusRecord .note_id == row ["note_id" ])
123+ .first ()
124+ )
125+ if status is None or status .created_at_millis > int (datetime .now ().timestamp () * 1000 ):
126+ sqlite .query (RowNoteStatusRecord ).filter (
127+ RowNoteStatusRecord .note_id == row ["note_id" ]
128+ ).delete ()
129+ rows_to_add .append (RowNoteStatusRecord (** row ))
130+ if index % 1000 == 0 :
131+ sqlite .bulk_save_objects (rows_to_add )
132+ rows_to_add = []
133+ sqlite .bulk_save_objects (rows_to_add )
134+ else :
135+ # Handle real data as zip file
136+ try :
137+ with zipfile .ZipFile (io .BytesIO (res .content )) as zip_file :
138+ file_names = zip_file .namelist ()
139+ if file_names :
140+ tsv_file_name = file_names [0 ]
141+ with zip_file .open (tsv_file_name ) as tsv_file :
142+ tsv_data = tsv_file .read ().decode ("utf-8" ).splitlines ()
143+ reader = csv .DictReader (tsv_data , delimiter = "\t " )
144+ reader .fieldnames = [stringcase .snakecase (field ) for field in reader .fieldnames ]
145+
146+ rows_to_add = []
147+ for index , row in enumerate (reader ):
148+ for key , value in list (row .items ()):
149+ if value == "" :
150+ row [key ] = None
151+ status = (
152+ sqlite .query (RowNoteStatusRecord )
153+ .filter (RowNoteStatusRecord .note_id == row ["note_id" ])
154+ .first ()
155+ )
156+ if status is None or status .created_at_millis > int (
157+ datetime .now ().timestamp () * 1000
158+ ):
159+ sqlite .query (RowNoteStatusRecord ).filter (
160+ RowNoteStatusRecord .note_id == row ["note_id" ]
161+ ).delete ()
162+ rows_to_add .append (RowNoteStatusRecord (** row ))
163+ if index % 1000 == 0 :
164+ sqlite .bulk_save_objects (rows_to_add )
165+ rows_to_add = []
166+ sqlite .bulk_save_objects (rows_to_add )
167+ except zipfile .BadZipFile :
168+ logging .error (f"Invalid zip file from { status_url } " )
169+ continue
170+ except Exception as e :
171+ logging .error (f"Error processing note status data from { status_url } : { e } " )
172+ continue
91173
92174 break
93175
@@ -96,11 +178,20 @@ def extract_data(sqlite: Session, postgresql: Session):
96178 sqlite .commit ()
97179
98180 # Noteに紐づくtweetデータを取得
181+ # Build keyword filter conditions using shared TARGET_KEYWORDS
182+ keyword_conditions = []
183+ for keyword in TARGET_KEYWORDS :
184+ keyword_conditions .append (RowNoteRecord .summary .ilike (f"%{ keyword } %" ))
185+
99186 postExtract_targetNotes = (
100187 sqlite .query (RowNoteRecord )
101188 .filter (RowNoteRecord .tweet_id != None )
102189 .filter (RowNoteRecord .created_at_millis >= settings .TARGET_TWITTER_POST_START_UNIX_MILLISECOND )
103190 .filter (RowNoteRecord .created_at_millis <= settings .TARGET_TWITTER_POST_END_UNIX_MILLISECOND )
191+ .filter (
192+ # Use OR condition to match any of the keywords
193+ sqlalchemy .or_ (* keyword_conditions )
194+ )
104195 .all ()
105196 )
106197 logging .info (f"Target notes: { len (postExtract_targetNotes )} " )
@@ -119,8 +210,9 @@ def extract_data(sqlite: Session, postgresql: Session):
119210 if post == None or "data" not in post :
120211 continue
121212
122- created_at = datetime .strptime (post ["data" ]["created_at" ], "%Y-%m-%dT%H:%M:%S.%fZ" )
213+ created_at = datetime .strptime (post ["data" ]["created_at" ], "%Y-%m-%dT%H:%M:%S.%fZ" ). replace ( tzinfo = timezone . utc )
123214 created_at_millis = int (created_at .timestamp () * 1000 )
215+ now_millis = int (datetime .now (timezone .utc ).timestamp () * 1000 )
124216
125217 is_userExist = (
126218 postgresql .query (RowUserRecord ).filter (RowUserRecord .user_id == post ["data" ]["author_id" ]).first ()
@@ -166,6 +258,7 @@ def extract_data(sqlite: Session, postgresql: Session):
166258 quote_count = post ["data" ]["public_metrics" ]["quote_count" ],
167259 reply_count = post ["data" ]["public_metrics" ]["reply_count" ],
168260 lang = post ["data" ]["lang" ],
261+ extracted_at = now_millis ,
169262 )
170263 postgresql .add (row_post )
171264
0 commit comments