-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathfunctions.py
99 lines (86 loc) · 3.03 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import requests
from urllib.error import HTTPError
from shutil import copyfileobj
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def get_video_url(entity, _id, medium):
    """Pick the highest-bitrate variant of a video-like media entity.

    Walks ``entity['video_info']['variants']`` and keeps the first variant
    with the greatest ``bitrate``; variants without a ``bitrate`` key are
    ignored.

    Returns a dict with the keys ``tweet_id``, ``media_url``, ``bitrate``,
    ``type`` (the last three characters of the variant's ``content_type``)
    and ``medium``. When no variant carries a bitrate, ``media_url`` and
    ``type`` stay empty and ``bitrate`` stays ``-1``.
    """
    best = {'tweet_id': _id, 'media_url': '', 'bitrate': -1, 'type': '', 'medium': medium}
    rated_variants = (
        candidate
        for candidate in entity['video_info']['variants']
        if 'bitrate' in candidate
    )
    for candidate in rated_variants:
        # Strict comparison: on a tie the earlier variant is kept.
        if candidate['bitrate'] > best['bitrate']:
            best.update(
                media_url=candidate['url'],
                bitrate=candidate['bitrate'],
                type=candidate['content_type'][-3:],
            )
    return best
def get_photo_url(entity, _id, medium):
    """Describe a photo media entity as a download record.

    Returns a dict with ``tweet_id``, ``media_url`` (taken from
    ``entity['media_url']``), ``type`` (the last three characters of that
    URL) and ``medium``.
    """
    url = entity['media_url']
    return {
        'tweet_id': _id,
        'media_url': url,
        'type': url[-3:],
        'medium': medium,
    }
def get_entities(data, _id):
    """Collect download records for every media item attached to a tweet.

    Looks for media under ``data['extended_entities']`` first and falls
    back to ``data['entities']``.

    Returns a list with one record per ``video``, ``photo`` or
    ``animated_gif`` item (other media types are skipped). When neither
    container key is present, or the chosen container has no ``media``
    key, returns the dict
    ``{'message': 'No Media Found', 'tweet_id': _id}`` instead.
    """
    no_media = {'message': 'No Media Found', 'tweet_id': _id}

    # Prefer the extended container; the first key that is present wins.
    for container_key in ('extended_entities', 'entities'):
        if container_key in data:
            target = data[container_key]
            break
    else:
        return no_media

    if 'media' not in target:
        return no_media

    records = []
    for item in target['media']:
        kind = item['type']
        if kind == 'video':
            records.append(get_video_url(item, _id, medium='video'))
        elif kind == 'photo':
            records.append(get_photo_url(item, _id, medium='photo'))
        elif kind == 'animated_gif':
            records.append(get_video_url(item, _id, medium='animated_gif'))
    return records
def item_retrieve(data_dict):
    """Download the media item described by *data_dict* and record the result.

    Reads the keys ``medium``, ``original_row``, ``tweet_id``, ``type`` and
    ``media_url`` from *data_dict*. The file is written to
    ``media/<medium>/<original_row>_<tweet_id>_<n>.<ext>``, where ``n`` is
    the first counter value (starting at 0) whose path does not yet exist
    and ``ext`` is the last three characters of ``type``.

    The outcome is reported by mutating *data_dict*: on success
    ``data_dict['media_file']`` is set to the written path; when the
    download fails (connection error, retries exhausted, or a non-2xx HTTP
    status) it is set to ``'Error - Could Not Retrieve'``.

    Returns:
        None.
    """
    media_dir = os.path.join('media', data_dict['medium'])
    extension = data_dict['type'][-3:]

    # Find the first unused file name for this row/tweet pair.
    increment = 0
    while True:
        name = os.path.join(
            media_dir,
            '{}_{}_{}.{}'.format(
                data_dict['original_row'], data_dict['tweet_id'], increment, extension
            ),
        )
        if not os.path.exists(name):
            break
        increment += 1

    try:
        response = requests_retry_session().get(data_dict['media_url'], stream=True)
        try:
            # Without this check an error page (404, 403, ...) would be
            # saved to disk as if it were the media file.
            response.raise_for_status()
            with open(name, 'wb') as f:
                # iter_content reports mid-stream network failures as
                # requests exceptions, so the handler below sees them.
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        finally:
            # A streamed response keeps its connection open until closed.
            response.close()
    except (HTTPError, requests.exceptions.RequestException):
        # requests raises its own exception hierarchy, not urllib's;
        # urllib's HTTPError is still caught for backward compatibility.
        data_dict['media_file'] = 'Error - Could Not Retrieve'
        return

    data_dict['media_file'] = name
    return
def if_no_dir_make(path):
    """Ensure the directory *path* exists, creating parents as needed.

    Args:
        path: Directory path to create if it is missing.

    Returns:
        *path*, unchanged, once the directory exists.

    Raises:
        OSError: If the directory could not be created and *path* is not an
            existing directory (for example, a regular file is in the way).
    """
    try:
        os.makedirs(path)
    except OSError:
        # An already-existing directory is the success case; anything else
        # is a real failure and must reach the caller.
        if not os.path.isdir(path):
            raise
    # Returning here, not from a ``finally`` clause, is deliberate: a
    # ``return`` inside ``finally`` would discard the exception re-raised above.
    return path
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Return a requests session whose HTTP(S) adapters retry failed requests.

    Args:
        retries: Value used for the ``total``, ``read`` and ``connect``
            limits of the retry policy.
        backoff_factor: Passed through to ``Retry`` as its backoff factor.
        status_forcelist: HTTP status codes passed to ``Retry`` as the
            statuses that force a retry.
        session: Existing session to configure; a new ``requests.Session``
            is created when this is falsy.

    Returns:
        The configured session (the one passed in, or the new one).
    """
    if not session:
        session = requests.Session()

    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=retries,
            read=retries,
            connect=retries,
            backoff_factor=backoff_factor,
            status_forcelist=status_forcelist,
        )
    )
    # The same adapter instance serves both schemes.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, retrying_adapter)
    return session
if __name__ == '__main__':
    # This module only provides helpers; running it directly does nothing.
    pass