diff --git a/src/plex.py b/src/plex.py index 631c8a7..e078f1d 100644 --- a/src/plex.py +++ b/src/plex.py @@ -1,14 +1,12 @@ -import re, requests, os, traceback +import re, requests, traceback from typing import Dict, Union, FrozenSet -import operator -from itertools import groupby as itertools_groupby from urllib3.poolmanager import PoolManager from math import floor from requests.adapters import HTTPAdapter as RequestsHTTPAdapter -from plexapi.video import Episode, Movie +from plexapi.video import Show, Episode, Movie from plexapi.server import PlexServer from plexapi.myplex import MyPlexAccount @@ -37,7 +35,7 @@ class HostNameIgnoringAdapter(RequestsHTTPAdapter): ) -def extract_guids_from_item(item: Union[Movie, Episode]) -> Dict[str, str]: +def extract_guids_from_item(item: Union[Movie, Show, Episode]) -> Dict[str, str]: guids: Dict[str, str] = dict( guid.id.split("://") for guid in item.guids @@ -66,7 +64,7 @@ def get_guids(item: Union[Movie, Episode], completed=True): ) # Merge the metadata and guid dictionaries -def get_user_library_watched_show(show): +def get_user_library_watched_show(show, process_episodes, threads=None): try: show_guids: FrozenSet = frozenset( ( @@ -80,25 +78,20 @@ def get_user_library_watched_show(show): ).items() # Merge the metadata and guid dictionaries ) - watched_episodes = show.watched() - episode_guids = { - # Offset group data because the first value will be the key - season: [episode[1] for episode in episodes] - for season, episodes - # Group episodes by first element of tuple (episode.parentIndex) - in itertools_groupby( - [ - ( - episode.parentIndex, - get_guids(episode, completed=episode in watched_episodes), - ) - for episode in show.episodes() - # Only include watched or partially-watched more than a minute episodes - if episode in watched_episodes or episode.viewOffset >= 60000 - ], - operator.itemgetter(0), - ) - } + episode_guids_args = [] + + for episode in process_episodes: + episode_guids_args.append([get_guids, episode, episode.isWatched]) + + episode_guids_results = future_thread_executor( + episode_guids_args, threads=threads + ) + + episode_guids = {} + for index, episode in enumerate(process_episodes): + if episode.parentIndex not in episode_guids: + episode_guids[episode.parentIndex] = [] + episode_guids[episode.parentIndex].append(episode_guids_results[index]) return show_guids, episode_guids except Exception: @@ -119,39 +112,56 @@ def get_user_library_watched(user, user_plex, library): watched = [] args = [ - [get_guids, video, True] - for video - # Get all watched movies - in library_videos.search(unwatched=False) - ] + [ - [get_guids, video, False] - for video - # Get all partially watched movies - in library_videos.search(inProgress=True) - # Only include partially-watched movies more than a minute - if video.viewOffset >= 60000 + [get_guids, video, video.isWatched] + for video in library_videos.search(unwatched=False) + + library_videos.search(inProgress=True) + if video.isWatched or video.viewOffset >= 60000 ] - for guid in future_thread_executor(args, threads=min(os.cpu_count(), 4)): + for guid in future_thread_executor(args, threads=len(args)): logger(f"Plex: Adding {guid['title']} to {user_name} watched list", 3) watched.append(guid) elif library.type == "show": watched = {} # Get all watched shows and partially watched shows - args = [ - (get_user_library_watched_show, show) - for show in library_videos.search(unwatched=False) - + library_videos.search(inProgress=True) - ] + parallel_show_task = [] + parallel_episodes_task = [] - for show_guids, episode_guids in future_thread_executor(args, threads=4): + for show in library_videos.search(unwatched=False) + library_videos.search( + inProgress=True + ): + process_episodes = [] + for episode in show.episodes(): + if episode.isWatched or episode.viewOffset >= 60000: + process_episodes.append(episode) + + # Shows with more than 24 episodes has its episodes processed in parallel + # Shows with less than 24 episodes has its episodes processed in serial but the shows are processed in parallel + if len(process_episodes) >= 24: + parallel_episodes_task.append( + [ + get_user_library_watched_show, + show, + process_episodes, + len(process_episodes), + ] + ) + else: + parallel_show_task.append( + [get_user_library_watched_show, show, process_episodes, 1] + ) + + for show_guids, episode_guids in future_thread_executor( + parallel_show_task, threads=len(parallel_show_task) + ) + future_thread_executor(parallel_episodes_task, threads=1): if show_guids and episode_guids: watched[show_guids] = episode_guids logger( f"Plex: Added {episode_guids} to {user_name} {show_guids} watched list", 3, ) + else: watched = None @@ -436,7 +446,6 @@ class Plex: try: # Get all libraries users_watched = {} - args = [] for user in users: if self.admin_user == user: @@ -478,13 +487,12 @@ class Plex: ) continue - args.append([get_user_library_watched, user, user_plex, library]) + user_watched = get_user_library_watched(user, user_plex, library) - for user_watched in future_thread_executor(args): - for user, user_watched_temp in user_watched.items(): - if user not in users_watched: - users_watched[user] = {} - users_watched[user].update(user_watched_temp) + for user_watched, user_watched_temp in user_watched.items(): + if user_watched not in users_watched: + users_watched[user_watched] = {} + users_watched[user_watched].update(user_watched_temp) return users_watched except Exception as e: