Plex: Remove recursive thread calls

Signed-off-by: Luigi311 <git@luigi311.com>
pull/136/head
Luigi311 2024-01-10 02:20:29 -07:00
parent 873735900f
commit fe65716706
1 changed files with 58 additions and 50 deletions

View File

@ -1,14 +1,12 @@
import re, requests, os, traceback
import re, requests, traceback
from typing import Dict, Union, FrozenSet
import operator
from itertools import groupby as itertools_groupby
from urllib3.poolmanager import PoolManager
from math import floor
from requests.adapters import HTTPAdapter as RequestsHTTPAdapter
from plexapi.video import Episode, Movie
from plexapi.video import Show, Episode, Movie
from plexapi.server import PlexServer
from plexapi.myplex import MyPlexAccount
@ -37,7 +35,7 @@ class HostNameIgnoringAdapter(RequestsHTTPAdapter):
)
def extract_guids_from_item(item: Union[Movie, Episode]) -> Dict[str, str]:
def extract_guids_from_item(item: Union[Movie, Show, Episode]) -> Dict[str, str]:
guids: Dict[str, str] = dict(
guid.id.split("://")
for guid in item.guids
@ -66,7 +64,7 @@ def get_guids(item: Union[Movie, Episode], completed=True):
) # Merge the metadata and guid dictionaries
def get_user_library_watched_show(show):
def get_user_library_watched_show(show, process_episodes, threads=None):
try:
show_guids: FrozenSet = frozenset(
(
@ -80,25 +78,20 @@ def get_user_library_watched_show(show):
).items() # Merge the metadata and guid dictionaries
)
watched_episodes = show.watched()
episode_guids = {
# Offset group data because the first value will be the key
season: [episode[1] for episode in episodes]
for season, episodes
# Group episodes by first element of tuple (episode.parentIndex)
in itertools_groupby(
[
(
episode.parentIndex,
get_guids(episode, completed=episode in watched_episodes),
)
for episode in show.episodes()
# Only include watched or partially-watched more than a minute episodes
if episode in watched_episodes or episode.viewOffset >= 60000
],
operator.itemgetter(0),
)
}
episode_guids_args = []
for episode in process_episodes:
episode_guids_args.append([get_guids, episode, episode.isWatched])
episode_guids_results = future_thread_executor(
episode_guids_args, threads=threads
)
episode_guids = {}
for index, episode in enumerate(process_episodes):
if episode.parentIndex not in episode_guids:
episode_guids[episode.parentIndex] = []
episode_guids[episode.parentIndex].append(episode_guids_results[index])
return show_guids, episode_guids
except Exception:
@ -119,39 +112,56 @@ def get_user_library_watched(user, user_plex, library):
watched = []
args = [
[get_guids, video, True]
for video
# Get all watched movies
in library_videos.search(unwatched=False)
] + [
[get_guids, video, False]
for video
# Get all partially watched movies
in library_videos.search(inProgress=True)
# Only include partially-watched movies more than a minute
if video.viewOffset >= 60000
[get_guids, video, video.isWatched]
for video in library_videos.search(unwatched=False)
+ library_videos.search(inProgress=True)
if video.isWatched or video.viewOffset >= 60000
]
for guid in future_thread_executor(args, threads=min(os.cpu_count(), 4)):
for guid in future_thread_executor(args, threads=len(args)):
logger(f"Plex: Adding {guid['title']} to {user_name} watched list", 3)
watched.append(guid)
elif library.type == "show":
watched = {}
# Get all watched shows and partially watched shows
args = [
(get_user_library_watched_show, show)
for show in library_videos.search(unwatched=False)
+ library_videos.search(inProgress=True)
]
parallel_show_task = []
parallel_episodes_task = []
for show_guids, episode_guids in future_thread_executor(args, threads=4):
for show in library_videos.search(unwatched=False) + library_videos.search(
inProgress=True
):
process_episodes = []
for episode in show.episodes():
if episode.isWatched or episode.viewOffset >= 60000:
process_episodes.append(episode)
# Shows with more than 24 episodes has its episodes processed in parallel
# Shows with less than 24 episodes has its episodes processed in serial but the shows are processed in parallel
if len(process_episodes) >= 24:
parallel_episodes_task.append(
[
get_user_library_watched_show,
show,
process_episodes,
len(process_episodes),
]
)
else:
parallel_show_task.append(
[get_user_library_watched_show, show, process_episodes, 1]
)
for show_guids, episode_guids in future_thread_executor(
parallel_show_task, threads=len(parallel_show_task)
) + future_thread_executor(parallel_episodes_task, threads=1):
if show_guids and episode_guids:
watched[show_guids] = episode_guids
logger(
f"Plex: Added {episode_guids} to {user_name} {show_guids} watched list",
3,
)
else:
watched = None
@ -436,7 +446,6 @@ class Plex:
try:
# Get all libraries
users_watched = {}
args = []
for user in users:
if self.admin_user == user:
@ -478,13 +487,12 @@ class Plex:
)
continue
args.append([get_user_library_watched, user, user_plex, library])
user_watched = get_user_library_watched(user, user_plex, library)
for user_watched in future_thread_executor(args):
for user, user_watched_temp in user_watched.items():
if user not in users_watched:
users_watched[user] = {}
users_watched[user].update(user_watched_temp)
for user_watched, user_watched_temp in user_watched.items():
if user_watched not in users_watched:
users_watched[user_watched] = {}
users_watched[user_watched].update(user_watched_temp)
return users_watched
except Exception as e: