Refactor get_user_library_watched_show

This commit is contained in:
Roberto Banić
2023-04-14 10:14:10 +02:00
committed by Luigi311
parent 9498335e22
commit 2e4c2a6817

View File

@@ -1,9 +1,13 @@
import re, requests, os, traceback import re, requests, os, traceback
from typing import Dict, Union from typing import Dict, Union, FrozenSet
import operator
from itertools import groupby as itertools_groupby
from urllib3.poolmanager import PoolManager from urllib3.poolmanager import PoolManager
from math import floor from math import floor
from requests.adapters import HTTPAdapter as RequestsHTTPAdapter
from plexapi.video import Episode, Movie from plexapi.video import Episode, Movie
from plexapi.server import PlexServer from plexapi.server import PlexServer
from plexapi.myplex import MyPlexAccount from plexapi.myplex import MyPlexAccount
@@ -22,7 +26,7 @@ from src.library import (
# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 # Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
class HostNameIgnoringAdapter(requests.adapters.HTTPAdapter): class HostNameIgnoringAdapter(RequestsHTTPAdapter):
def init_poolmanager(self, connections, maxsize, block=..., **pool_kwargs): def init_poolmanager(self, connections, maxsize, block=..., **pool_kwargs):
self.poolmanager = PoolManager( self.poolmanager = PoolManager(
num_pools=connections, num_pools=connections,
@@ -33,7 +37,7 @@ class HostNameIgnoringAdapter(requests.adapters.HTTPAdapter):
) )
def get_guids(item: Union[Movie, Episode], completed=True): def extract_guids_from_item(item: Union[Movie, Episode]) -> Dict[str, str]:
guids: Dict[str, str] = dict( guids: Dict[str, str] = dict(
guid.id.split('://') guid.id.split('://')
for guid for guid
@@ -47,6 +51,10 @@ def get_guids(item: Union[Movie, Episode], completed=True):
1, 1,
) )
return guids
def get_guids(item: Union[Movie, Episode], completed=True):
return { return {
'title': item.title, 'title': item.title,
'locations': tuple([location.split("/")[-1] for location in item.locations]), 'locations': tuple([location.split("/")[-1] for location in item.locations]),
@@ -54,49 +62,41 @@ def get_guids(item: Union[Movie, Episode], completed=True):
"completed": completed, "completed": completed,
"time": item.viewOffset, "time": item.viewOffset,
} }
} | guids } | extract_guids_from_item(item) # Merge the metadata and guid dictionaries
def get_user_library_watched_show(show): def get_user_library_watched_show(show):
try: try:
show_guids = {} show_guids: FrozenSet = frozenset(
try: ({
for show_guid in show.guids: 'title': show.title,
# Extract source and id from guid.id 'locations': tuple(
m = re.match(r"(.*)://(.*)", show_guid.id) [location.split("/")[-1] for location in show.locations])
show_guid_source, show_guid_id = m.group(1).lower(), m.group(2) } | extract_guids_from_item(show)).items() # Merge the metadata and guid dictionaries
show_guids[show_guid_source] = show_guid_id )
except Exception:
logger( watched_episodes = show.watched()
f"Plex: Failed to get guids for {show.title}, Using location only", 1 episode_guids = {
# Offset group data because the first value will be the key
season: [episode[1] for episode in episodes]
for season, episodes
# Group episodes by first element of tuple (episode.parentIndex)
in itertools_groupby(
[
(
episode.parentIndex,
get_guids(episode, completed=episode in watched_episodes)
)
for episode
in show.episodes()
# Only include watched/partially-watched episodes
if episode in watched_episodes or episode.viewOffset > 0
],
operator.itemgetter(0)
) )
}
show_guids["title"] = show.title
show_guids["locations"] = tuple([x.split("/")[-1] for x in show.locations])
show_guids = frozenset(show_guids.items())
# Get all watched episodes for show
episode_guids = {}
watched = show.watched()
for episode in show.episodes():
if episode in watched:
if episode.parentIndex not in episode_guids:
episode_guids[episode.parentIndex] = []
episode_guids[episode.parentTitle].append(
get_guids(episode, completed=True)
)
elif episode.viewOffset > 0:
if episode.parentIndex not in episode_guids:
episode_guids[episode.parentIndex] = []
episode_guids[episode.parentTitle].append(
get_guids(episode, completed=False)
)
return show_guids, episode_guids return show_guids, episode_guids
except Exception: except Exception:
return {}, {} return {}, {}