forked from RemiRigal/Plex-Auto-Languages
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
301 lines (258 loc) · 12.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
import sys
import signal
import argparse
from time import sleep
from typing import List
from apprise import Apprise
from plexapi.video import Episode
from plexapi.server import PlexServer
from datetime import datetime, timedelta
from plexapi.media import AudioStream, SubtitleStream
from utils.logger import init_logger
from utils.scheduler import Scheduler
from utils.configuration import Configuration
from utils.plex import PlexUtils
from utils.healthcheck import HealthcheckServer
class PlexAutoLanguages:
    """Propagate a user's audio/subtitle track selection across TV show episodes.

    The instance connects to a Plex server, listens to its alert stream and,
    depending on the configuration, reacts to "playing", "activity" and
    "timeline" messages by calling :meth:`change_default_tracks_if_needed`.
    An optional scheduler re-processes the recent play history, and an
    optional Apprise notifier reports every change.

    All methods log through a module-level ``logger`` that must be bound
    before the class is used.
    """

    def __init__(self, user_config_path: str):
        """Load the configuration and connect to Plex.

        Args:
            user_config_path: Path handed to ``Configuration``.

        Side effects: installs the SIGINT/SIGTERM handlers, starts the
        healthcheck server and opens the connection to the Plex server.
        """
        self.alive = False
        self.notifier = None
        self.set_signal_handlers()
        self.healthcheck_server = HealthcheckServer("Plex-Auto-Languages", self.is_ready, self.is_healthy)
        self.healthcheck_server.start()
        self.config = Configuration(user_config_path)
        # Plex
        self.plex = PlexServer(self.config.get("plex.url"), self.config.get("plex.token"))
        self.plex_user_id = self.get_plex_user_id()
        self.session_states = {}   # session_key: session_state
        self.default_streams = {}  # (user_id, item_key): (audio_stream_id, subtitle_stream_id)
        self.user_clients = {}     # client_identifier: (user_id, username)
        self.newly_added = {}      # episode_id: added_at
        # Scheduler
        self.scheduler = None
        if self.config.get("scheduler.enable"):
            self.scheduler = Scheduler(self.config.get("scheduler.schedule_time"), self.scheduler_callback)
        # Notifications
        self.apprise = None
        if self.config.get("notifications.enable"):
            self.apprise = Apprise()
            for apprise_config in self.config.get("notifications.apprise_configs"):
                self.apprise.add(apprise_config)

    def get_plex_user_id(self):
        """Return the id of the system account owning the configured Plex token.

        The username of the MyPlex account is matched against the names of the
        server's system accounts. When no account matches, the error is logged
        and the process exits with a non-zero status.
        """
        plex_username = self.plex.myPlexAccount().username
        for account in self.plex.systemAccounts():
            if account.name == plex_username:
                logger.info(f"Successfully connected as user '{account.name}' (id: {account.id})")
                return account.id
        logger.error("Unable to find the user id associated with the provided Plex Token")
        # This is a fatal startup failure: report it through the exit status.
        sys.exit(1)

    def is_ready(self):
        """Readiness probe: true once :meth:`start` has entered its main loop."""
        return self.alive

    def is_healthy(self):
        """Liveness probe: true while running and the alert listener is alive."""
        return self.alive and self.notifier is not None and self.notifier.is_alive()

    def set_signal_handlers(self):
        """Route SIGINT and SIGTERM to :meth:`stop`."""
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

    def stop(self, *args):
        """Ask the main loop of :meth:`start` to exit.

        ``*args`` absorbs the positional arguments passed to signal handlers.
        """
        logger.info("Received SIGINT or SIGTERM, stopping gracefully")
        self.alive = False

    def start(self):
        """Run until :meth:`stop` is called or the alert listener dies.

        Starts the alert listener (and the scheduler when enabled), blocks in
        a one-second polling loop, then shuts the scheduler, the listener and
        the healthcheck server down.
        """
        logger.info("Starting alert listener")
        self.notifier = self.plex.startAlertListener(self.alert_listener_callback)
        if self.scheduler:
            logger.info("Starting scheduler")
            self.scheduler.start()
        self.alive = True
        while self.notifier.is_alive() and self.alive:
            sleep(1)
        # The loop may also end because the listener died on its own.
        self.alive = False
        if self.scheduler:
            logger.info("Stopping scheduler")
            self.scheduler.stop_event.set()
        logger.info("Stopping alert listener")
        # Actually close the listener instead of only logging about it.
        if self.notifier.is_alive():
            self.notifier.stop()
        self.healthcheck_server.shutdown()

    def alert_listener_callback(self, message: dict):
        """Dispatch one alert message according to its ``type`` field.

        "playing" and "activity" messages are only handled when the matching
        ``trigger_on_*`` option is enabled; "timeline" messages always are.
        """
        if self.config.get("trigger_on_play") and message["type"] == "playing":
            self.process_playing_message(message)
        elif self.config.get("trigger_on_activity") and message["type"] == "activity":
            self.process_activity_message(message)
        elif message["type"] == "timeline":
            self.process_timeline_message(message)

    def process_playing_message(self, message: dict):
        """Process every play session of a "playing" message.

        A failure on one session is logged and does not stop the others.
        """
        for play_session in message["PlaySessionStateNotification"]:
            try:
                self.process_play_session(play_session)
            except Exception:
                logger.exception("Unable to process play session")

    @staticmethod
    def _stream_pair_id(audio_stream, subtitle_stream):
        """Return ``(audio_id, subtitle_id)``, using None for a missing stream."""
        return (
            audio_stream.id if audio_stream is not None else None,
            subtitle_stream.id if subtitle_stream is not None else None,
        )

    def process_play_session(self, play_session: dict):
        """React to a play session whose state or selected streams changed.

        Args:
            play_session: One entry of ``PlaySessionStateNotification``; the
                keys ``clientIdentifier``, ``key``, ``sessionKey`` and
                ``state`` are read.
        """
        # Get User id and user's Plex instance
        client_identifier = play_session["clientIdentifier"]
        if client_identifier not in self.user_clients:
            self.user_clients[client_identifier] = PlexUtils.get_user_from_client_identifier(
                self.plex, client_identifier)
        user_id, username = self.user_clients[client_identifier]
        if user_id is None:
            return
        user_plex = PlexUtils.get_plex_instance_of_user(self.plex, self.plex_user_id, user_id)
        if user_plex is None:
            return
        # Skip if not an Episode
        item = user_plex.fetchItem(play_session["key"])
        if not isinstance(item, Episode):
            return
        # Skip if the session state is unchanged
        session_key = play_session["sessionKey"]
        session_state = play_session["state"]
        if session_key in self.session_states and self.session_states[session_key] == session_state:
            return
        self.session_states[session_key] = session_state
        # Reset cache if the session is stopped
        if session_state == "stopped":
            self.session_states.pop(session_key, None)
            self.user_clients.pop(client_identifier, None)
        # Skip if selected streams are unchanged
        item.reload()
        selected_audio_stream, selected_subtitle_stream = PlexUtils.get_selected_streams(item)
        pair_id = self._stream_pair_id(selected_audio_stream, selected_subtitle_stream)
        # The item is fetched through the user's own Plex instance, so the
        # cache is kept per user: two users watching the same episode must
        # not hide each other's changes.
        cache_key = (user_id, item.key)
        if cache_key in self.default_streams and self.default_streams[cache_key] == pair_id:
            return
        # Always store the latest pair so that switching back to an earlier
        # selection is detected as a change.
        self.default_streams[cache_key] = pair_id
        # Change tracks if needed
        self.change_default_tracks_if_needed(username, item)

    def process_activity_message(self, message: dict):
        """Process every activity of an "activity" message.

        A failure on one activity is logged and does not stop the others.
        """
        for activity in message["ActivityNotification"]:
            try:
                self.process_activity(activity)
            except Exception:
                logger.exception("Unable to process activity")

    def process_activity(self, activity: dict):
        """Handle an ended ``library.refresh.items`` activity on an episode.

        Any other event or activity type is ignored.
        """
        event_state = activity["event"]
        if event_state != "ended":
            return
        activity_type = activity["Activity"]["type"]
        if activity_type != "library.refresh.items":
            return
        media_key = activity["Activity"]["Context"]["key"]
        user_id = activity["Activity"]["userID"]
        # Switch to the user's Plex instance
        user_plex = PlexUtils.get_plex_instance_of_user(self.plex, self.plex_user_id, user_id)
        if user_plex is None:
            return
        # Skip if not an Episode
        item = user_plex.fetchItem(media_key)
        if not isinstance(item, Episode):
            return
        # Change tracks if needed
        item.reload()
        user = PlexUtils.get_user_from_user_id(self.plex, user_id)
        if user is None:
            return
        self.change_default_tracks_if_needed(user.name, item)

    def process_timeline_message(self, message: dict):
        """Process every entry of a "timeline" message.

        A failure on one entry is logged and does not stop the others.
        """
        for timeline in message["TimelineEntry"]:
            try:
                self.process_timeline(timeline)
            except Exception:
                logger.exception("Unable to process timeline")

    def process_timeline(self, timeline: dict):
        """Apply each user's language preferences to a newly added episode.

        Only library entries with ``state == 5`` that carry neither
        ``metadataState`` nor ``mediaState`` are considered, and only episodes
        added during the last two minutes. For each user the reference is the
        episode returned by ``PlexUtils.get_last_watched_or_first_episode``.
        """
        if "metadataState" in timeline or "mediaState" in timeline:
            return
        identifier = timeline["identifier"]
        state = timeline["state"]
        entry_type = timeline["type"]
        if identifier != "com.plexapp.plugins.library" or state != 5 or entry_type == -1:
            return
        item_id = int(timeline["itemID"])
        # Skip if not an Episode
        item = self.plex.fetchItem(item_id)
        if not isinstance(item, Episode):
            return
        # Check if the item has been added recently
        if item.addedAt < datetime.now() - timedelta(minutes=2):
            return
        # The same addition may be announced several times: process it once.
        if item_id in self.newly_added and self.newly_added[item_id] == item.addedAt:
            return
        self.newly_added[item_id] = item.addedAt
        # Change tracks for all users
        logger.info(f"Processing newly added episode '{item.show().title}' (S{item.seasonNumber:02}E{item.episodeNumber:02})")
        all_user_ids = [self.plex_user_id] + PlexUtils.get_all_user_ids(self.plex)
        for user_id in all_user_ids:
            # Switch to the user's Plex instance
            user_plex = PlexUtils.get_plex_instance_of_user(self.plex, self.plex_user_id, user_id)
            if user_plex is None:
                continue
            # Get the most recently watched episode or the first one of the show
            user_item = user_plex.fetchItem(item_id)
            reference = PlexUtils.get_last_watched_or_first_episode(user_plex, user_item.show())
            if reference is None:
                continue
            # Change tracks
            reference.reload()
            user_item.reload()
            user = PlexUtils.get_user_from_user_id(self.plex, user_id)
            if user is None:
                # Skip only this user; the remaining users must still be processed.
                continue
            self.change_default_tracks_if_needed(user.name, reference, episodes=[user_item])

    @staticmethod
    def _format_episode_range(episodes) -> str:
        """Return ``SxxEyy`` or ``SxxEyy - SxxEyy`` spanning ``episodes``.

        Raises:
            ValueError: If ``episodes`` is empty.
        """
        season_numbers = [e.seasonNumber for e in episodes]
        min_season_number, max_season_number = min(season_numbers), max(season_numbers)
        min_episode_number = min(e.episodeNumber for e in episodes if e.seasonNumber == min_season_number)
        max_episode_number = max(e.episodeNumber for e in episodes if e.seasonNumber == max_season_number)
        from_str = f"S{min_season_number:02}E{min_episode_number:02}"
        to_str = f"S{max_season_number:02}E{max_episode_number:02}"
        return f"{from_str} - {to_str}" if from_str != to_str else from_str

    def notify_changes(self, username: str, episode: Episode, episodes: List[Episode], nb_updated: int, nb_total: int):
        """Log a summary of a language update and send it through Apprise.

        Args:
            username: Name shown on the "User" line of the summary.
            episode: Reference episode whose selected streams are reported.
            episodes: Episodes that were considered; must not be empty.
            nb_updated: Number of episodes that were actually changed.
            nb_total: Number of episodes that were considered.
        """
        target_audio, target_subtitles = PlexUtils.get_selected_streams(episode)
        range_str = self._format_episode_range(episodes)
        # Resolve the show once and reuse its title for the title and the body.
        show_title = episode.show().title
        title = f"PlexAutoLanguages - {show_title}"
        message = (
            f"Show: {show_title}\n"
            f"User: {username}\n"
            f"Audio: {target_audio.displayTitle if target_audio is not None else 'None'}\n"
            f"Subtitles: {target_subtitles.displayTitle if target_subtitles is not None else 'None'}\n"
            f"Updated episodes: {nb_updated}/{nb_total} ({range_str})"
        )
        inline_message = message.replace("\n", " | ")
        logger.info(f"Language update: {inline_message}")
        if self.apprise is None:
            return
        self.apprise.notify(title=title, body=message)

    def scheduler_callback(self):
        """Re-process every episode found in the last day of play history.

        No username is known here, so ``None`` is passed on and the resulting
        summary reads "User: None".
        """
        logger.info("Starting scheduler task")
        min_date = datetime.now() - timedelta(days=1)
        history = self.plex.history(mindate=min_date)
        for episode in history:
            if not isinstance(episode, Episode):
                continue
            episode.reload()
            self.change_default_tracks_if_needed(None, episode)

    def change_default_tracks_if_needed(self, username: str, episode: Episode, episodes: List[Episode] = None):
        """Align the default tracks of ``episodes`` with those of ``episode``.

        Args:
            username: Name reported in the notification (may be None).
            episode: Reference episode.
            episodes: Episodes to update. When None, they are selected by
                ``PlexUtils.get_episodes_to_process`` from the ``update_level``
                and ``update_strategy`` options.

        Nothing is changed or notified when ``PlexUtils.get_track_changes``
        reports no change.
        """
        if episodes is None:
            # Get episodes to update
            update_level = self.config.get("update_level")
            update_strategy = self.config.get("update_strategy")
            episodes = PlexUtils.get_episodes_to_process(update_level, update_strategy, episode)
        # Get changes to perform
        changes = PlexUtils.get_track_changes(episode, episodes)
        if not changes:
            return
        # Perform changes; each entry is (episode, part, stream_type, new_stream)
        for _, part, stream_type, new_stream in changes:
            if stream_type == AudioStream.STREAMTYPE:
                part.setDefaultAudioStream(new_stream)
            elif stream_type == SubtitleStream.STREAMTYPE and new_stream is None:
                part.resetDefaultSubtitleStream()
            elif stream_type == SubtitleStream.STREAMTYPE:
                part.setDefaultSubtitleStream(new_stream)
        # Notify changes; an episode with several changed parts counts once
        nb_updated_episodes = len({e.key for e, _, _, _ in changes})
        nb_total_episodes = len(episodes)
        self.notify_changes(username, episode, episodes, nb_updated_episodes, nb_total_episodes)
def parse_args():
    """Parse the command line and return the resulting namespace.

    The only option is ``-c`` / ``--config_file``, the path of the
    configuration file; its value is None when the option is omitted.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "-c",
        "--config_file",
        help="Config file path",
        type=str,
    )
    namespace = cli.parse_args()
    return namespace
if __name__ == "__main__":
    # PlexAutoLanguages logs through this module-level name, so it has to be
    # bound before the class is instantiated.
    logger = init_logger()
    # Reuse the module's own helper instead of duplicating the parser setup.
    args = parse_args()
    plex_auto_languages = PlexAutoLanguages(args.config_file)
    try:
        # Blocks until a stop is requested or the alert listener dies.
        plex_auto_languages.start()
    except KeyboardInterrupt:
        logger.info("Caught KeyboardInterrupt, shutting down gracefully")