-
Notifications
You must be signed in to change notification settings - Fork 29
Open
Description
# Start a visible (headless=False) Chrome bound to 127.0.0.1 on self.port,
# using the profile directory stored in self.infos["path"] and with the
# PrivacySandboxSettings4 feature disabled.
async with AsyncChromeDaemon(host="127.0.0.1", headless=False, debug=False, port = self.port,
extra_config=[f'--user-data-dir={self.infos["path"]}', '--disable-features=PrivacySandboxSettings4'],
) as cd:
# Attach to the tab at index 0 of the daemon just started.
async with cd.connect_tab(index=0, auto_close=True) as tab:
# Open a fetch-event stream filtered by fixed_request_patterns (defined outside this snippet).
async with tab.iter_fetch(fixed_request_patterns) as fetch:
url = 'https://www.tiktok.com/@changhongofficial'
# Both tasks are scheduled BEFORE tab.goto() is awaited; presumably so that
# capture() is already consuming events when the page starts issuing requests
# (NOTE(review): confirm this ordering is what prevents missed responses).
export_task = asyncio.create_task(self.operate(tab,boke))
capture_task = asyncio.create_task(self.capture(tab,fetch, fixed_request_patterns)) # capture() consumes the intercepted fetch events
await tab.goto(url)
await self.getPid(cd)
# Wait for both background tasks; an exception raised in either one propagates from here.
await asyncio.gather(export_task, capture_task)
The packet-capture method is shown below:
async def capture(self, tab, fetch, fixed_request_patterns):
    """Consume intercepted fetch events and store the parsed payloads.

    Events matching ``fixed_request_patterns[0]`` are parsed with
    ``self.parse_chunk`` and stored in ``self.infoData``.  Events matching
    ``fixed_request_patterns[1]`` are parsed with ``self.parse_comments`` and
    stored in ``self.commentData``; their response URL is stored in
    ``self.commentUrl``.  Events matching neither pattern are ignored.

    Args:
        tab: The tab the events come from (unused here; kept so existing
            callers keep working).
        fetch: Async-iterable event buffer that provides ``match_event`` and
            ``get_response``.
        fixed_request_patterns: Sequence whose first two entries select the
            "info" and "comment" requests respectively.
    """
    try:
        async for event in fetch:
            if fetch.match_event(event, fixed_request_patterns[0]):
                is_comment = False
            elif fetch.match_event(event, fixed_request_patterns[1]):
                is_comment = True
            else:
                continue

            # Handle the timeout per event: previously a single slow response
            # escaped to the outer handler and ended the whole loop, so every
            # later matching response was silently lost.
            try:
                response = await fetch.get_response(event, timeout=5)
            except asyncio.TimeoutError:
                continue
            # NOTE(review): if get_response can return None instead of
            # raising on timeout, a guard is needed here -- confirm against
            # the ichrome version in use.

            if is_comment:
                self.commentUrl = response['params']['response']['url']
                self.commentData = await self.parse_comments(
                    json.loads(response["data"])
                )
            else:
                self.infoData = await self.parse_chunk(
                    json.loads(response["data"])
                )
    except asyncio.TimeoutError:
        # Best-effort behaviour kept from the original: a timeout raised by
        # the event iteration itself (or by a parser) ends capture quietly.
        pass
async def sendComment(self):
    """Stub coroutine: currently does nothing and returns ``None``."""
Metadata
Metadata
Assignees
Labels
No labels