diff --git a/src/black_white.py b/src/black_white.py index d6880ee..1c0ba3d 100644 --- a/src/black_white.py +++ b/src/black_white.py @@ -12,7 +12,7 @@ def setup_black_white_lists( whitelist_users: list[str] | None, library_mapping: dict[str, str] | None = None, user_mapping: dict[str, str] | None = None, -): +) -> tuple[list[str], list[str], list[str], list[str], list[str], list[str]]: blacklist_library, blacklist_library_type, blacklist_users = setup_x_lists( blacklist_library, blacklist_library_type, diff --git a/src/connection.py b/src/connection.py index 2083078..454ebf2 100644 --- a/src/connection.py +++ b/src/connection.py @@ -25,17 +25,17 @@ def jellyfin_emby_server_connection( f"{server_type.upper()}_BASEURL and {server_type.upper()}_TOKEN must have the same number of entries" ) - for i, baseurl in enumerate(server_baseurls): - baseurl = baseurl.strip() - if baseurl[-1] == "/": - baseurl = baseurl[:-1] + for i, base_url in enumerate(server_baseurls): + base_url = base_url.strip() + if base_url[-1] == "/": + base_url = base_url[:-1] if server_type == "jellyfin": - server = Jellyfin(baseurl=baseurl, token=server_tokens[i].strip()) + server = Jellyfin(base_url=base_url, token=server_tokens[i].strip()) servers.append(server) elif server_type == "emby": - server = Emby(baseurl=baseurl, token=server_tokens[i].strip()) + server = Emby(base_url=base_url, token=server_tokens[i].strip()) servers.append(server) else: raise Exception("Unknown server type") diff --git a/src/emby.py b/src/emby.py index a5d368a..726d30b 100644 --- a/src/emby.py +++ b/src/emby.py @@ -4,7 +4,7 @@ from loguru import logger class Emby(JellyfinEmby): - def __init__(self, baseurl, token): + def __init__(self, base_url: str, token: str) -> None: authorization = ( "Emby , " 'Client="JellyPlex-Watched", ' @@ -19,7 +19,7 @@ class Emby(JellyfinEmby): } super().__init__( - server_type="Emby", baseurl=baseurl, token=token, headers=headers + server_type="Emby", base_url=base_url, token=token, headers=headers ) def is_partial_update_supported(self, server_version: Version) -> bool: diff --git a/src/functions.py b/src/functions.py index 50175a0..a70c031 100644 --- a/src/functions.py +++ b/src/functions.py @@ -16,7 +16,7 @@ def log_marked( movie_show: str, episode: str | None = None, duration: float | None = None, -): +) -> None: output = f"{server_type}/{server_name}/{username}/{library}/{movie_show}" if episode: diff --git a/src/jellyfin.py b/src/jellyfin.py index 7c49887..dce0bec 100644 --- a/src/jellyfin.py +++ b/src/jellyfin.py @@ -4,7 +4,7 @@ from loguru import logger class Jellyfin(JellyfinEmby): - def __init__(self, baseurl, token): + def __init__(self, base_url: str, token: str) -> None: authorization = ( "MediaBrowser , " 'Client="JellyPlex-Watched", ' @@ -19,7 +19,7 @@ class Jellyfin(JellyfinEmby): } super().__init__( - server_type="Jellyfin", baseurl=baseurl, token=token, headers=headers + server_type="Jellyfin", base_url=base_url, token=token, headers=headers ) def is_partial_update_supported(self, server_version: Version) -> bool: diff --git a/src/jellyfin_emby.py b/src/jellyfin_emby.py index d685f43..75fc3ab 100644 --- a/src/jellyfin_emby.py +++ b/src/jellyfin_emby.py @@ -30,7 +30,9 @@ generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True")) generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True")) -def extract_identifiers_from_item(server_type: str, item: dict) -> MediaIdentifiers: +def extract_identifiers_from_item( + server_type: str, item: dict[str, Any] +) -> MediaIdentifiers: title = item.get("Name") id = None if not title: @@ -45,7 +47,7 @@ def extract_identifiers_from_item(server_type: str, item: dict) -> MediaIdentifi f"{server_type}: {title if title else id} has no guids", ) - locations: tuple = tuple() + locations: tuple[str, ...] = tuple() if generate_locations: if item.get("Path"): locations = tuple([item["Path"].split("/")[-1]]) @@ -70,7 +72,7 @@ def extract_identifiers_from_item(server_type: str, item: dict) -> MediaIdentifi ) -def get_mediaitem(server_type: str, item: dict) -> MediaItem: +def get_mediaitem(server_type: str, item: dict[str, Any]) -> MediaItem: return MediaItem( identifiers=extract_identifiers_from_item(server_type, item), status=WatchedStatus( @@ -86,20 +88,20 @@ class JellyfinEmby: def __init__( self, server_type: Literal["Jellyfin", "Emby"], - baseurl: str, + base_url: str, token: str, headers: dict[str, str], - ): + ) -> None: if server_type not in ["Jellyfin", "Emby"]: raise Exception(f"Server type {server_type} not supported") self.server_type: str = server_type - self.baseurl: str = baseurl + self.base_url: str = base_url self.token: str = token self.headers: dict[str, str] = headers self.timeout: int = int(os.getenv("REQUEST_TIMEOUT", 300)) - if not self.baseurl: - raise Exception(f"{self.server_type} baseurl not set") + if not self.base_url: + raise Exception(f"{self.server_type} base_url not set") if not self.token: raise Exception(f"{self.server_type} token not set") @@ -118,13 +120,13 @@ class JellyfinEmby: query_type: Literal["get", "post"], identifiers: dict[str, str] | None = None, json: dict[str, float] | None = None, - ): + ) -> list[dict[str, Any]] | dict[str, Any] | None: try: results = None if query_type == "get": response = self.session.get( - self.baseurl + query, headers=self.headers, timeout=self.timeout + self.base_url + query, headers=self.headers, timeout=self.timeout ) if response.status_code not in [200, 204]: raise Exception( @@ -137,7 +139,7 @@ class JellyfinEmby: elif query_type == "post": response = self.session.post( - self.baseurl + query, + self.base_url + query, headers=self.headers, json=json, timeout=self.timeout, @@ -173,9 +175,9 @@ class JellyfinEmby: try: query_string = "/System/Info/Public" - response: dict[str, Any] | None = self.query(query_string, "get") + response = self.query(query_string, "get") - if response: + if response and isinstance(response, dict): if name_only: return response.get("ServerName") elif version_only: @@ -194,14 +196,11 @@ class JellyfinEmby: users: dict[str, str] = {} query_string = "/Users" - response: list[dict[str, str | bool]] | None = self.query( - query_string, "get" - ) + response = self.query(query_string, "get") - if response: + if response and isinstance(response, list): for user in response: - if isinstance(user["Name"], str) and isinstance(user["Id"], str): - users[user["Name"]] = user["Id"] + users[user["Name"]] = user["Id"] return users except Exception as e: @@ -216,11 +215,9 @@ class JellyfinEmby: users = self.get_users() for user_name, user_id in users.items(): - user_libraries: dict[str, Any] | None = self.query( - f"/Users/{user_id}/Views", "get" - ) + user_libraries = self.query(f"/Users/{user_id}/Views", "get") - if not user_libraries: + if not user_libraries or not isinstance(user_libraries, dict): logger.error( f"{self.server_type}: Failed to get libraries for {user_name}" ) @@ -264,19 +261,26 @@ class JellyfinEmby: # Movies if library_type == "Movie": + movie_items = [] watched_items = self.query( f"/Users/{user_id}/Items" + f"?ParentId={library_id}&Filters=IsPlayed&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources", "get", - ).get("Items", []) + ) + + if watched_items and isinstance(watched_items, dict): + movie_items += watched_items.get("Items", []) in_progress_items = self.query( f"/Users/{user_id}/Items" + f"?ParentId={library_id}&Filters=IsResumable&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources", "get", - ).get("Items", []) + ) - for movie in watched_items + in_progress_items: + if in_progress_items and isinstance(in_progress_items, dict): + movie_items += in_progress_items.get("Items", []) + + for movie in movie_items: # Skip if theres no user data which means the movie has not been watched if not movie.get("UserData"): continue @@ -295,15 +299,21 @@ class JellyfinEmby: # TV Shows if library_type in ["Series", "Episode"]: # Retrieve a list of watched TV shows - watched_shows = self.query( + all_shows = self.query( f"/Users/{user_id}/Items" + f"?ParentId={library_id}&isPlaceHolder=false&IncludeItemTypes=Series&Recursive=True&Fields=ProviderIds,Path,RecursiveItemCount", "get", - ).get("Items", []) + ) + + if not all_shows or not isinstance(all_shows, dict): + logger.debug( + f"{self.server_type}: Failed to get shows for {user_name} in {library_title}" + ) + return watched # Filter the list of shows to only include those that have been partially or fully watched watched_shows_filtered = [] - for show in watched_shows: + for show in all_shows.get("Items", []): if not show.get("UserData"): continue @@ -312,6 +322,7 @@ class JellyfinEmby: # Retrieve the watched/partially watched list of episodes of each watched show for show in watched_shows_filtered: + show_name = show.get("Name") show_guids = { k.lower(): v for k, v in show.get("ProviderIds", {}).items() } @@ -325,12 +336,18 @@ class JellyfinEmby: f"/Shows/{show.get('Id')}/Episodes" + f"?userId={user_id}&isPlaceHolder=false&Fields=ProviderIds,MediaSources", "get", - ).get("Items", []) + ) + + if not show_episodes or not isinstance(show_episodes, dict): + logger.debug( + f"{self.server_type}: Failed to get episodes for {user_name} {library_title} {show_name}" + ) + continue # Iterate through the episodes # Create a list to store the episodes episode_mediaitem = [] - for episode in show_episodes: + for episode in show_episodes.get("Items", []): if not episode.get("UserData"): continue @@ -381,35 +398,44 @@ class JellyfinEmby: users_watched: dict[str, UserData] = {} for user_name, user_id in users.items(): - libraries = [ - self.query( - f"/Users/{user_id}/Items" - f"?ParentId={lib.get('Id')}&Filters=IsPlayed&Recursive=True&excludeItemTypes=Folder&limit=100", - "get", - identifiers={ - "library_id": lib["Id"], - "library_title": lib["Name"], - }, - ) - for lib in self.query(f"/Users/{user_id}/Views", "get").get( - "Items", [] - ) - if lib.get("Name") in sync_libraries - ] + libraries = [] - for library in libraries: - if not library.get("Items"): + all_libraries = self.query(f"/Users/{user_id}/Views", "get") + for library in all_libraries["Items"]: + library_id = library["Id"] + library_title = library["Name"] + + if library_title not in sync_libraries: continue - library_id = library.get("Identifiers", {}).get("library_id") - library_title = library.get("Identifiers", {}).get("library_title") + identifiers: dict[str, str] = { + "library_id": library_id, + "library_title": library_title, + } + libraries.append( + self.query( + f"/Users/{user_id}/Items" + + f"?ParentId={library_id}&Filters=IsPlayed&Recursive=True&excludeItemTypes=Folder&limit=100", + "get", + identifiers=identifiers, + ) + ) + + for library in libraries: + if len(library["Items"]) == 0: + continue + + library_id: str = library["Identifiers"]["library_id"] + library_title: str = library["Identifiers"]["library_title"] # Get all library types excluding "Folder" - types = { - x["Type"] - for x in library.get("Items", []) - if x.get("Type") in {"Movie", "Series", "Episode"} - } + types = set( + [ + x["Type"] + for x in library["Items"] + if x["Type"] in ["Movie", "Series", "Episode"] + ] + ) for library_type in types: # Get watched for user @@ -441,7 +467,7 @@ class JellyfinEmby: library_name: str, library_id: str, dryrun: bool, - ): + ) -> None: try: # If there are no movies or shows to update, exit early. if not library_data.series and not library_data.movies: @@ -458,9 +484,15 @@ class JellyfinEmby: + f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}" + "&isPlayed=false&Fields=ItemCounts,ProviderIds,MediaSources&IncludeItemTypes=Movie", "get", - ).get("Items", []) + ) - for jellyfin_video in jellyfin_search: + if not jellyfin_search or not isinstance(jellyfin_search, dict): + logger.debug( + f"{self.server_type}: Failed to get movies for {user_name} {library_name}" + ) + return + + for jellyfin_video in jellyfin_search.get("Items", []): jelly_identifiers = extract_identifiers_from_item( self.server_type, jellyfin_video ) @@ -522,6 +554,12 @@ class JellyfinEmby: + "&Fields=ItemCounts,ProviderIds,Path&IncludeItemTypes=Series", "get", ) + if not jellyfin_search or not isinstance(jellyfin_search, dict): + logger.debug( + f"{self.server_type}: Failed to get shows for {user_name} {library_name}" + ) + return + jellyfin_shows = [x for x in jellyfin_search.get("Items", [])] for jellyfin_show in jellyfin_shows: @@ -543,9 +581,17 @@ class JellyfinEmby: f"/Shows/{jellyfin_show_id}/Episodes" + f"?userId={user_id}&Fields=ItemCounts,ProviderIds,MediaSources", "get", - ).get("Items", []) + ) - for jellyfin_episode in jellyfin_episodes: + if not jellyfin_episodes or not isinstance( + jellyfin_episodes, dict + ): + logger.debug( + f"{self.server_type}: Failed to get episodes for {user_name} {library_name} {jellyfin_show.get('Name')}" + ) + return + + for jellyfin_episode in jellyfin_episodes.get("Items", []): jellyfin_episode_identifiers = ( extract_identifiers_from_item( self.server_type, jellyfin_episode @@ -629,10 +675,10 @@ class JellyfinEmby: def update_watched( self, watched_list: dict[str, UserData], - user_mapping=None, - library_mapping=None, - dryrun=False, - ): + user_mapping: dict[str, str] | None = None, + library_mapping: dict[str, str] | None = None, + dryrun: bool = False, + ) -> None: try: for user, user_data in watched_list.items(): user_other = None @@ -662,6 +708,13 @@ class JellyfinEmby: f"/Users/{user_id}/Views", "get", ) + + if not jellyfin_libraries or not isinstance(jellyfin_libraries, dict): + logger.debug( + f"{self.server_type}: Failed to get libraries for {user_name}" + ) + continue + jellyfin_libraries = [x for x in jellyfin_libraries.get("Items", [])] for library_name in user_data.libraries: diff --git a/src/library.py b/src/library.py index 2d161c8..abafb97 100644 --- a/src/library.py +++ b/src/library.py @@ -5,6 +5,10 @@ from src.functions import ( search_mapping, ) +from src.emby import Emby +from src.jellyfin import Jellyfin +from src.plex import Plex + def check_skip_logic( library_title: str, @@ -54,7 +58,7 @@ def check_blacklist_logic( blacklist_library: list[str], blacklist_library_type: list[str], library_other: str | None = None, -): +) -> str | None: skip_reason = None if isinstance(library_type, (list, tuple, set)): for library_type_item in library_type: @@ -90,7 +94,7 @@ def check_whitelist_logic( whitelist_library: list[str], whitelist_library_type: list[str], library_other: str | None = None, -): +) -> str | None: skip_reason = None if len(whitelist_library_type) > 0: if isinstance(library_type, (list, tuple, set)): @@ -161,8 +165,8 @@ def filter_libaries( def setup_libraries( - server_1, - server_2, + server_1: Plex | Jellyfin | Emby, + server_2: Plex | Jellyfin | Emby, blacklist_library: list[str], blacklist_library_type: list[str], whitelist_library: list[str], diff --git a/src/main.py b/src/main.py index c848eb8..731a37a 100644 --- a/src/main.py +++ b/src/main.py @@ -27,7 +27,7 @@ log_file = os.getenv("LOG_FILE", os.getenv("LOGFILE", "log.log")) level = os.getenv("DEBUG_LEVEL", "INFO").upper() -def configure_logger(): +def configure_logger() -> None: # Remove default logger to configure our own logger.remove() @@ -111,18 +111,20 @@ def should_sync_server( return True -def main_loop(): +def main_loop() -> None: dryrun = str_to_bool(os.getenv("DRYRUN", "False")) logger.info(f"Dryrun: {dryrun}") - user_mapping = os.getenv("USER_MAPPING", None) - if user_mapping: - user_mapping = json.loads(user_mapping.lower()) + user_mapping_env = os.getenv("USER_MAPPING", None) + user_mapping = None + if user_mapping_env: + user_mapping = json.loads(user_mapping_env.lower()) logger.info(f"User Mapping: {user_mapping}") - library_mapping = os.getenv("LIBRARY_MAPPING", None) - if library_mapping: - library_mapping = json.loads(library_mapping) + library_mapping_env = os.getenv("LIBRARY_MAPPING", None) + library_mapping = None + if library_mapping_env: + library_mapping = json.loads(library_mapping_env) logger.info(f"Library Mapping: {library_mapping}") # Create (black/white)lists @@ -241,7 +243,7 @@ def main_loop(): @logger.catch -def main(): +def main() -> None: run_only_once = str_to_bool(os.getenv("RUN_ONLY_ONCE", "False")) sleep_duration = float(os.getenv("SLEEP_DURATION", "3600")) times: list[float] = [] diff --git a/src/plex.py b/src/plex.py index 063e215..31f1f26 100644 --- a/src/plex.py +++ b/src/plex.py @@ -36,7 +36,9 @@ generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True")) # Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 class HostNameIgnoringAdapter(RequestsHTTPAdapter): - def init_poolmanager(self, connections, maxsize, block=..., **pool_kwargs): + def init_poolmanager( + self, connections: int, maxsize: int | None, block=..., **pool_kwargs + ) -> None: self.poolmanager = PoolManager( num_pools=connections, maxsize=maxsize, @@ -89,7 +91,7 @@ def update_user_watched( library_data: LibraryData, library_name: str, dryrun: bool, -): +) -> None: try: # If there are no movies or shows to update, exit early. if not library_data.series and not library_data.movies: @@ -224,8 +226,8 @@ class Plex: password: str | None = None, server_name: str | None = None, ssl_bypass: bool = False, - session=None, - ): + session: requests.Session | None = None, + ) -> None: self.server_type: str = "Plex" self.ssl_bypass: bool = ssl_bypass if ssl_bypass: @@ -426,10 +428,10 @@ class Plex: def update_watched( self, watched_list: dict[str, UserData], - user_mapping=None, - library_mapping=None, - dryrun=False, - ): + user_mapping: dict[str, str] | None = None, + library_mapping: dict[str, str] | None = None, + dryrun: bool = False, + ) -> None: try: for user, user_data in watched_list.items(): user_other = None diff --git a/src/users.py b/src/users.py index 84cade1..bdebf6a 100644 --- a/src/users.py +++ b/src/users.py @@ -1,4 +1,4 @@ -from plexapi.myplex import MyPlexAccount +from plexapi.myplex import MyPlexAccount, MyPlexUser from loguru import logger from src.emby import Emby @@ -109,7 +109,10 @@ def setup_users( blacklist_users: list[str], whitelist_users: list[str], user_mapping: dict[str, str] | None = None, -) -> tuple[list[MyPlexAccount] | dict[str, str], list[MyPlexAccount] | dict[str, str]]: +) -> tuple[ + list[MyPlexAccount | MyPlexUser] | dict[str, str], + list[MyPlexAccount | MyPlexUser] | dict[str, str], +]: server_1_users = generate_user_list(server_1) server_2_users = generate_user_list(server_2) logger.debug(f"Server 1 users: {server_1_users}") diff --git a/src/watched.py b/src/watched.py index 217d781..6ce9f70 100644 --- a/src/watched.py +++ b/src/watched.py @@ -1,6 +1,7 @@ import copy from pydantic import BaseModel, Field from loguru import logger +from typing import Any from src.functions import search_mapping @@ -103,8 +104,8 @@ def check_remove_entry(item1: MediaItem, item2: MediaItem) -> bool: def cleanup_watched( watched_list_1: dict[str, UserData], watched_list_2: dict[str, UserData], - user_mapping=None, - library_mapping=None, + user_mapping: dict[str, str] | None = None, + library_mapping: dict[str, str] | None = None, ) -> dict[str, UserData]: modified_watched_list_1 = copy.deepcopy(watched_list_1) @@ -199,11 +200,17 @@ def cleanup_watched( return modified_watched_list_1 -def get_other(watched_list, object_1, object_2): +def get_other( + watched_list: dict[str, Any], object_1: str, object_2: str | None +) -> str | None: if object_1 in watched_list: return object_1 - elif object_2 in watched_list: + + if object_2 and object_2 in watched_list: return object_2 - else: - logger.info(f"{object_1} and {object_2} not found in watched list 2") - return None + + logger.info( + f"{object_1}{' and ' + object_2 if object_2 else ''} not found in watched list 2" + ) + + return None