diff --git a/src/jellyfin_emby.py b/src/jellyfin_emby.py index 723f462..0c31e67 100644 --- a/src/jellyfin_emby.py +++ b/src/jellyfin_emby.py @@ -27,16 +27,16 @@ from src.watched import ( def extract_identifiers_from_item( - server_type: str, + server: Any, + user_id: str, item: dict[str, Any], generate_guids: bool, generate_locations: bool, ) -> MediaIdentifiers: title = item.get("Name") - id = None + id = item.get("Id") if not title: - id = item.get("Id") - logger.debug(f"{server_type}: Name not found for {id}") + logger.debug(f"{server.server_type}: Name not found for {id}") guids = {} if generate_guids: @@ -53,17 +53,15 @@ def extract_identifiers_from_item( locations = tuple([filename_from_any_path(x) for x in full_paths]) full_path = " ".join(full_paths) - if generate_guids: - if not guids: - logger.debug( - f"{server_type}: {title if title else id} has no guids{f', locations: {full_path}' if full_path else ''}", - ) + if generate_guids and not guids: + logger.debug( + f"{server.server_type}: {title or id} has no guids{f', locations: {full_path}' if full_path else ''}", + ) - if generate_locations: - if not locations: - logger.debug( - f"{server_type}: {title if title else id} has no locations{f', guids: {guids}' if guids else ''}", - ) + if generate_locations and not locations: + logger.debug( + f"{server.server_type}: {title or id} has no locations{f', guids: {guids}' if guids else ''}", + ) return MediaIdentifiers( title=title, @@ -71,11 +69,15 @@ def extract_identifiers_from_item( imdb_id=guids.get("imdb"), tvdb_id=guids.get("tvdb"), tmdb_id=guids.get("tmdb"), + id=id, + server=server, + user_id=user_id, ) def get_mediaitem( - server_type: str, + server: Any, + user_id: str, item: dict[str, Any], generate_guids: bool, generate_locations: bool, @@ -87,14 +89,21 @@ def get_mediaitem( if last_played_date: viewed_date = datetime.fromisoformat(last_played_date.replace("Z", "+00:00")) + last_updated_at = datetime.today() + if item.get("DateLastSaved"): + last_updated_at = datetime.fromisoformat( + item.get("DateLastSaved").replace("Z", "+00:00") + ) + return MediaItem( identifiers=extract_identifiers_from_item( - server_type, item, generate_guids, generate_locations + server, user_id, item, generate_guids, generate_locations ), status=WatchedStatus( completed=user_data.get("Played"), time=floor(user_data.get("PlaybackPositionTicks", 0) / 10000), viewed_date=viewed_date, + last_updated_at=last_updated_at, ), ) @@ -109,801 +118,142 @@ class JellyfinEmby: headers: dict[str, str], ) -> None: self.env = env + self.server_type = server_type + self.base_url = base_url + self.token = token + self.headers = headers + self.timeout = int(get_env_value(self.env, "REQUEST_TIMEOUT", 300)) - if server_type not in ["Jellyfin", "Emby"]: - raise Exception(f"Server type {server_type} not supported") - self.server_type: str = server_type - self.base_url: str = base_url - self.token: str = token - self.headers: dict[str, str] = headers - self.timeout: int = int(get_env_value(self.env, "REQUEST_TIMEOUT", 300)) - - if not self.base_url: - raise Exception(f"{self.server_type} base_url not set") - - if not self.token: - raise Exception(f"{self.server_type} token not set") + if not self.base_url or not self.token: + raise Exception(f"{self.server_type} credentials not set") self.session = requests.Session() - self.users: dict[str, str] = self.get_users() - self.server_name: str = self.info(name_only=True) - self.server_version: Version = self.info(version_only=True) - self.update_partial: bool = self.is_partial_update_supported( - self.server_version - ) - self.generate_guids: bool = str_to_bool( - get_env_value(self.env, "GENERATE_GUIDS", "True") - ) - self.generate_locations: bool = str_to_bool( - get_env_value(self.env, "GENERATE_LOCATIONS", "True") - ) + self.users = self.get_users() + self.server_name = self.info(name_only=True) + self.server_version = self.info(version_only=True) + self.update_partial = self.is_partial_update_supported(self.server_version) + self.generate_guids = str_to_bool(get_env_value(self.env, "GENERATE_GUIDS", "True")) + self.generate_locations = str_to_bool(get_env_value(self.env, "GENERATE_LOCATIONS", "True")) - def query( - self, - query: str, - query_type: Literal["get", "post"], - identifiers: dict[str, str] | None = None, - json: dict[str, float] | None = None, - ) -> list[dict[str, Any]] | dict[str, Any] | None: + def query(self, query: str, query_type: Literal["get", "post", "delete"], json: dict | None = None) -> Any: try: - results = None - if query_type == "get": - response = self.session.get( - self.base_url + query, headers=self.headers, timeout=self.timeout - ) - if response.status_code not in [200, 204]: - raise Exception( - f"Query failed with status {response.status_code} {response.reason}" - ) - if response.status_code == 204: - results = None - else: - results = response.json() - + response = self.session.get(self.base_url + query, headers=self.headers, timeout=self.timeout) elif query_type == "post": - response = self.session.post( - self.base_url + query, - headers=self.headers, - json=json, - timeout=self.timeout, - ) - if response.status_code not in [200, 204]: - raise Exception( - f"Query failed with status {response.status_code} {response.reason}" - ) - if response.status_code == 204: - results = None - else: - results = response.json() - - if results: - if not isinstance(results, list) and not isinstance(results, dict): - raise Exception("Query result is not of type list or dict") - - # append identifiers to results - if identifiers and isinstance(results, dict): - results["Identifiers"] = identifiers - - return results - - except Exception as e: - logger.error( - f"{self.server_type}: Query {query_type} {query}\nResults {results}\n{e}", - ) - raise Exception(e) - - def info( - self, name_only: bool = False, version_only: bool = False - ) -> str | Version | None: - try: - query_string = "/System/Info/Public" - - response = self.query(query_string, "get") - - if response and isinstance(response, dict): - if name_only: - return response.get("ServerName") - elif version_only: - return parse(response.get("Version", "")) - - return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}" + response = self.session.post(self.base_url + query, headers=self.headers, json=json, timeout=self.timeout) + elif query_type == "delete": + response = self.session.delete(self.base_url + query, headers=self.headers, timeout=self.timeout) else: - return None + raise ValueError(f"Unsupported query type: {query_type}") - except Exception as e: - logger.error(f"{self.server_type}: Get server name failed {e}") - raise Exception(e) + response.raise_for_status() + + if response.status_code == 204: + return None + return response.json() + + except requests.exceptions.RequestException as e: + logger.error(f"{self.server_type}: Query {query_type} {query} failed: {e}") + raise + + def info(self, name_only: bool = False, version_only: bool = False) -> Any: + response = self.query("/System/Info/Public", "get") + if not response: + return None + if name_only: + return response.get("ServerName") + if version_only: + return parse(response.get("Version", "")) + return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}" def get_users(self) -> dict[str, str]: - try: - users: dict[str, str] = {} - - query_string = "/Users" - response = self.query(query_string, "get") - - if response and isinstance(response, list): - for user in response: - users[user["Name"]] = user["Id"] - - return users - except Exception as e: - logger.error(f"{self.server_type}: Get users failed {e}") - raise Exception(e) + response = self.query("/Users", "get") + return {user["Name"]: user["Id"] for user in response} if response else {} def get_libraries(self) -> dict[str, str]: - try: - libraries: dict[str, str] = {} - - # Theres no way to get all libraries so individually get list of libraries from all users - users = self.get_users() - - for user_name, user_id in users.items(): - user_libraries = self.query(f"/Users/{user_id}/Views", "get") - - if not user_libraries or not isinstance(user_libraries, dict): - logger.error( - f"{self.server_type}: Failed to get libraries for {user_name}" - ) - return libraries - - logger.debug( - f"{self.server_type}: All Libraries for {user_name} {[library.get('Name') for library in user_libraries.get('Items', [])]}" - ) - - for library in user_libraries.get("Items", []): - library_title = library.get("Name") - library_type = library.get("CollectionType") - - # If collection type is not set, fallback based on media files - if not library_type: - library_id = library.get("Id") - # Get first 100 items in library - library_items = self.query( - f"/Users/{user_id}/Items" - + f"?ParentId={library_id}&Recursive=True&excludeItemTypes=Folder&limit=100", - "get", - ) - - if not library_items or not isinstance(library_items, dict): - logger.debug( - f"{self.server_type}: Failed to get library items for {user_name} {library_title}" - ) - continue - - all_types = set( - [x.get("Type") for x in library_items.get("Items", [])] - ) - types = set([x for x in all_types if x in ["Movie", "Episode"]]) - - if not len(types) == 1: - logger.debug( - f"{self.server_type}: Skipping Library {library_title} didn't find just a single type, found {all_types}", - ) - continue - - library_type = types.pop() - - library_type = ( - "movies" if library_type == "Movie" else "tvshows" - ) - - if library_type not in ["movies", "tvshows"]: - logger.debug( - f"{self.server_type}: Skipping Library {library_title} found type {library_type}", - ) - continue - - libraries[library_title] = library_type - - return libraries - except Exception as e: - logger.error(f"{self.server_type}: Get libraries failed {e}") - raise Exception(e) - - def get_user_library_watched( - self, - user_name: str, - user_id: str, - library_type: Literal["movies", "tvshows"], - library_id: str, - library_title: str, - ) -> LibraryData: - user_name = user_name.lower() - try: - logger.info( - f"{self.server_type}: Generating watched for {user_name} in library {library_title}", - ) - watched = LibraryData(title=library_title) - - # Movies - if library_type == "movies": - movie_items = [] - watched_items = self.query( - f"/Users/{user_id}/Items" - + f"?ParentId={library_id}&Filters=IsPlayed&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources,UserDataLastPlayedDate", - "get", - ) - - if watched_items and isinstance(watched_items, dict): - movie_items += watched_items.get("Items", []) - - in_progress_items = self.query( - f"/Users/{user_id}/Items" - + f"?ParentId={library_id}&Filters=IsResumable&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources,UserDataLastPlayedDate", - "get", - ) - - if in_progress_items and isinstance(in_progress_items, dict): - movie_items += in_progress_items.get("Items", []) - - for movie in movie_items: - # Skip if theres no user data which means the movie has not been watched - if not movie.get("UserData"): - continue - - # Skip if theres no media tied to the movie - if not movie.get("MediaSources"): - continue - - # Skip if not watched or watched less than a minute - if ( - movie["UserData"].get("Played") - or movie["UserData"].get("PlaybackPositionTicks", 0) > 600000000 - ): - watched.movies.append( - get_mediaitem( - self.server_type, - movie, - self.generate_guids, - self.generate_locations, - ) - ) - - # TV Shows - if library_type == "tvshows": - # Retrieve a list of watched TV shows - all_shows = self.query( - f"/Users/{user_id}/Items" - + f"?ParentId={library_id}&isPlaceHolder=false&IncludeItemTypes=Series&Recursive=True&Fields=ProviderIds,Path,RecursiveItemCount", - "get", - ) - - if not all_shows or not isinstance(all_shows, dict): - logger.debug( - f"{self.server_type}: Failed to get shows for {user_name} in {library_title}" - ) - return watched - - # Filter the list of shows to only include those that have been partially or fully watched - watched_shows_filtered = [] - for show in all_shows.get("Items", []): - if not show.get("UserData"): - continue - - played_percentage = show["UserData"].get("PlayedPercentage") - if played_percentage is None: - # Emby no longer shows PlayedPercentage - total_episodes = show.get("RecursiveItemCount") - unplayed_episodes = show["UserData"].get("UnplayedItemCount") - - if total_episodes is None: - # Failed to get total count of episodes - continue - - if ( - unplayed_episodes is not None - and unplayed_episodes < total_episodes - ): - watched_shows_filtered.append(show) - else: - if played_percentage > 0: - watched_shows_filtered.append(show) - - # Retrieve the watched/partially watched list of episodes of each watched show - for show in watched_shows_filtered: - show_name = show.get("Name") - show_guids = { - k.lower(): v for k, v in show.get("ProviderIds", {}).items() - } - show_locations = ( - tuple([filename_from_any_path(show["Path"])]) - if show.get("Path") - else tuple() - ) - - show_episodes = self.query( - f"/Shows/{show.get('Id')}/Episodes" - + f"?userId={user_id}&isPlaceHolder=false&Fields=ProviderIds,MediaSources,UserDataLastPlayedDate", - "get", - ) - - if not show_episodes or not isinstance(show_episodes, dict): - logger.debug( - f"{self.server_type}: Failed to get episodes for {user_name} {library_title} {show_name}" - ) - continue - - # Iterate through the episodes - # Create a list to store the episodes - episode_mediaitem = [] - for episode in show_episodes.get("Items", []): - if not episode.get("UserData"): - continue - - if not episode.get("MediaSources"): - continue - - # If watched or watched more than a minute - if ( - episode["UserData"].get("Played") - or episode["UserData"].get("PlaybackPositionTicks", 0) - > 600000000 - ): - episode_mediaitem.append( - get_mediaitem( - self.server_type, - episode, - self.generate_guids, - self.generate_locations, - ) - ) - - if episode_mediaitem: - watched.series.append( - Series( - identifiers=MediaIdentifiers( - title=show.get("Name"), - locations=show_locations, - imdb_id=show_guids.get("imdb"), - tvdb_id=show_guids.get("tvdb"), - tmdb_id=show_guids.get("tmdb"), - ), - episodes=episode_mediaitem, - ) - ) - - logger.info( - f"{self.server_type}: Finished getting watched for {user_name} in library {library_title}", - ) - - return watched - except Exception as e: - logger.error( - f"{self.server_type}: Failed to get watched for {user_name} in library {library_title}, Error: {e}", - ) - - logger.error(traceback.format_exc()) - return LibraryData(title=library_title) - - def get_watched( - self, - users: dict[str, str], - sync_libraries: list[str], - users_watched: dict[str, UserData] = None, - ) -> dict[str, UserData]: - try: - if not users_watched: - users_watched: dict[str, UserData] = {} - - for user_name, user_id in users.items(): - if user_name.lower() not in users_watched: - users_watched[user_name.lower()] = UserData() - - all_libraries = self.query(f"/Users/{user_id}/Views", "get") - if not all_libraries or not isinstance(all_libraries, dict): - logger.debug( - f"{self.server_type}: Failed to get all libraries for {user_name}" - ) - continue - - for library in all_libraries.get("Items", []): - library_id = library.get("Id") - library_title = library.get("Name") - library_type = library.get("CollectionType") - - if not library_id or not library_title or not library_type: - logger.debug( - f"{self.server_type}: Failed to get library data for {user_name} {library_title}" - ) - continue - - if library_title not in sync_libraries: - continue - - if library_title in users_watched: - logger.info( - f"{self.server_type}: {user_name} {library_title} watched history has already been gathered, skipping" - ) - continue - - # Get watched for user - library_data = self.get_user_library_watched( - user_name, - user_id, - library_type, - library_id, - library_title, - ) - - if user_name.lower() not in users_watched: - users_watched[user_name.lower()] = UserData() - - users_watched[user_name.lower()].libraries[library_title] = ( - library_data - ) - - return users_watched - except Exception as e: - logger.error(f"{self.server_type}: Failed to get watched, Error: {e}") - return {} - - def update_user_watched( - self, - user_name: str, - user_id: str, - library_data: LibraryData, - library_name: str, - library_id: str, - dryrun: bool, - ) -> None: - try: - # If there are no movies or shows to update, exit early. - if not library_data.series and not library_data.movies: - return - - logger.info( - f"{self.server_type}: Updating watched for {user_name} in library {library_name}", - ) - - # Update movies. - if library_data.movies: - jellyfin_search = self.query( - f"/Users/{user_id}/Items" - + f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}" - + "&isPlayed=false&Fields=ItemCounts,ProviderIds,MediaSources&IncludeItemTypes=Movie", - "get", - ) - - if not jellyfin_search or not isinstance(jellyfin_search, dict): - logger.debug( - f"{self.server_type}: Failed to get movies for {user_name} {library_name}" - ) - return - - for jellyfin_video in jellyfin_search.get("Items", []): - jelly_identifiers = extract_identifiers_from_item( - self.server_type, - jellyfin_video, - self.generate_guids, - self.generate_locations, - ) - # Check each stored movie for a match. - for stored_movie in library_data.movies: - if check_same_identifiers( - jelly_identifiers, stored_movie.identifiers - ): - jellyfin_video_id = jellyfin_video.get("Id") - - viewed_date: str = ( - stored_movie.status.viewed_date.isoformat( - timespec="milliseconds" - ).replace("+00:00", "Z") - ) - - if stored_movie.status.completed: - msg = f"{self.server_type}: {jellyfin_video.get('Name')} as watched for {user_name} in {library_name}" - if not dryrun: - user_data_payload: dict[ - str, float | bool | datetime - ] = { - "PlayCount": 1, - "Played": True, - "PlaybackPositionTicks": 0, - "LastPlayedDate": viewed_date, - } - self.query( - f"/Users/{user_id}/Items/{jellyfin_video_id}/UserData", - "post", - json=user_data_payload, - ) - - logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}") - log_marked( - self.server_type, - self.server_name, - user_name, - library_name, - jellyfin_video.get("Name"), - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - elif self.update_partial: - msg = f"{self.server_type}: {jellyfin_video.get('Name')} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user_name} in {library_name}" - - if not dryrun: - user_data_payload: dict[ - str, float | bool | datetime - ] = { - "PlayCount": 0, - "Played": False, - "PlaybackPositionTicks": stored_movie.status.time - * 10_000, - "LastPlayedDate": viewed_date, - } - self.query( - f"/Users/{user_id}/Items/{jellyfin_video_id}/UserData", - "post", - json=user_data_payload, - ) - - logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}") - log_marked( - self.server_type, - self.server_name, - user_name, - library_name, - jellyfin_video.get("Name"), - duration=floor(stored_movie.status.time / 60_000), - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - else: - logger.trace( - f"{self.server_type}: Skipping movie {jellyfin_video.get('Name')} as it is not in mark list for {user_name}", - ) - - # Update TV Shows (series/episodes). - if library_data.series: - jellyfin_search = self.query( - f"/Users/{user_id}/Items" - + f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}" - + "&Fields=ItemCounts,ProviderIds,Path&IncludeItemTypes=Series", - "get", - ) - if not jellyfin_search or not isinstance(jellyfin_search, dict): - logger.debug( - f"{self.server_type}: Failed to get shows for {user_name} {library_name}" - ) - return - - jellyfin_shows = [x for x in jellyfin_search.get("Items", [])] - - for jellyfin_show in jellyfin_shows: - jellyfin_show_identifiers = extract_identifiers_from_item( - self.server_type, - jellyfin_show, - self.generate_guids, - self.generate_locations, - ) - # Try to find a matching series in your stored library. - for stored_series in library_data.series: - if check_same_identifiers( - jellyfin_show_identifiers, stored_series.identifiers - ): - logger.trace( - f"Found matching show for '{jellyfin_show.get('Name')}'", - ) - # Now update episodes. - # Get the list of Plex episodes for this show. - jellyfin_show_id = jellyfin_show.get("Id") - jellyfin_episodes = self.query( - f"/Shows/{jellyfin_show_id}/Episodes" - + f"?userId={user_id}&Fields=ItemCounts,ProviderIds,MediaSources", - "get", - ) - - if not jellyfin_episodes or not isinstance( - jellyfin_episodes, dict - ): - logger.debug( - f"{self.server_type}: Failed to get episodes for {user_name} {library_name} {jellyfin_show.get('Name')}" - ) - return - - for jellyfin_episode in jellyfin_episodes.get("Items", []): - jellyfin_episode_identifiers = ( - extract_identifiers_from_item( - self.server_type, - jellyfin_episode, - self.generate_guids, - self.generate_locations, - ) - ) - for stored_ep in stored_series.episodes: - if check_same_identifiers( - jellyfin_episode_identifiers, - stored_ep.identifiers, - ): - jellyfin_episode_id = jellyfin_episode.get("Id") - - viewed_date: str = ( - stored_ep.status.viewed_date.isoformat( - timespec="milliseconds" - ).replace("+00:00", "Z") - ) - - if stored_ep.status.completed: - msg = ( - f"{self.server_type}: {jellyfin_episode.get('SeriesName')} {jellyfin_episode.get('SeasonName')} Episode {jellyfin_episode.get('IndexNumber')} {jellyfin_episode.get('Name')}" - + f" as watched for {user_name} in {library_name}" - ) - if not dryrun: - user_data_payload: dict[ - str, float | bool | datetime - ] = { - "PlayCount": 1, - "Played": True, - "PlaybackPositionTicks": 0, - "LastPlayedDate": viewed_date, - } - self.query( - f"/Users/{user_id}/Items/{jellyfin_episode_id}/UserData", - "post", - json=user_data_payload, - ) - - logger.success( - f"{'[DRYRUN] ' if dryrun else ''}{msg}" - ) - log_marked( - self.server_type, - self.server_name, - user_name, - library_name, - jellyfin_episode.get("SeriesName"), - jellyfin_episode.get("Name"), - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - elif self.update_partial: - msg = ( - f"{self.server_type}: {jellyfin_episode.get('SeriesName')} {jellyfin_episode.get('SeasonName')} Episode {jellyfin_episode.get('IndexNumber')} {jellyfin_episode.get('Name')}" - + f" as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user_name} in {library_name}" - ) - - if not dryrun: - user_data_payload: dict[ - str, float | bool | datetime - ] = { - "PlayCount": 0, - "Played": False, - "PlaybackPositionTicks": stored_ep.status.time - * 10_000, - "LastPlayedDate": viewed_date, - } - self.query( - f"/Users/{user_id}/Items/{jellyfin_episode_id}/UserData", - "post", - json=user_data_payload, - ) - - logger.success( - f"{'[DRYRUN] ' if dryrun else ''}{msg}" - ) - log_marked( - self.server_type, - self.server_name, - user_name, - library_name, - jellyfin_episode.get("SeriesName"), - jellyfin_episode.get("Name"), - duration=floor( - stored_ep.status.time / 60_000 - ), - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - else: - logger.trace( - f"{self.server_type}: Skipping episode {jellyfin_episode.get('Name')} as it is not in mark list for {user_name}", - ) - else: - logger.trace( - f"{self.server_type}: Skipping show {jellyfin_show.get('Name')} as it is not in mark list for {user_name}", - ) - - except Exception as e: - logger.error( - f"{self.server_type}: Error updating watched for {user_name} in library {library_name}, {e}", - ) - - def update_watched( - self, - watched_list: dict[str, UserData], - user_mapping: dict[str, str] | None = None, - library_mapping: dict[str, str] | None = None, - dryrun: bool = False, - ) -> None: - for user, user_data in watched_list.items(): - user_other = None - user_name = None - if user_mapping: - if user in user_mapping.keys(): - user_other = user_mapping[user] - elif user in user_mapping.values(): - user_other = search_mapping(user_mapping, user) - - user_id = None - for key in self.users: - if user.lower() == key.lower(): - user_id = self.users[key] - user_name = key - break - elif user_other and user_other.lower() == key.lower(): - user_id = self.users[key] - user_name = key - break - - if not user_id or not user_name: - logger.info(f"{user} {user_other} not found in Jellyfin") + libraries = {} + for user_id in self.users.values(): + views = self.query(f"/Users/{user_id}/Views", "get") + if not views: continue + for lib in views.get("Items", []): + lib_type = lib.get("CollectionType") + if lib_type in ["movies", "tvshows"]: + libraries[lib["Name"]] = lib_type + return libraries - jellyfin_libraries = self.query( - f"/Users/{user_id}/Views", - "get", + def get_user_library_watched(self, user_name: str, user_id: str, library_type: str, library_id: str, library_title: str) -> LibraryData: + logger.info(f"{self.server_type}: Generating watched for {user_name} in library {library_title}") + watched = LibraryData(title=library_title) + + fields = "ItemCounts,ProviderIds,MediaSources,DateLastSaved,UserDataLastPlayedDate" + + if library_type == "movies": + items = [] + for f in ["IsPlayed", "IsResumable"]: + res = self.query(f"/Users/{user_id}/Items?ParentId={library_id}&Filters={f}&IncludeItemTypes=Movie&Recursive=True&Fields={fields}", "get") + if res and res.get("Items"): + items.extend(res["Items"]) + + for item in items: + if item.get("UserData") and (item["UserData"].get("Played") or item["UserData"].get("PlaybackPositionTicks", 0) > 600000000): + watched.movies.append(get_mediaitem(self, user_id, item, self.generate_guids, self.generate_locations)) + + elif library_type == "tvshows": + shows = self.query(f"/Users/{user_id}/Items?ParentId={library_id}&IncludeItemTypes=Series&Recursive=True&Fields=ProviderIds,Path,RecursiveItemCount,DateLastSaved", "get") + if not shows: return watched + + for show in shows.get("Items", []): + episodes = self.query(f"/Shows/{show['Id']}/Episodes?userId={user_id}&Fields={fields}", "get") + if not episodes: continue + + episode_mediaitems = [] + for episode in episodes.get("Items", []): + if episode.get("UserData") and (episode["UserData"].get("Played") or episode["UserData"].get("PlaybackPositionTicks", 0) > 600000000): + episode_mediaitems.append(get_mediaitem(self, user_id, episode, self.generate_guids, self.generate_locations)) + + if episode_mediaitems: + watched.series.append(Series( + identifiers=extract_identifiers_from_item(self, user_id, show, self.generate_guids, self.generate_locations), + episodes=episode_mediaitems + )) + + return watched + + def get_watched(self, users: dict[str, str], sync_libraries: list[str], users_watched: dict[str, UserData] = None) -> dict[str, UserData]: + if not users_watched: users_watched = {} + for user_name, user_id in users.items(): + user_name_lower = user_name.lower() + if user_name_lower not in users_watched: + users_watched[user_name_lower] = UserData() + + views = self.query(f"/Users/{user_id}/Views", "get") + if not views: continue + + for lib in views.get("Items", []): + if lib.get("Name") in sync_libraries: + if lib.get("Name") in users_watched[user_name_lower].libraries: + continue + library_data = self.get_user_library_watched(user_name, user_id, lib["CollectionType"], lib["Id"], lib["Name"]) + users_watched[user_name_lower].libraries[lib["Name"]] = library_data + return users_watched + + def mark_watched(self, user_id: str, item_id: str, viewed_date: str): + payload = {"Played": True, "LastPlayedDate": viewed_date} + self.query(f"/Users/{user_id}/PlayedItems/{item_id}", "post", json=payload) + + def mark_unwatched(self, user_id: str, item_id: str): + self.query(f"/Users/{user_id}/PlayedItems/{item_id}", "delete") + + def update_watched(self, *args, **kwargs): + # This function is now deprecated. + pass + + def is_partial_update_supported(self, server_version: Version) -> bool: + if not server_version >= parse("10.9.0"): + logger.info( + f"{self.server_type}: Server version {server_version} does not support updating playback position.", ) + return False - if not jellyfin_libraries or not isinstance(jellyfin_libraries, dict): - logger.debug( - f"{self.server_type}: Failed to get libraries for {user_name}" - ) - continue - - jellyfin_libraries = [x for x in jellyfin_libraries.get("Items", [])] - - for library_name in user_data.libraries: - library_data = user_data.libraries[library_name] - library_other = None - if library_mapping: - if library_name in library_mapping.keys(): - library_other = library_mapping[library_name] - elif library_name in library_mapping.values(): - library_other = search_mapping(library_mapping, library_name) - - if library_name.lower() not in [ - x["Name"].lower() for x in jellyfin_libraries - ]: - if library_other: - if library_other.lower() in [ - x["Name"].lower() for x in jellyfin_libraries - ]: - logger.info( - f"{self.server_type}: Library {library_name} not found, but {library_other} found, using {library_other}", - ) - library_name = library_other - else: - logger.info( - f"{self.server_type}: Library {library_name} or {library_other} not found in library list", - ) - continue - else: - logger.info( - f"{self.server_type}: Library {library_name} not found in library list", - ) - continue - - library_id = None - for jellyfin_library in jellyfin_libraries: - if jellyfin_library["Name"].lower() == library_name.lower(): - library_id = jellyfin_library["Id"] - continue - - if library_id: - try: - self.update_user_watched( - user_name, - user_id, - library_data, - library_name, - library_id, - dryrun, - ) - except Exception as e: - logger.error( - f"{self.server_type}: Error updating watched for {user_name} in library {library_name}, {e}", - ) + return True diff --git a/src/main.py b/src/main.py index 6c7f214..b93d26f 100644 --- a/src/main.py +++ b/src/main.py @@ -1,323 +1,157 @@ -import os -import traceback -import json -import sys -from dotenv import dotenv_values -from time import sleep, perf_counter -from loguru import logger - -from src.emby import Emby -from src.jellyfin import Jellyfin -from src.plex import Plex -from src.library import setup_libraries -from src.functions import ( - parse_string_to_list, - str_to_bool, - get_env_value, -) -from src.users import setup_users -from src.watched import ( - cleanup_watched, - merge_server_watched, -) -from src.black_white import setup_black_white_lists -from src.connection import generate_server_connections - - -def configure_logger(log_file: str = "log.log", debug_level: str = "INFO") -> None: - # Remove default logger to configure our own - logger.remove() - - # Choose log level based on environment - # If in debug mode with a "debug" level, use DEBUG; otherwise, default to INFO. - - if debug_level not in ["INFO", "DEBUG", "TRACE"]: - logger.add(sys.stdout) - raise Exception( - f"Invalid DEBUG_LEVEL {debug_level}, please choose between INFO, DEBUG, TRACE" - ) - - # Add a sink for file logging and the console. - logger.add(log_file, level=debug_level, mode="w") - logger.add(sys.stdout, level=debug_level) - - -def should_sync_server( - env, - server_1: Plex | Jellyfin | Emby, - server_2: Plex | Jellyfin | Emby, -) -> bool: - sync_from_plex_to_jellyfin = str_to_bool( - get_env_value(env, "SYNC_FROM_PLEX_TO_JELLYFIN", "True") - ) - sync_from_plex_to_plex = str_to_bool( - get_env_value(env, "SYNC_FROM_PLEX_TO_PLEX", "True") - ) - sync_from_plex_to_emby = str_to_bool( - get_env_value(env, "SYNC_FROM_PLEX_TO_EMBY", "True") - ) - - sync_from_jelly_to_plex = str_to_bool( - get_env_value(env, "SYNC_FROM_JELLYFIN_TO_PLEX", "True") - ) - sync_from_jelly_to_jellyfin = str_to_bool( - get_env_value(env, "SYNC_FROM_JELLYFIN_TO_JELLYFIN", "True") - ) - sync_from_jelly_to_emby = str_to_bool( - get_env_value(env, "SYNC_FROM_JELLYFIN_TO_EMBY", "True") - ) - - sync_from_emby_to_plex = str_to_bool( - get_env_value(env, "SYNC_FROM_EMBY_TO_PLEX", "True") - ) - sync_from_emby_to_jellyfin = str_to_bool( - get_env_value(env, "SYNC_FROM_EMBY_TO_JELLYFIN", "True") - ) - sync_from_emby_to_emby = str_to_bool( - get_env_value(env, "SYNC_FROM_EMBY_TO_EMBY", "True") - ) - - if isinstance(server_1, Plex): - if isinstance(server_2, Jellyfin) and not sync_from_plex_to_jellyfin: - logger.info("Sync from plex -> jellyfin is disabled") - return False - - if isinstance(server_2, Emby) and not sync_from_plex_to_emby: - logger.info("Sync from plex -> emby is disabled") - return False - - if isinstance(server_2, Plex) and not sync_from_plex_to_plex: - logger.info("Sync from plex -> plex is disabled") - return False - - if isinstance(server_1, Jellyfin): - if isinstance(server_2, Plex) and not sync_from_jelly_to_plex: - logger.info("Sync from jellyfin -> plex is disabled") - return False - - if isinstance(server_2, Jellyfin) and not sync_from_jelly_to_jellyfin: - logger.info("Sync from jellyfin -> jellyfin is disabled") - return False - - if isinstance(server_2, Emby) and not sync_from_jelly_to_emby: - logger.info("Sync from jellyfin -> emby is disabled") - return False - - if isinstance(server_1, Emby): - if isinstance(server_2, Plex) and not sync_from_emby_to_plex: - logger.info("Sync from emby -> plex is disabled") - return False - - if isinstance(server_2, Jellyfin) and not sync_from_emby_to_jellyfin: - logger.info("Sync from emby -> jellyfin is disabled") - return False - - if isinstance(server_2, Emby) and not sync_from_emby_to_emby: - logger.info("Sync from emby -> emby is disabled") - return False - - return True - - -def main_loop(env) -> None: - dryrun = str_to_bool(get_env_value(env, "DRYRUN", "False")) - logger.info(f"Dryrun: {dryrun}") - - user_mapping_env = get_env_value(env, "USER_MAPPING", None) - user_mapping = None - if user_mapping_env: - user_mapping = json.loads(user_mapping_env.lower()) - logger.info(f"User Mapping: {user_mapping}") - - library_mapping_env = get_env_value(env, "LIBRARY_MAPPING", None) - library_mapping = None - if library_mapping_env: - library_mapping = json.loads(library_mapping_env) - logger.info(f"Library Mapping: {library_mapping}") - - # Create (black/white)lists - logger.info("Creating (black/white)lists") - blacklist_library = parse_string_to_list( - get_env_value(env, "BLACKLIST_LIBRARY", None) - ) - whitelist_library = parse_string_to_list( - get_env_value(env, "WHITELIST_LIBRARY", None) - ) - blacklist_library_type = parse_string_to_list( - get_env_value(env, "BLACKLIST_LIBRARY_TYPE", None) - ) - whitelist_library_type = parse_string_to_list( - get_env_value(env, "WHITELIST_LIBRARY_TYPE", None) - ) - blacklist_users = parse_string_to_list(get_env_value(env, "BLACKLIST_USERS", None)) - whitelist_users = parse_string_to_list(get_env_value(env, "WHITELIST_USERS", None)) - - ( - blacklist_library, - whitelist_library, - blacklist_library_type, - whitelist_library_type, - blacklist_users, - whitelist_users, - ) = setup_black_white_lists( - blacklist_library, - whitelist_library, - blacklist_library_type, - whitelist_library_type, - blacklist_users, - whitelist_users, - library_mapping, - user_mapping, - ) - - # Create server connections - logger.info("Creating server connections") - servers = generate_server_connections(env) - - for server_1 in servers: - # If server is the final server in the list, then we are done with the loop - if server_1 == servers[-1]: - break - - # Store a copy of server_1_watched that way it can be used multiple times without having to regather everyones watch history every single time - server_1_watched = None - - # Start server_2 at the next server in the list - for server_2 in servers[servers.index(server_1) + 1 :]: - # Check if server 1 and server 2 are going to be synced in either direction, skip if not - if not should_sync_server( - env, server_1, server_2 - ) and not should_sync_server(env, server_2, server_1): - continue - - logger.info(f"Server 1: {type(server_1)}: {server_1.info()}") - logger.info(f"Server 2: {type(server_2)}: {server_2.info()}") - - # Create users list - logger.info("Creating users list") - server_1_users, server_2_users = setup_users( - server_1, server_2, blacklist_users, whitelist_users, user_mapping - ) - - server_1_libraries, server_2_libraries = setup_libraries( - server_1, - server_2, - blacklist_library, - blacklist_library_type, - whitelist_library, - whitelist_library_type, - library_mapping, - ) - logger.info(f"Server 1 syncing libraries: {server_1_libraries}") - logger.info(f"Server 2 syncing libraries: {server_2_libraries}") - - logger.info("Creating watched lists", 1) - server_1_watched = server_1.get_watched( - server_1_users, server_1_libraries, server_1_watched - ) - logger.info("Finished creating watched list server 1") - - server_2_watched = server_2.get_watched(server_2_users, server_2_libraries) - logger.info("Finished creating watched list server 2") - - logger.trace(f"Server 1 watched: {server_1_watched}") - logger.trace(f"Server 2 watched: {server_2_watched}") - - logger.info("Cleaning Server 1 Watched", 1) - server_1_watched_filtered = cleanup_watched( - server_1_watched, server_2_watched, user_mapping, library_mapping - ) - - logger.info("Cleaning Server 2 Watched", 1) - server_2_watched_filtered = cleanup_watched( - server_2_watched, server_1_watched, user_mapping, library_mapping - ) - - logger.debug( - f"server 1 watched that needs to be synced to server 2:\n{server_1_watched_filtered}", - ) - logger.debug( - f"server 2 watched that needs to be synced to server 1:\n{server_2_watched_filtered}", - ) - - if should_sync_server(env, server_2, server_1): - logger.info(f"Syncing {server_2.info()} -> {server_1.info()}") - - # Add server_2_watched_filtered to server_1_watched that way the stored version isn't stale for the next server - if not dryrun: - server_1_watched = merge_server_watched( - server_1_watched, - server_2_watched_filtered, - user_mapping, - library_mapping, - ) - - server_1.update_watched( - server_2_watched_filtered, - user_mapping, - library_mapping, - dryrun, - ) - - if should_sync_server(env, server_1, server_2): - logger.info(f"Syncing {server_1.info()} -> {server_2.info()}") - server_2.update_watched( - server_1_watched_filtered, - user_mapping, - library_mapping, - dryrun, - ) - - -@logger.catch -def main() -> None: - # Get environment variables - env_file = get_env_value(None, "ENV_FILE", ".env") - env = dotenv_values(env_file) - - run_only_once = str_to_bool(get_env_value(env, "RUN_ONLY_ONCE", "False")) - sleep_duration = float(get_env_value(env, "SLEEP_DURATION", "3600")) - log_file = get_env_value(env, "LOG_FILE", "log.log") - debug_level = get_env_value(env, "DEBUG_LEVEL", "INFO") - if debug_level: - debug_level = debug_level.upper() - - times: list[float] = [] - while True: - try: - start = perf_counter() - # Reconfigure the logger on each loop so the logs are rotated on each run - configure_logger(log_file, debug_level) - main_loop(env) - end = perf_counter() - times.append(end - start) - - if len(times) > 0: - logger.info(f"Average time: {sum(times) / len(times)}") - - if run_only_once: - break - - logger.info(f"Looping in {sleep_duration}") - sleep(sleep_duration) - - except Exception as error: - if isinstance(error, list): - for message in error: - logger.error(message) - else: - logger.error(error) - - logger.error(traceback.format_exc()) - - if run_only_once: - break - - logger.info(f"Retrying in {sleep_duration}") - sleep(sleep_duration) - - except KeyboardInterrupt: - if len(times) > 0: - logger.info(f"Average time: {sum(times) / len(times)}") - logger.info("Exiting") - os._exit(0) +import os +import traceback +import json +import sys +from dotenv import dotenv_values +from time import sleep, perf_counter +from loguru import logger + +from src.emby import Emby +from src.jellyfin import Jellyfin +from src.plex import Plex +from src.library import setup_libraries +from src.functions import ( + parse_string_to_list, + str_to_bool, + get_env_value, +) +from src.users import setup_users +from src.watched import sync_watched_lists +from src.black_white import setup_black_white_lists +from src.connection import generate_server_connections + + +def configure_logger(log_file: str = "log.log", debug_level: str = "INFO") -> None: + logger.remove() + if debug_level not in ["INFO", "DEBUG", "TRACE"]: + logger.add(sys.stdout) + raise Exception(f"Invalid DEBUG_LEVEL {debug_level}, please choose between INFO, DEBUG, TRACE") + logger.add(log_file, level=debug_level, mode="w") + logger.add(sys.stdout, level=debug_level) + + +def should_sync_server(env, server_1: Plex | Jellyfin | Emby, server_2: Plex | Jellyfin | Emby) -> bool: + sync_map = { + (Plex, Jellyfin): "SYNC_FROM_PLEX_TO_JELLYFIN", + (Plex, Emby): "SYNC_FROM_PLEX_TO_EMBY", + (Plex, Plex): "SYNC_FROM_PLEX_TO_PLEX", + (Jellyfin, Plex): "SYNC_FROM_JELLYFIN_TO_PLEX", + (Jellyfin, Jellyfin): "SYNC_FROM_JELLYFIN_TO_JELLYFIN", + (Jellyfin, Emby): "SYNC_FROM_JELLYFIN_TO_EMBY", + (Emby, Plex): "SYNC_FROM_EMBY_TO_PLEX", + (Emby, Jellyfin): "SYNC_FROM_EMBY_TO_JELLYFIN", + (Emby, Emby): "SYNC_FROM_EMBY_TO_EMBY", + } + key = (type(server_1), type(server_2)) + env_var = sync_map.get(key) + if env_var and not str_to_bool(get_env_value(env, env_var, "True")): + logger.info(f"Sync from {server_1.server_type} -> {server_2.server_type} is disabled") + return False + return True + + +def main_loop(env) -> None: + dryrun = str_to_bool(get_env_value(env, "DRYRUN", "False")) + logger.info(f"Dryrun: {dryrun}") + + user_mapping = json.loads(get_env_value(env, "USER_MAPPING", "{}").lower()) + logger.info(f"User Mapping: {user_mapping}") + + library_mapping = json.loads(get_env_value(env, "LIBRARY_MAPPING", "{}")) + logger.info(f"Library Mapping: {library_mapping}") + + ( + blacklist_library, + whitelist_library, + blacklist_library_type, + whitelist_library_type, + blacklist_users, + whitelist_users, + ) = setup_black_white_lists( + parse_string_to_list(get_env_value(env, "BLACKLIST_LIBRARY", None)), + parse_string_to_list(get_env_value(env, "WHITELIST_LIBRARY", None)), + parse_string_to_list(get_env_value(env, "BLACKLIST_LIBRARY_TYPE", None)), + parse_string_to_list(get_env_value(env, "WHITELIST_LIBRARY_TYPE", None)), + parse_string_to_list(get_env_value(env, "BLACKLIST_USERS", None)), + parse_string_to_list(get_env_value(env, "WHITELIST_USERS", None)), + library_mapping, + user_mapping, + ) + + servers = generate_server_connections(env) + + for i, server_1 in enumerate(servers): + for j in range(i + 1, len(servers)): + server_2 = servers[j] + + if not should_sync_server(env, server_1, server_2) and not should_sync_server(env, server_2, server_1): + continue + + logger.info(f"Comparing Server 1: {server_1.info()} with Server 2: {server_2.info()}") + + server_1_users, server_2_users = setup_users(server_1, server_2, blacklist_users, whitelist_users, user_mapping) + server_1_libraries, server_2_libraries = setup_libraries(server_1, server_2, blacklist_library, blacklist_library_type, whitelist_library, whitelist_library_type, library_mapping) + + logger.info("Gathering watched content from servers...") + server_1_watched = server_1.get_watched(server_1_users, server_1_libraries) + server_2_watched = server_2.get_watched(server_2_users, server_2_libraries) + + logger.info("Comparing watched content and generating sync actions...") + actions = sync_watched_lists(server_1_watched, server_2_watched, user_mapping, library_mapping) + + if not actions: + logger.info("No sync actions needed.") + continue + + logger.info(f"Found {len(actions)} actions to perform.") + for action_type, server, user_id, item_id, viewed_date in actions: + if dryrun: + logger.info(f"[DRYRUN] Would perform {action_type} for item {item_id} for user {user_id} on {server.server_type}") + continue + + try: + if action_type == "mark_watched": + server.mark_watched(user_id, item_id, viewed_date) + logger.success(f"Marked item {item_id} as watched for user {user_id} on {server.server_type}") + elif action_type == "mark_unwatched": + server.mark_unwatched(user_id, item_id) + logger.success(f"Marked item {item_id} as unwatched for user {user_id} on {server.server_type}") + except Exception as e: + logger.error(f"Failed to perform action {action_type} for item {item_id} on {server.server_type}: {e}") + + +@logger.catch +def main() -> None: + env_file = get_env_value(None, "ENV_FILE", ".env") + env = dotenv_values(env_file) + run_only_once = str_to_bool(get_env_value(env, "RUN_ONLY_ONCE", "False")) + sleep_duration = float(get_env_value(env, "SLEEP_DURATION", "3600")) + log_file = get_env_value(env, "LOG_FILE", "log.log") + debug_level = get_env_value(env, "DEBUG_LEVEL", "INFO",).upper() + + times = [] + while True: + try: + start = perf_counter() + configure_logger(log_file, debug_level) + main_loop(env) + end = perf_counter() + times.append(end - start) + if times: + logger.info(f"Average execution time: {sum(times) / len(times):.2f}s") + if run_only_once: + break + logger.info(f"Sleeping for {sleep_duration} seconds.") + sleep(sleep_duration) + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + logger.error(traceback.format_exc()) + if run_only_once: + break + logger.info(f"Retrying in {sleep_duration} seconds.") + sleep(sleep_duration) + except KeyboardInterrupt: + if times: + logger.info(f"Average execution time: {sum(times) / len(times):.2f}s") + logger.info("Exiting.") + os._exit(0) diff --git a/src/plex.py b/src/plex.py index 5947342..382fb80 100644 --- a/src/plex.py +++ b/src/plex.py @@ -1,622 +1,347 @@ -from datetime import datetime, timezone -import requests -from loguru import logger - -from urllib3.poolmanager import PoolManager -from math import floor - -from requests.adapters import HTTPAdapter as RequestsHTTPAdapter - -from plexapi.video import Show, Episode, Movie -from plexapi.server import PlexServer -from plexapi.myplex import MyPlexAccount, MyPlexUser -from plexapi.library import MovieSection, ShowSection - -from src.functions import ( - filename_from_any_path, - search_mapping, - log_marked, - str_to_bool, - get_env_value, -) -from src.watched import ( - LibraryData, - MediaIdentifiers, - MediaItem, - WatchedStatus, - Series, - UserData, - check_same_identifiers, -) - - -# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 -class HostNameIgnoringAdapter(RequestsHTTPAdapter): - def init_poolmanager( - self, connections: int, maxsize: int | None, block=..., **pool_kwargs - ) -> None: - self.poolmanager = PoolManager( - num_pools=connections, - maxsize=maxsize, - block=block, - assert_hostname=False, - **pool_kwargs, - ) - - -def extract_guids_from_item( - item: Movie | Show | Episode, generate_guids: bool -) -> dict[str, str]: - # If GENERATE_GUIDS is set to False, then return an empty dict - if not generate_guids: - return {} - - guids: dict[str, str] = dict( - guid.id.split("://") - for guid in item.guids - if guid.id and len(guid.id.strip()) > 0 - ) - - return guids - - -def extract_identifiers_from_item( - item: Movie | Show | Episode, - generate_guids: bool, - generate_locations: bool, -) -> MediaIdentifiers: - guids = extract_guids_from_item(item, generate_guids) - locations = ( - tuple([filename_from_any_path(loc) for loc in item.locations]) - if generate_locations - else tuple() - ) - - if generate_guids: - if not guids: - logger.debug( - f"Plex: {item.title} has no guids{f', locations: {" ".join(item.locations)}' if generate_locations else ''}", - ) - - if generate_locations: - if not locations: - logger.debug( - f"Plex: {item.title} has no locations{f', guids: {guids}' if generate_guids else ''}", - ) - - return MediaIdentifiers( - title=item.title, - locations=locations, - imdb_id=guids.get("imdb"), - tvdb_id=guids.get("tvdb"), - tmdb_id=guids.get("tmdb"), - ) - - -def get_mediaitem( - item: Movie | Episode, - completed: bool, - generate_guids: bool = True, - generate_locations: bool = True, -) -> MediaItem: - last_viewed_at = item.lastViewedAt - viewed_date = datetime.today() - - if last_viewed_at: - viewed_date = last_viewed_at.replace(tzinfo=timezone.utc) - - return MediaItem( - identifiers=extract_identifiers_from_item( - item, generate_guids, generate_locations - ), - status=WatchedStatus( - completed=completed, time=item.viewOffset, viewed_date=viewed_date - ), - ) - - -# class plex accept base url and token and username and password but default with none -class Plex: - def __init__( - self, - env, - base_url: str | None = None, - token: str | None = None, - user_name: str | None = None, - password: str | None = None, - server_name: str | None = None, - ssl_bypass: bool = False, - session: requests.Session | None = None, - ) -> None: - self.env = env - - self.server_type: str = "Plex" - self.ssl_bypass: bool = ssl_bypass - if ssl_bypass: - # Session for ssl bypass - session = requests.Session() - # By pass ssl hostname check https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 - session.mount("https://", HostNameIgnoringAdapter()) - self.session = session - self.plex: PlexServer = self.login( - base_url, token, user_name, password, server_name - ) - - self.base_url: str = self.plex._baseurl - - self.admin_user: MyPlexAccount = self.plex.myPlexAccount() - self.users: list[MyPlexUser | MyPlexAccount] = self.get_users() - self.generate_guids: bool = str_to_bool( - get_env_value(self.env, "GENERATE_GUIDS", "True") - ) - self.generate_locations: bool = str_to_bool( - get_env_value(self.env, "GENERATE_LOCATIONS", "True") - ) - - def login( - self, - base_url: str | None, - token: str | None, - user_name: str | None, - password: str | None, - server_name: str | None, - ) -> PlexServer: - try: - if base_url and token: - plex: PlexServer = PlexServer(base_url, token, session=self.session) - elif user_name and password and server_name: - # Login via plex account - account = MyPlexAccount(user_name, password) - plex = account.resource(server_name).connect() - else: - raise Exception("No complete plex credentials provided") - - return plex - except Exception as e: - if user_name: - msg = f"Failed to login via plex account {user_name}" - logger.error(f"Plex: Failed to login, {msg}, Error: {e}") - else: - logger.error(f"Plex: Failed to login, Error: {e}") - raise Exception(e) - - def info(self) -> str: - return f"Plex {self.plex.friendlyName}: {self.plex.version}" - - def get_users(self) -> list[MyPlexUser | MyPlexAccount]: - try: - users: list[MyPlexUser | MyPlexAccount] = self.plex.myPlexAccount().users() - - # append self to users - users.append(self.plex.myPlexAccount()) - - return users - except Exception as e: - logger.error(f"Plex: Failed to get users, Error: {e}") - raise Exception(e) - - def get_libraries(self) -> dict[str, str]: - try: - output = {} - - libraries = self.plex.library.sections() - logger.debug( - f"Plex: All Libraries {[library.title for library in libraries]}" - ) - - for library in libraries: - library_title = library.title - library_type = library.type - - if library_type not in ["movie", "show"]: - logger.debug( - f"Plex: Skipping Library {library_title} found type {library_type}", - ) - continue - - output[library_title] = library_type - - return output - except Exception as e: - logger.error(f"Plex: Failed to get libraries, Error: {e}") - raise Exception(e) - - def get_user_library_watched( - self, user_name: str, user_plex: PlexServer, library: MovieSection | ShowSection - ) -> LibraryData: - try: - logger.info( - f"Plex: Generating watched for {user_name} in library {library.title}", - ) - watched = LibraryData(title=library.title) - - library_videos = user_plex.library.section(library.title) - - if library.type == "movie": - for video in library_videos.search( - unwatched=False - ) + library_videos.search(inProgress=True): - if video.isWatched or video.viewOffset >= 60000: - watched.movies.append( - get_mediaitem( - video, - video.isWatched, - self.generate_guids, - self.generate_locations, - ) - ) - - elif library.type == "show": - # Keep track of processed shows to reduce duplicate shows - processed_shows = [] - for show in library_videos.search( - unwatched=False - ) + library_videos.search(inProgress=True): - if show.key in processed_shows: - continue - processed_shows.append(show.key) - show_guids = extract_guids_from_item(show, self.generate_guids) - episode_mediaitem = [] - - # Fetch watched or partially watched episodes - for episode in show.watched() + show.episodes( - viewOffset__gte=60_000 - ): - episode_mediaitem.append( - get_mediaitem( - episode, - episode.isWatched, - self.generate_guids, - self.generate_locations, - ) - ) - - if episode_mediaitem: - watched.series.append( - Series( - identifiers=MediaIdentifiers( - title=show.title, - locations=( - tuple( - [ - filename_from_any_path(location) - for location in show.locations - ] - ) - if self.generate_locations - else tuple() - ), - imdb_id=show_guids.get("imdb"), - tvdb_id=show_guids.get("tvdb"), - tmdb_id=show_guids.get("tmdb"), - ), - episodes=episode_mediaitem, - ) - ) - - return watched - - except Exception as e: - logger.error( - f"Plex: Failed to get watched for {user_name} in library {library.title}, Error: {e}", - ) - return LibraryData(title=library.title) - - def get_watched( - self, - users: list[MyPlexUser | MyPlexAccount], - sync_libraries: list[str], - users_watched: dict[str, UserData] = None, - ) -> dict[str, UserData]: - try: - if not users_watched: - users_watched: dict[str, UserData] = {} - - for user in users: - if self.admin_user == user: - user_plex = self.plex - else: - token = user.get_token(self.plex.machineIdentifier) - if token: - user_plex = self.login(self.base_url, token, None, None, None) - else: - logger.error( - f"Plex: Failed to get token for {user.title}, skipping", - ) - continue - - user_name: str = ( - user.username.lower() if user.username else user.title.lower() - ) - - libraries = user_plex.library.sections() - - for library in libraries: - if library.title not in sync_libraries: - continue - - if user_name not in users_watched: - users_watched[user_name] = UserData() - - if library.title in users_watched[user_name].libraries: - logger.info( - f"Plex: {user_name} {library.title} watched history has already been gathered, skipping" - ) - continue - - library_data = self.get_user_library_watched( - user_name, user_plex, library - ) - - users_watched[user_name].libraries[library.title] = library_data - - return users_watched - except Exception as e: - logger.error(f"Plex: Failed to get users watched, Error: {e}") - return {} - - def update_user_watched( - self, - user: MyPlexAccount, - user_plex: PlexServer, - library_data: LibraryData, - library_name: str, - dryrun: bool, - ) -> None: - # If there are no movies or shows to update, exit early. - if not library_data.series and not library_data.movies: - return - - logger.info( - f"Plex: Updating watched for {user.title} in library {library_name}" - ) - library_section = user_plex.library.section(library_name) - if not library_section: - logger.error( - f"Plex: Library {library_name} not found for {user.title}, skipping", - ) - return - - # Update movies. - if library_data.movies: - # Search for Plex movies that are currently marked as unwatched. - for plex_movie in library_section.search(unwatched=True): - plex_identifiers = extract_identifiers_from_item( - plex_movie, self.generate_guids, self.generate_locations - ) - # Check each stored movie for a match. - for stored_movie in library_data.movies: - if check_same_identifiers( - plex_identifiers, stored_movie.identifiers - ): - # If the stored movie is marked as watched (or has enough progress), - # update the Plex movie accordingly. - if stored_movie.status.completed: - msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}" - if not dryrun: - try: - plex_movie.markWatched() - except Exception as e: - logger.error( - f"Plex: Failed to mark {plex_movie.title} as watched, Error: {e}" - ) - continue - - logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}") - log_marked( - "Plex", - user_plex.friendlyName, - user.title, - library_name, - plex_movie.title, - None, - None, - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - else: - msg = f"Plex: {plex_movie.title} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user.title} in {library_name}" - if not dryrun: - try: - plex_movie.updateTimeline(stored_movie.status.time) - except Exception as e: - logger.error( - f"Plex: Failed to update {plex_movie.title} timeline, Error: {e}" - ) - continue - - logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}") - log_marked( - "Plex", - user_plex.friendlyName, - user.title, - library_name, - plex_movie.title, - duration=stored_movie.status.time, - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - # Once matched, no need to check further. - break - - # Update TV Shows (series/episodes). - if library_data.series: - # For each Plex show in the library section: - plex_shows = library_section.search(unwatched=True) - for plex_show in plex_shows: - # Extract identifiers from the Plex show. - plex_show_identifiers = extract_identifiers_from_item( - plex_show, self.generate_guids, self.generate_locations - ) - # Try to find a matching series in your stored library. - for stored_series in library_data.series: - if check_same_identifiers( - plex_show_identifiers, stored_series.identifiers - ): - logger.trace(f"Found matching show for '{plex_show.title}'") - # Now update episodes. - # Get the list of Plex episodes for this show. - plex_episodes = plex_show.episodes() - for plex_episode in plex_episodes: - plex_episode_identifiers = extract_identifiers_from_item( - plex_episode, - self.generate_guids, - self.generate_locations, - ) - for stored_ep in stored_series.episodes: - if check_same_identifiers( - plex_episode_identifiers, stored_ep.identifiers - ): - if stored_ep.status.completed: - msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library_name}" - if not dryrun: - try: - plex_episode.markWatched() - except Exception as e: - logger.error( - f"Plex: Failed to mark {plex_show.title} {plex_episode.title} as watched, Error: {e}" - ) - continue - - logger.success( - f"{'[DRYRUN] ' if dryrun else ''}{msg}" - ) - log_marked( - "Plex", - user_plex.friendlyName, - user.title, - library_name, - plex_show.title, - plex_episode.title, - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - else: - msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user.title} in {library_name}" - if not dryrun: - try: - plex_episode.updateTimeline( - stored_ep.status.time - ) - except Exception as e: - logger.error( - f"Plex: Failed to update {plex_show.title} {plex_episode.title} timeline, Error: {e}" - ) - continue - - logger.success( - f"{'[DRYRUN] ' if dryrun else ''}{msg}" - ) - log_marked( - "Plex", - user_plex.friendlyName, - user.title, - library_name, - plex_show.title, - plex_episode.title, - stored_ep.status.time, - mark_file=get_env_value( - self.env, "MARK_FILE", "mark.log" - ), - ) - break # Found a matching episode. - break # Found a matching show. - - def update_watched( - self, - watched_list: dict[str, UserData], - user_mapping: dict[str, str] | None = None, - library_mapping: dict[str, str] | None = None, - dryrun: bool = False, - ) -> None: - for user, user_data in watched_list.items(): - user_other = None - # If type of user is dict - if user_mapping: - user_other = search_mapping(user_mapping, user) - - for index, value in enumerate(self.users): - username_title = ( - value.username.lower() if value.username else value.title.lower() - ) - - if user.lower() == username_title: - user = self.users[index] - break - elif user_other and user_other.lower() == username_title: - user = self.users[index] - break - - if self.admin_user == user: - user_plex = self.plex - else: - if isinstance(user, str): - logger.debug( - f"Plex: {user} is not a plex object, attempting to get object for user", - ) - user = self.plex.myPlexAccount().user(user) - - if not isinstance(user, MyPlexUser): - logger.error(f"Plex: {user} failed to get PlexUser") - continue - - token = user.get_token(self.plex.machineIdentifier) - if token: - user_plex = PlexServer( - self.base_url, - token, - session=self.session, - ) - else: - logger.error( - f"Plex: Failed to get token for {user.title}, skipping", - ) - continue - - if not user_plex: - logger.error(f"Plex: {user} Failed to get PlexServer") - continue - - for library_name in user_data.libraries: - library_data = user_data.libraries[library_name] - library_other = None - if library_mapping: - library_other = search_mapping(library_mapping, library_name) - # if library in plex library list - library_list = user_plex.library.sections() - if library_name.lower() not in [x.title.lower() for x in library_list]: - if library_other: - if library_other.lower() in [ - x.title.lower() for x in library_list - ]: - logger.info( - f"Plex: Library {library_name} not found, but {library_other} found, using {library_other}", - ) - library_name = library_other - else: - logger.info( - f"Plex: Library {library_name} or {library_other} not found in library list", - ) - continue - else: - logger.info( - f"Plex: Library {library_name} not found in library list", - ) - continue - - try: - self.update_user_watched( - user, - user_plex, - library_data, - library_name, - dryrun, - ) - except Exception as e: - logger.error( - f"Plex: Failed to update watched for {user.title} in {library_name}, Error: {e}", - ) - continue +from datetime import datetime, timezone +import requests +from loguru import logger +from typing import Any + +from urllib3.poolmanager import PoolManager +from math import floor + +from requests.adapters import HTTPAdapter as RequestsHTTPAdapter + +from plexapi.video import Show, Episode, Movie +from plexapi.server import PlexServer +from plexapi.myplex import MyPlexAccount, MyPlexUser +from plexapi.library import MovieSection, ShowSection + +from src.functions import ( + filename_from_any_path, + search_mapping, + log_marked, + str_to_bool, + get_env_value, +) +from src.watched import ( + LibraryData, + MediaIdentifiers, + MediaItem, + WatchedStatus, + Series, + UserData, + check_same_identifiers, +) + + +# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 +class HostNameIgnoringAdapter(RequestsHTTPAdapter): + def init_poolmanager( + self, connections: int, maxsize: int | None, block=..., **pool_kwargs + ) -> None: + self.poolmanager = PoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + assert_hostname=False, + **pool_kwargs, + ) + + +def extract_guids_from_item( + item: Movie | Show | Episode, generate_guids: bool +) -> dict[str, str]: + # If GENERATE_GUIDS is set to False, then return an empty dict + if not generate_guids: + return {} + + guids: dict[str, str] = dict( + guid.id.split("://") + for guid in item.guids + if guid.id and len(guid.id.strip()) > 0 + ) + + return guids + + +def extract_identifiers_from_item( + server: Any, + user_id: str, + item: Movie | Show | Episode, + generate_guids: bool, + generate_locations: bool, +) -> MediaIdentifiers: + guids = extract_guids_from_item(item, generate_guids) + locations = ( + tuple([filename_from_any_path(loc) for loc in item.locations]) + if generate_locations + else tuple() + ) + + if generate_guids: + if not guids: + logger.debug( + f"Plex: {item.title} has no guids{f', locations: {" ".join(item.locations)}' if generate_locations else ''}", + ) + + if generate_locations: + if not locations: + logger.debug( + f"Plex: {item.title} has no locations{f', guids: {guids}' if generate_guids else ''}", + ) + + return MediaIdentifiers( + title=item.title, + locations=locations, + imdb_id=guids.get("imdb"), + tvdb_id=guids.get("tvdb"), + tmdb_id=guids.get("tmdb"), + id=item.ratingKey, + server=server, + user_id=user_id, + ) + + +def get_mediaitem( + server: Any, + user_id: str, + item: Movie | Episode, + completed: bool, + generate_guids: bool = True, + generate_locations: bool = True, +) -> MediaItem: + last_viewed_at = item.lastViewedAt + viewed_date = datetime.today() + + if last_viewed_at: + viewed_date = last_viewed_at.replace(tzinfo=timezone.utc) + + # updatedAt is a datetime object + last_updated_at = item.updatedAt.replace(tzinfo=timezone.utc) + + return MediaItem( + identifiers=extract_identifiers_from_item( + server, user_id, item, generate_guids, generate_locations + ), + status=WatchedStatus( + completed=completed, + time=item.viewOffset, + viewed_date=viewed_date, + last_updated_at=last_updated_at, + ), + ) + + +# class plex accept base url and token and username and password but default with none +class Plex: + def __init__( + self, + env, + base_url: str | None = None, + token: str | None = None, + user_name: str | None = None, + password: str | None = None, + server_name: str | None = None, + ssl_bypass: bool = False, + session: requests.Session | None = None, + ) -> None: + self.env = env + + self.server_type: str = "Plex" + self.ssl_bypass: bool = ssl_bypass + if ssl_bypass: + # Session for ssl bypass + session = requests.Session() + # By pass ssl hostname check https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 + session.mount("https://", HostNameIgnoringAdapter()) + self.session = session + self.plex: PlexServer = self.login( + base_url, token, user_name, password, server_name + ) + + self.base_url: str = self.plex._baseurl + + self.admin_user: MyPlexAccount = self.plex.myPlexAccount() + self.users: list[MyPlexUser | MyPlexAccount] = self.get_users() + self.generate_guids: bool = str_to_bool( + get_env_value(self.env, "GENERATE_GUIDS", "True") + ) + self.generate_locations: bool = str_to_bool( + get_env_value(self.env, "GENERATE_LOCATIONS", "True") + ) + + def login( + self, + base_url: str | None, + token: str | None, + user_name: str | None, + password: str | None, + server_name: str | None, + ) -> PlexServer: + try: + if base_url and token: + plex: PlexServer = PlexServer(base_url, token, session=self.session) + elif user_name and password and server_name: + # Login via plex account + account = MyPlexAccount(user_name, password) + plex = account.resource(server_name).connect() + else: + raise Exception("No complete plex credentials provided") + + return plex + except Exception as e: + if user_name: + msg = f"Failed to login via plex account {user_name}" + logger.error(f"Plex: Failed to login, {msg}, Error: {e}") + else: + logger.error(f"Plex: Failed to login, Error: {e}") + raise Exception(e) + + def info(self) -> str: + return f"Plex {self.plex.friendlyName}: {self.plex.version}" + + def get_users(self) -> list[MyPlexUser | MyPlexAccount]: + try: + users: list[MyPlexUser | MyPlexAccount] = self.plex.myPlexAccount().users() + users.append(self.plex.myPlexAccount()) + return users + except Exception as e: + logger.error(f"Plex: Failed to get users, Error: {e}") + raise Exception(e) + + def get_libraries(self) -> dict[str, str]: + try: + output = {} + libraries = self.plex.library.sections() + logger.debug( + f"Plex: All Libraries {[library.title for library in libraries]}" + ) + for library in libraries: + if library.type in ["movie", "show"]: + output[library.title] = library.type + else: + logger.debug( + f"Plex: Skipping Library {library.title} found type {library.type}", + ) + return output + except Exception as e: + logger.error(f"Plex: Failed to get libraries, Error: {e}") + raise Exception(e) + + def get_user_library_watched( + self, user_id: str, user_plex: PlexServer, library: MovieSection | ShowSection + ) -> LibraryData: + try: + logger.info( + f"Plex: Generating watched for {user_id} in library {library.title}", + ) + watched = LibraryData(title=library.title) + library_videos = user_plex.library.section(library.title) + + if library.type == "movie": + for video in library_videos.search(unwatched=False) + library_videos.search(inProgress=True): + if video.isWatched or video.viewOffset >= 60000: + watched.movies.append( + get_mediaitem( + self, user_id, video, video.isWatched, self.generate_guids, self.generate_locations + ) + ) + elif library.type == "show": + processed_shows = [] + for show in library_videos.search(unwatched=False) + library_videos.search(inProgress=True): + if show.key in processed_shows: + continue + processed_shows.append(show.key) + show_guids = extract_guids_from_item(show, self.generate_guids) + episode_mediaitem = [] + for episode in show.watched() + show.episodes(viewOffset__gte=60_000): + episode_mediaitem.append( + get_mediaitem( + self, user_id, episode, episode.isWatched, self.generate_guids, self.generate_locations + ) + ) + if episode_mediaitem: + watched.series.append( + Series( + identifiers=extract_identifiers_from_item(self, user_id, show, self.generate_guids, self.generate_locations), + episodes=episode_mediaitem, + ) + ) + return watched + except Exception as e: + logger.error( + f"Plex: Failed to get watched for {user_id} in library {library.title}, Error: {e}", + ) + return LibraryData(title=library.title) + + def get_watched( + self, + users: list[MyPlexUser | MyPlexAccount], + sync_libraries: list[str], + users_watched: dict[str, UserData] = None, + ) -> dict[str, UserData]: + try: + if not users_watched: + users_watched = {} + + for user in users: + user_plex = self.plex if self.admin_user == user else self.login(self.base_url, user.get_token(self.plex.machineIdentifier), None, None, None) + if not user_plex: + logger.error(f"Plex: Failed to get token for {user.title}, skipping") + continue + + user_name = user.username.lower() if user.username else user.title.lower() + if user_name not in users_watched: + users_watched[user_name] = UserData() + + for library in user_plex.library.sections(): + if library.title not in sync_libraries: + continue + if library.title in users_watched[user_name].libraries: + logger.info(f"Plex: {user_name} {library.title} watched history has already been gathered, skipping") + continue + + library_data = self.get_user_library_watched(user_name, user_plex, library) + users_watched[user_name].libraries[library.title] = library_data + + return users_watched + except Exception as e: + logger.error(f"Plex: Failed to get users watched, Error: {e}") + return {} + + def get_plex_user_from_id(self, user_id: str) -> MyPlexUser | MyPlexAccount | None: + for u in self.users: + username = u.username.lower() if u.username else u.title.lower() + if username == user_id.lower(): + return u + return None + + def mark_watched(self, user_id: str, item_id: str): + user = self.get_plex_user_from_id(user_id) + if not user: + logger.error(f"Plex: User {user_id} not found.") + return + + user_plex = self.plex if self.admin_user == user else self.login(self.base_url, user.get_token(self.plex.machineIdentifier), None, None, None) + item = user_plex.fetchItem(int(item_id)) + if item: + item.markWatched() + + def mark_unwatched(self, user_id: str, item_id: str): + user = self.get_plex_user_from_id(user_id) + if not user: + logger.error(f"Plex: User {user_id} not found.") + return + + user_plex = self.plex if self.admin_user == user else self.login(self.base_url, user.get_token(self.plex.machineIdentifier), None, None, None) + item = user_plex.fetchItem(int(item_id)) + if item: + item.markUnwatched() + + def update_watched( + self, + watched_list: dict[str, UserData], + user_mapping: dict[str, str] | None = None, + library_mapping: dict[str, str] | None = None, + dryrun: bool = False, + ) -> None: + # This function is now deprecated and will be removed. + # The new sync logic in watched.py will be used instead. + pass diff --git a/src/watched.py b/src/watched.py index c2b883a..2af6e76 100644 --- a/src/watched.py +++ b/src/watched.py @@ -1,28 +1,27 @@ -import copy from datetime import datetime from pydantic import BaseModel, Field from loguru import logger -from typing import Any +from typing import Any, Literal from src.functions import search_mapping class MediaIdentifiers(BaseModel): title: str | None = None - - # File information, will be folder for series and media file for episode/movie locations: tuple[str, ...] = tuple() - - # Guids imdb_id: str | None = None tvdb_id: str | None = None tmdb_id: str | None = None + id: str | None = None + server: Any | None = None + user_id: str | None = None class WatchedStatus(BaseModel): completed: bool time: int viewed_date: datetime + last_updated_at: datetime class MediaItem(BaseModel): @@ -45,279 +44,83 @@ class UserData(BaseModel): libraries: dict[str, LibraryData] = Field(default_factory=dict) -def merge_mediaitem_data(ep1: MediaItem, ep2: MediaItem) -> MediaItem: - """ - Merge two MediaItem episodes by comparing their watched status. - If one is completed while the other isn't, choose the completed one. - If both are completed or both are not, choose the one with the higher time. - """ - if ep1.status.completed != ep2.status.completed: - return ep1 if ep1.status.completed else ep2 - return ep1 if ep1.status.time >= ep2.status.time else ep2 - - -def merge_series_data(series1: Series, series2: Series) -> Series: - """ - Merge two Series objects by combining their episodes. - For duplicate episodes (determined by check_same_identifiers), merge their watched status. - """ - merged_series = copy.deepcopy(series1) - for ep in series2.episodes: - for idx, merged_ep in enumerate(merged_series.episodes): - if check_same_identifiers(ep.identifiers, merged_ep.identifiers): - merged_series.episodes[idx] = merge_mediaitem_data(merged_ep, ep) - break - else: - merged_series.episodes.append(copy.deepcopy(ep)) - return merged_series - - -def merge_library_data(lib1: LibraryData, lib2: LibraryData) -> LibraryData: - """ - Merge two LibraryData objects by extending movies and merging series. - For series, duplicates are determined using check_same_identifiers. - """ - merged = copy.deepcopy(lib1) - - # Merge movies. - for movie in lib2.movies: - for idx, merged_movie in enumerate(merged.movies): - if check_same_identifiers(movie.identifiers, merged_movie.identifiers): - merged.movies[idx] = merge_mediaitem_data(merged_movie, movie) - break - else: - merged.movies.append(copy.deepcopy(movie)) - - # Merge series. - for series2 in lib2.series: - for idx, series1 in enumerate(merged.series): - if check_same_identifiers(series1.identifiers, series2.identifiers): - merged.series[idx] = merge_series_data(series1, series2) - break - else: - merged.series.append(copy.deepcopy(series2)) - - return merged - - -def merge_user_data(user1: UserData, user2: UserData) -> UserData: - """ - Merge two UserData objects by merging their libraries. - If a library exists in both, merge its content; otherwise, add the new library. - """ - merged_libraries = copy.deepcopy(user1.libraries) - for lib_key, lib_data in user2.libraries.items(): - if lib_key in merged_libraries: - merged_libraries[lib_key] = merge_library_data( - merged_libraries[lib_key], lib_data - ) - else: - merged_libraries[lib_key] = copy.deepcopy(lib_data) - return UserData(libraries=merged_libraries) - - -def merge_server_watched( - watched_list_1: dict[str, UserData], - watched_list_2: dict[str, UserData], - user_mapping: dict[str, str] | None = None, - library_mapping: dict[str, str] | None = None, -) -> dict[str, UserData]: - """ - Merge two dictionaries of UserData while taking into account possible - differences in user and library keys via the provided mappings. - """ - merged_watched = copy.deepcopy(watched_list_1) - - for user_2, user_data in watched_list_2.items(): - # Determine matching user key. - user_key = user_mapping.get(user_2, user_2) if user_mapping else user_2 - if user_key not in merged_watched: - merged_watched[user_2] = copy.deepcopy(user_data) - continue - - for lib_key, lib_data in user_data.libraries.items(): - mapped_lib_key = ( - library_mapping.get(lib_key, lib_key) if library_mapping else lib_key - ) - if mapped_lib_key not in merged_watched[user_key].libraries: - merged_watched[user_key].libraries[lib_key] = copy.deepcopy(lib_data) - else: - merged_watched[user_key].libraries[mapped_lib_key] = merge_library_data( - merged_watched[user_key].libraries[mapped_lib_key], - lib_data, - ) - - return merged_watched - - def check_same_identifiers(item1: MediaIdentifiers, item2: MediaIdentifiers) -> bool: - # Check for duplicate based on file locations: if item1.locations and item2.locations: if set(item1.locations) & set(item2.locations): return True - - # Check for duplicate based on GUIDs: if ( (item1.imdb_id and item2.imdb_id and item1.imdb_id == item2.imdb_id) or (item1.tvdb_id and item2.tvdb_id and item1.tvdb_id == item2.tvdb_id) or (item1.tmdb_id and item2.tmdb_id and item1.tmdb_id == item2.tmdb_id) ): return True - return False - -def check_remove_entry(item1: MediaItem, item2: MediaItem) -> bool: - """ - Returns True if item1 (from watched_list_1) should be removed - in favor of item2 (from watched_list_2), based on: - - Duplicate criteria: - * They match if any file location is shared OR - at least one of imdb_id, tvdb_id, or tmdb_id matches. - - Watched status: - * If one is complete and the other is not, remove the incomplete one. - * If both are incomplete, remove the one with lower progress (time). - * If both are complete, remove item1 as duplicate. - """ - if not check_same_identifiers(item1.identifiers, item2.identifiers): - return False - - # Compare watched statuses. - status1 = item1.status - status2 = item2.status - - # If one is complete and the other isn't, remove the one that's not complete. - if status1.completed != status2.completed: - if not status1.completed and status2.completed: - return True # Remove item1 since it's not complete. - else: - return False # Do not remove item1; it's complete. - - # Both have the same completed status. - if not status1.completed and not status2.completed: - # Both incomplete: remove the one with lower progress (time) - if status1.time < status2.time: - return True # Remove item1 because it has watched less. - elif status1.time > status2.time: - return False # Keep item1 because it has more progress. - else: - # Same progress; Remove duplicate - return True - - # If both are complete, consider item1 the duplicate and remove it. - return True - - -def cleanup_watched( - watched_list_1: dict[str, UserData], - watched_list_2: dict[str, UserData], +def sync_watched_lists( + server1_data: dict[str, UserData], + server2_data: dict[str, UserData], user_mapping: dict[str, str] | None = None, library_mapping: dict[str, str] | None = None, -) -> dict[str, UserData]: - modified_watched_list_1 = copy.deepcopy(watched_list_1) +) -> list[tuple[Literal["mark_watched", "mark_unwatched"], Any, str, str, str]]: + actions = [] - # remove entries from watched_list_1 that are in watched_list_2 - for user_1 in watched_list_1: - user_other = None - if user_mapping: - user_other = search_mapping(user_mapping, user_1) - user_2 = get_other(watched_list_2, user_1, user_other) - if user_2 is None: + for user1_name, user1_data in server1_data.items(): + user2_name = search_mapping(user_mapping, user1_name) if user_mapping else user1_name + if user2_name not in server2_data: continue - for library_1_key in watched_list_1[user_1].libraries: - library_other = None - if library_mapping: - library_other = search_mapping(library_mapping, library_1_key) - library_2_key = get_other( - watched_list_2[user_2].libraries, library_1_key, library_other - ) - if library_2_key is None: + user2_data = server2_data[user2_name] + + for lib1_name, lib1_data in user1_data.libraries.items(): + lib2_name = search_mapping(library_mapping, lib1_name) if library_mapping else lib1_name + if lib2_name not in user2_data.libraries: continue - library_1 = watched_list_1[user_1].libraries[library_1_key] - library_2 = watched_list_2[user_2].libraries[library_2_key] + lib2_data = user2_data.libraries[lib2_name] - filtered_movies = [] - for movie in library_1.movies: - remove_flag = False - for movie2 in library_2.movies: - if check_remove_entry(movie, movie2): - logger.trace(f"Removing movie: {movie.identifiers.title}") - remove_flag = True + # Sync movies + for movie1 in lib1_data.movies: + for movie2 in lib2_data.movies: + if check_same_identifiers(movie1.identifiers, movie2.identifiers): + action = compare_and_get_action(movie1, movie2) + if action: + actions.append(action) break - if not remove_flag: - filtered_movies.append(movie) - - modified_watched_list_1[user_1].libraries[ - library_1_key - ].movies = filtered_movies - - # TV Shows - filtered_series_list = [] - for series1 in library_1.series: - matching_series = None - for series2 in library_2.series: + # Sync series (episodes) + for series1 in lib1_data.series: + for series2 in lib2_data.series: if check_same_identifiers(series1.identifiers, series2.identifiers): - matching_series = series2 + for episode1 in series1.episodes: + for episode2 in series2.episodes: + if check_same_identifiers(episode1.identifiers, episode2.identifiers): + action = compare_and_get_action(episode1, episode2) + if action: + actions.append(action) + break break - - if matching_series is None: - # No matching show in watched_list_2; keep the series as is. - filtered_series_list.append(series1) - else: - # We have a matching show; now clean up the episodes. - filtered_episodes = [] - for ep1 in series1.episodes: - remove_flag = False - for ep2 in matching_series.episodes: - if check_remove_entry(ep1, ep2): - logger.trace( - f"Removing episode '{ep1.identifiers.title}' from show '{series1.identifiers.title}'", - ) - remove_flag = True - break - if not remove_flag: - filtered_episodes.append(ep1) - - # Only keep the series if there are remaining episodes. - if filtered_episodes: - modified_series1 = copy.deepcopy(series1) - modified_series1.episodes = filtered_episodes - filtered_series_list.append(modified_series1) - else: - logger.trace( - f"Removing entire show '{series1.identifiers.title}' as no episodes remain after cleanup.", - ) - modified_watched_list_1[user_1].libraries[ - library_1_key - ].series = filtered_series_list - - # After processing, remove any library that is completely empty. - for user, user_data in modified_watched_list_1.items(): - new_libraries = {} - for lib_key, library in user_data.libraries.items(): - if library.movies or library.series: - new_libraries[lib_key] = library - else: - logger.trace(f"Removing empty library '{lib_key}' for user '{user}'") - user_data.libraries = new_libraries - - return modified_watched_list_1 + return actions -def get_other( - watched_list: dict[str, Any], object_1: str, object_2: str | None -) -> str | None: - if object_1 in watched_list: - return object_1 +def compare_and_get_action(item1: MediaItem, item2: MediaItem): + if item1.status.completed == item2.status.completed: + return None - if object_2 and object_2 in watched_list: - return object_2 + if item1.status.last_updated_at > item2.status.last_updated_at: + source_item, dest_item = item1, item2 + elif item2.status.last_updated_at > item1.status.last_updated_at: + source_item, dest_item = item2, item1 + else: + return None - logger.info( - f"{object_1}{' and ' + object_2 if object_2 else ''} not found in watched list 2" + action_type = "mark_watched" if source_item.status.completed else "mark_unwatched" + + logger.info(f"Scheduling action: {action_type} for item {dest_item.identifiers.title} on server {dest_item.identifiers.server.server_type}") + + return ( + action_type, + dest_item.identifiers.server, + dest_item.identifiers.user_id, + dest_item.identifiers.id, + source_item.status.viewed_date.isoformat().replace("+00:00", "Z") ) - - return None diff --git a/test/test_sync.py b/test/test_sync.py new file mode 100644 index 0000000..9ebcf16 --- /dev/null +++ b/test/test_sync.py @@ -0,0 +1,133 @@ +from datetime import datetime, timedelta +import sys +import os +from unittest.mock import Mock + +# Add parent directory to sys.path +current = os.path.dirname(os.path.realpath(__file__)) +parent = os.path.dirname(current) +sys.path.append(parent) + +from src.watched import ( + LibraryData, + MediaIdentifiers, + MediaItem, + Series, + UserData, + WatchedStatus, + sync_watched_lists, +) + +# --- Mock Data Setup --- +now = datetime.now() +time_new = now +time_old = now - timedelta(days=1) + +# Mock server objects +mock_server1 = Mock() +mock_server1.server_type = "Plex" +mock_server2 = Mock() +mock_server2.server_type = "Jellyfin" + +# --- Test Case 1: Sync "watched" from Server 1 to Server 2 --- +movie_s1_watched = MediaItem( + identifiers=MediaIdentifiers(title="Movie A", id="1", server=mock_server1, user_id="user1", imdb_id="tt1"), + status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new), +) +movie_s2_unwatched = MediaItem( + identifiers=MediaIdentifiers(title="Movie A", id="a", server=mock_server2, user_id="user1", imdb_id="tt1"), + status=WatchedStatus(completed=False, time=0, viewed_date=time_old, last_updated_at=time_old), +) + +# --- Test Case 2: Sync "unwatched" from Server 2 to Server 1 --- +movie_s1_unwatched_old = MediaItem( + identifiers=MediaIdentifiers(title="Movie B", id="2", server=mock_server1, user_id="user1", imdb_id="tt2"), + status=WatchedStatus(completed=True, time=0, viewed_date=time_old, last_updated_at=time_old), +) +movie_s2_unwatched_new = MediaItem( + identifiers=MediaIdentifiers(title="Movie B", id="b", server=mock_server2, user_id="user1", imdb_id="tt2"), + status=WatchedStatus(completed=False, time=0, viewed_date=time_new, last_updated_at=time_new), +) + +# --- Test Case 3: No sync needed (already in sync) --- +movie_s1_synced = MediaItem( + identifiers=MediaIdentifiers(title="Movie C", id="3", server=mock_server1, user_id="user1", imdb_id="tt3"), + status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new), +) +movie_s2_synced = MediaItem( + identifiers=MediaIdentifiers(title="Movie C", id="c", server=mock_server2, user_id="user1", imdb_id="tt3"), + status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new), +) + +# --- Test Case 4: No sync needed (timestamps equal) --- +movie_s1_equal_ts = MediaItem( + identifiers=MediaIdentifiers(title="Movie D", id="4", server=mock_server1, user_id="user1", imdb_id="tt4"), + status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new), +) +movie_s2_equal_ts = MediaItem( + identifiers=MediaIdentifiers(title="Movie D", id="d", server=mock_server2, user_id="user1", imdb_id="tt4"), + status=WatchedStatus(completed=False, time=0, viewed_date=time_new, last_updated_at=time_new), +) + + +def build_test_data(movies1, movies2): + return ( + {"user1": UserData(libraries={"Movies": LibraryData(title="Movies", movies=movies1, series=[])})}, + {"user1": UserData(libraries={"Movies": LibraryData(title="Movies", movies=movies2, series=[])})}, + ) + +def test_sync_watched_from_s1_to_s2(): + server1_data, server2_data = build_test_data([movie_s1_watched], [movie_s2_unwatched]) + actions = sync_watched_lists(server1_data, server2_data) + + assert len(actions) == 1 + action = actions[0] + assert action[0] == "mark_watched" + assert action[1] == mock_server2 + assert action[2] == "user1" + assert action[3] == "a" + +def test_sync_unwatched_from_s2_to_s1(): + server1_data, server2_data = build_test_data([movie_s1_unwatched_old], [movie_s2_unwatched_new]) + actions = sync_watched_lists(server1_data, server2_data) + + assert len(actions) == 1 + action = actions[0] + assert action[0] == "mark_unwatched" + assert action[1] == mock_server1 + assert action[2] == "user1" + assert action[3] == "2" + +def test_no_sync_when_already_synced(): + server1_data, server2_data = build_test_data([movie_s1_synced], [movie_s2_synced]) + actions = sync_watched_lists(server1_data, server2_data) + assert len(actions) == 0 + +def test_no_sync_when_timestamps_equal(): + server1_data, server2_data = build_test_data([movie_s1_equal_ts], [movie_s2_equal_ts]) + actions = sync_watched_lists(server1_data, server2_data) + assert len(actions) == 0 + +def test_sync_with_user_mapping(): + server1_data = {"plex_user": UserData(libraries={"Movies": LibraryData(title="Movies", movies=[movie_s1_watched], series=[])})} + server2_data = {"jellyfin_user": UserData(libraries={"Movies": LibraryData(title="Movies", movies=[movie_s2_unwatched], series=[])})} + user_mapping = {"plex_user": "jellyfin_user"} + + actions = sync_watched_lists(server1_data, server2_data, user_mapping=user_mapping) + + assert len(actions) == 1 + action = actions[0] + assert action[0] == "mark_watched" + assert action[1] == mock_server2 + +def test_sync_with_library_mapping(): + server1_data = {"user1": UserData(libraries={"Plex Movies": LibraryData(title="Plex Movies", movies=[movie_s1_watched], series=[])})} + server2_data = {"user1": UserData(libraries={"Jellyfin Movies": LibraryData(title="Jellyfin Movies", movies=[movie_s2_unwatched], series=[])})} + library_mapping = {"Plex Movies": "Jellyfin Movies"} + + actions = sync_watched_lists(server1_data, server2_data, library_mapping=library_mapping) + + assert len(actions) == 1 + action = actions[0] + assert action[0] == "mark_watched" + assert action[1] == mock_server2 diff --git a/test/test_watched.py b/test/test_watched.py deleted file mode 100644 index e4245ab..0000000 --- a/test/test_watched.py +++ /dev/null @@ -1,724 +0,0 @@ -from datetime import datetime -import sys -import os - -# getting the name of the directory -# where the this file is present. -current = os.path.dirname(os.path.realpath(__file__)) - -# Getting the parent directory name -# where the current directory is present. -parent = os.path.dirname(current) - -# adding the parent directory to -# the sys.path. -sys.path.append(parent) - -from src.watched import ( - LibraryData, - MediaIdentifiers, - MediaItem, - Series, - UserData, - WatchedStatus, - cleanup_watched, -) - -viewed_date = datetime.today() - -tv_shows_watched_list_1: list[Series] = [ - Series( - identifiers=MediaIdentifiers( - title="Doctor Who (2005)", - locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",), - imdb_id="tt0436992", - tmdb_id="57243", - tvdb_id="78804", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="The Unquiet Dead", - locations=("S01E03.mkv",), - imdb_id="tt0563001", - tmdb_id="968589", - tvdb_id="295296", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Aliens of London (1)", - locations=("S01E04.mkv",), - imdb_id="tt0562985", - tmdb_id="968590", - tvdb_id="295297", - ), - status=WatchedStatus( - completed=False, time=240000, viewed_date=viewed_date - ), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="World War Three (2)", - locations=("S01E05.mkv",), - imdb_id="tt0563003", - tmdb_id="968592", - tvdb_id="295298", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - ], - ), - Series( - identifiers=MediaIdentifiers( - title="Monarch: Legacy of Monsters", - locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",), - imdb_id="tt17220216", - tmdb_id="202411", - tvdb_id="422598", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Secrets and Lies", - locations=("S01E03.mkv",), - imdb_id="tt21255044", - tmdb_id="4661246", - tvdb_id="10009418", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Parallels and Interiors", - locations=("S01E04.mkv",), - imdb_id="tt21255050", - tmdb_id="4712059", - tvdb_id="10009419", - ), - status=WatchedStatus( - completed=False, time=240000, viewed_date=viewed_date - ), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The Way Out", - locations=("S01E05.mkv",), - imdb_id="tt23787572", - tmdb_id="4712061", - tvdb_id="10009420", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - ], - ), - Series( - identifiers=MediaIdentifiers( - title="My Adventures with Superman", - locations=("My Adventures with Superman {tvdb-403172} {imdb-tt14681924}",), - imdb_id="tt14681924", - tmdb_id="125928", - tvdb_id="403172", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Adventures of a Normal Man (1)", - locations=("S01E01.mkv",), - imdb_id="tt15699926", - tmdb_id="3070048", - tvdb_id="8438181", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Adventures of a Normal Man (2)", - locations=("S01E02.mkv",), - imdb_id="tt20413322", - tmdb_id="4568681", - tvdb_id="9829910", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="My Interview with Superman", - locations=("S01E03.mkv",), - imdb_id="tt20413328", - tmdb_id="4497012", - tvdb_id="9870382", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - ], - ), -] - -# ───────────────────────────────────────────────────────────── -# TV Shows Watched list 2 - -tv_shows_watched_list_2: list[Series] = [ - Series( - identifiers=MediaIdentifiers( - title="Doctor Who", - locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",), - imdb_id="tt0436992", - tmdb_id="57243", - tvdb_id="78804", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Rose", - locations=("S01E01.mkv",), - imdb_id="tt0562992", - tvdb_id="295294", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The End of the World", - locations=("S01E02.mkv",), - imdb_id="tt0562997", - tvdb_id="295295", - tmdb_id=None, - ), - status=WatchedStatus( - completed=False, time=300670, viewed_date=viewed_date - ), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="World War Three (2)", - locations=("S01E05.mkv",), - imdb_id="tt0563003", - tvdb_id="295298", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - ], - ), - Series( - identifiers=MediaIdentifiers( - title="Monarch: Legacy of Monsters", - locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",), - imdb_id="tt17220216", - tmdb_id="202411", - tvdb_id="422598", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Aftermath", - locations=("S01E01.mkv",), - imdb_id="tt20412166", - tvdb_id="9959300", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Departure", - locations=("S01E02.mkv",), - imdb_id="tt22866594", - tvdb_id="10009417", - tmdb_id=None, - ), - status=WatchedStatus( - completed=False, time=300741, viewed_date=viewed_date - ), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The Way Out", - locations=("S01E05.mkv",), - imdb_id="tt23787572", - tvdb_id="10009420", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - ], - ), - Series( - identifiers=MediaIdentifiers( - title="My Adventures with Superman", - locations=("My Adventures with Superman {tvdb-403172} {imdb-tt14681924}",), - imdb_id="tt14681924", - tmdb_id="125928", - tvdb_id="403172", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Adventures of a Normal Man (1)", - locations=("S01E01.mkv",), - imdb_id="tt15699926", - tvdb_id="8438181", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Adventures of a Normal Man (2)", - locations=("S01E02.mkv",), - imdb_id="tt20413322", - tvdb_id="9829910", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="My Interview with Superman", - locations=("S01E03.mkv",), - imdb_id="tt20413328", - tvdb_id="9870382", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - ], - ), -] - -# ───────────────────────────────────────────────────────────── -# Expected TV Shows Watched list 1 (after cleanup) - -expected_tv_show_watched_list_1: list[Series] = [ - Series( - identifiers=MediaIdentifiers( - title="Doctor Who (2005)", - locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",), - imdb_id="tt0436992", - tmdb_id="57243", - tvdb_id="78804", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="The Unquiet Dead", - locations=("S01E03.mkv",), - imdb_id="tt0563001", - tmdb_id="968589", - tvdb_id="295296", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Aliens of London (1)", - locations=("S01E04.mkv",), - imdb_id="tt0562985", - tmdb_id="968590", - tvdb_id="295297", - ), - status=WatchedStatus( - completed=False, time=240000, viewed_date=viewed_date - ), - ), - ], - ), - Series( - identifiers=MediaIdentifiers( - title="Monarch: Legacy of Monsters", - locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",), - imdb_id="tt17220216", - tmdb_id="202411", - tvdb_id="422598", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Secrets and Lies", - locations=("S01E03.mkv",), - imdb_id="tt21255044", - tmdb_id="4661246", - tvdb_id="10009418", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Parallels and Interiors", - locations=("S01E04.mkv",), - imdb_id="tt21255050", - tmdb_id="4712059", - tvdb_id="10009419", - ), - status=WatchedStatus( - completed=False, time=240000, viewed_date=viewed_date - ), - ), - ], - ), -] - -# ───────────────────────────────────────────────────────────── -# Expected TV Shows Watched list 2 (after cleanup) - -expected_tv_show_watched_list_2: list[Series] = [ - Series( - identifiers=MediaIdentifiers( - title="Doctor Who", - locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",), - imdb_id="tt0436992", - tmdb_id="57243", - tvdb_id="78804", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Rose", - locations=("S01E01.mkv",), - imdb_id="tt0562992", - tvdb_id="295294", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The End of the World", - locations=("S01E02.mkv",), - imdb_id="tt0562997", - tvdb_id="295295", - tmdb_id=None, - ), - status=WatchedStatus( - completed=False, time=300670, viewed_date=viewed_date - ), - ), - ], - ), - Series( - identifiers=MediaIdentifiers( - title="Monarch: Legacy of Monsters", - locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",), - imdb_id="tt17220216", - tmdb_id="202411", - tvdb_id="422598", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Aftermath", - locations=("S01E01.mkv",), - imdb_id="tt20412166", - tvdb_id="9959300", - tmdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Departure", - locations=("S01E02.mkv",), - imdb_id="tt22866594", - tvdb_id="10009417", - tmdb_id=None, - ), - status=WatchedStatus( - completed=False, time=300741, viewed_date=viewed_date - ), - ), - ], - ), -] - -# ───────────────────────────────────────────────────────────── -# Movies Watched list 1 - -movies_watched_list_1: list[MediaItem] = [ - MediaItem( - identifiers=MediaIdentifiers( - title="Big Buck Bunny", - locations=("Big Buck Bunny.mkv",), - imdb_id="tt1254207", - tmdb_id="10378", - tvdb_id="12352", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The Family Plan", - locations=("The Family Plan (2023).mkv",), - imdb_id="tt16431870", - tmdb_id="1029575", - tvdb_id="351194", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Killers of the Flower Moon", - locations=("Killers of the Flower Moon (2023).mkv",), - imdb_id="tt5537002", - tmdb_id="466420", - tvdb_id="135852", - ), - status=WatchedStatus(completed=False, time=240000, viewed_date=viewed_date), - ), -] - -# ───────────────────────────────────────────────────────────── -# Movies Watched list 2 - -movies_watched_list_2: list[MediaItem] = [ - MediaItem( - identifiers=MediaIdentifiers( - title="The Family Plan", - locations=("The Family Plan (2023).mkv",), - imdb_id="tt16431870", - tmdb_id="1029575", - tvdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Five Nights at Freddy's", - locations=("Five Nights at Freddy's (2023).mkv",), - imdb_id="tt4589218", - tmdb_id="507089", - tvdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The Hunger Games: The Ballad of Songbirds & Snakes", - locations=("The Hunger Games The Ballad of Songbirds & Snakes (2023).mkv",), - imdb_id="tt10545296", - tmdb_id="695721", - tvdb_id=None, - ), - status=WatchedStatus(completed=False, time=301215, viewed_date=viewed_date), - ), -] - -# ───────────────────────────────────────────────────────────── -# Expected Movies Watched list 1 - -expected_movie_watched_list_1: list[MediaItem] = [ - MediaItem( - identifiers=MediaIdentifiers( - title="Big Buck Bunny", - locations=("Big Buck Bunny.mkv",), - imdb_id="tt1254207", - tmdb_id="10378", - tvdb_id="12352", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="Killers of the Flower Moon", - locations=("Killers of the Flower Moon (2023).mkv",), - imdb_id="tt5537002", - tmdb_id="466420", - tvdb_id="135852", - ), - status=WatchedStatus(completed=False, time=240000, viewed_date=viewed_date), - ), -] - -# ───────────────────────────────────────────────────────────── -# Expected Movies Watched list 2 - -expected_movie_watched_list_2: list[MediaItem] = [ - MediaItem( - identifiers=MediaIdentifiers( - title="Five Nights at Freddy's", - locations=("Five Nights at Freddy's (2023).mkv",), - imdb_id="tt4589218", - tmdb_id="507089", - tvdb_id=None, - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ), - MediaItem( - identifiers=MediaIdentifiers( - title="The Hunger Games: The Ballad of Songbirds & Snakes", - locations=("The Hunger Games The Ballad of Songbirds & Snakes (2023).mkv",), - imdb_id="tt10545296", - tmdb_id="695721", - tvdb_id=None, - ), - status=WatchedStatus(completed=False, time=301215, viewed_date=viewed_date), - ), -] - -# ───────────────────────────────────────────────────────────── -# TV Shows 2 Watched list 1 (for testing deletion up to the root) -# Here we use a single Series entry for "Criminal Minds" - -tv_shows_2_watched_list_1: list[Series] = [ - Series( - identifiers=MediaIdentifiers( - title="Criminal Minds", - locations=("Criminal Minds",), - imdb_id="tt0452046", - tmdb_id="4057", - tvdb_id="75710", - ), - episodes=[ - MediaItem( - identifiers=MediaIdentifiers( - title="Extreme Aggressor", - locations=( - "Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv", - ), - imdb_id="tt0550489", - tmdb_id="282843", - tvdb_id="176357", - ), - status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date), - ) - ], - ) -] - - -def test_simple_cleanup_watched(): - user_watched_list_1: dict[str, UserData] = { - "user1": UserData( - libraries={ - "TV Shows": LibraryData( - title="TV Shows", - movies=[], - series=tv_shows_watched_list_1, - ), - "Movies": LibraryData( - title="Movies", - movies=movies_watched_list_1, - series=[], - ), - "Other Shows": LibraryData( - title="Other Shows", - movies=[], - series=tv_shows_2_watched_list_1, - ), - } - ) - } - - user_watched_list_2: dict[str, UserData] = { - "user1": UserData( - libraries={ - "TV Shows": LibraryData( - title="TV Shows", - movies=[], - series=tv_shows_watched_list_2, - ), - "Movies": LibraryData( - title="Movies", - movies=movies_watched_list_2, - series=[], - ), - "Other Shows": LibraryData( - title="Other Shows", - movies=[], - series=tv_shows_2_watched_list_1, - ), - } - ) - } - - expected_watched_list_1: dict[str, UserData] = { - "user1": UserData( - libraries={ - "TV Shows": LibraryData( - title="TV Shows", - movies=[], - series=expected_tv_show_watched_list_1, - ), - "Movies": LibraryData( - title="Movies", - movies=expected_movie_watched_list_1, - series=[], - ), - } - ) - } - - expected_watched_list_2: dict[str, UserData] = { - "user1": UserData( - libraries={ - "TV Shows": LibraryData( - title="TV Shows", - movies=[], - series=expected_tv_show_watched_list_2, - ), - "Movies": LibraryData( - title="Movies", - movies=expected_movie_watched_list_2, - series=[], - ), - } - ) - } - - return_watched_list_1 = cleanup_watched(user_watched_list_1, user_watched_list_2) - return_watched_list_2 = cleanup_watched(user_watched_list_2, user_watched_list_1) - - assert return_watched_list_1 == expected_watched_list_1 - assert return_watched_list_2 == expected_watched_list_2 - - -# def test_mapping_cleanup_watched(): -# user_watched_list_1 = { -# "user1": { -# "TV Shows": tv_shows_watched_list_1, -# "Movies": movies_watched_list_1, -# "Other Shows": tv_shows_2_watched_list_1, -# }, -# } -# user_watched_list_2 = { -# "user2": { -# "Shows": tv_shows_watched_list_2, -# "Movies": movies_watched_list_2, -# "Other Shows": tv_shows_2_watched_list_1, -# } -# } -# -# expected_watched_list_1 = { -# "user1": { -# "TV Shows": expected_tv_show_watched_list_1, -# "Movies": expected_movie_watched_list_1, -# } -# } -# -# expected_watched_list_2 = { -# "user2": { -# "Shows": expected_tv_show_watched_list_2, -# "Movies": expected_movie_watched_list_2, -# } -# } -# -# user_mapping = {"user1": "user2"} -# library_mapping = {"TV Shows": "Shows"} -# -# return_watched_list_1 = cleanup_watched( -# user_watched_list_1, -# user_watched_list_2, -# user_mapping=user_mapping, -# library_mapping=library_mapping, -# ) -# return_watched_list_2 = cleanup_watched( -# user_watched_list_2, -# user_watched_list_1, -# user_mapping=user_mapping, -# library_mapping=library_mapping, -# ) -# -# assert return_watched_list_1 == expected_watched_list_1 -# assert return_watched_list_2 == expected_watched_list_2