diff --git a/src/functions.py b/src/functions.py index fdba5e9..51e4293 100644 --- a/src/functions.py +++ b/src/functions.py @@ -1,4 +1,4 @@ -import os +import os, copy from concurrent.futures import ThreadPoolExecutor from dotenv import load_dotenv @@ -100,6 +100,10 @@ def generate_library_guids_dict(user_list: dict): episode_output_dict = {} movies_output_dict = {} + # Handle the case where user_list is empty or does not contain the expected keys and values + if not user_list: + return show_output_dict, episode_output_dict, movies_output_dict + try: show_output_keys = user_list.keys() show_output_keys = [dict(x) for x in list(show_output_keys)] @@ -162,7 +166,14 @@ def combine_watched_dicts(dicts: list): if key not in combined_dict: combined_dict[key] = {} for subkey, subvalue in value.items(): - combined_dict[key][subkey] = subvalue + if subkey in combined_dict[key]: + # If the subkey already exists in the combined dictionary, + # check if the values are different and raise an exception if they are + if combined_dict[key][subkey] != subvalue: + raise ValueError(f"Conflicting values for subkey '{subkey}' under key '{key}'") + else: + # If the subkey does not exist in the combined dictionary, add it + combined_dict[key][subkey] = subvalue return combined_dict @@ -187,3 +198,132 @@ def future_thread_executor(args: list, workers: int = -1): raise Exception(e) return results + + +def cleanup_watched( + watched_list_1, watched_list_2, user_mapping=None, library_mapping=None +): + modified_watched_list_1 = copy.deepcopy(watched_list_1) + + # remove entries from watched_list_1 that are in watched_list_2 + for user_1 in watched_list_1: + user_other = None + if user_mapping: + user_other = search_mapping(user_mapping, user_1) + user_2 = get_other(watched_list_2, user_1, user_other) + if user_2 is None: + continue + + for library_1 in watched_list_1[user_1]: + library_other = None + if library_mapping: + library_other = search_mapping(library_mapping, library_1) + library_2 = get_other(watched_list_2[user_2], library_1, library_other) + if library_2 is None: + continue + + ( + _, + episode_watched_list_2_keys_dict, + movies_watched_list_2_keys_dict, + ) = generate_library_guids_dict(watched_list_2[user_2][library_2]) + + # Movies + if isinstance(watched_list_1[user_1][library_1], list): + for movie in watched_list_1[user_1][library_1]: + if is_movie_in_dict(movie, movies_watched_list_2_keys_dict): + logger(f"Removing {movie} from {library_1}", 3) + modified_watched_list_1[user_1][library_1].remove(movie) + + # TV Shows + elif isinstance(watched_list_1[user_1][library_1], dict): + for show_key_1 in watched_list_1[user_1][library_1].keys(): + show_key_dict = dict(show_key_1) + for season in watched_list_1[user_1][library_1][show_key_1]: + for episode in watched_list_1[user_1][library_1][show_key_1][season]: + if is_episode_in_dict(episode, episode_watched_list_2_keys_dict): + if episode in modified_watched_list_1[user_1][library_1][show_key_1][season]: + logger(f"Removing {episode} from {show_key_dict['title']}", 3) + modified_watched_list_1[user_1][library_1][show_key_1][season].remove(episode) + + # Remove empty seasons + if len(modified_watched_list_1[user_1][library_1][show_key_1][season]) == 0: + if season in modified_watched_list_1[user_1][library_1][show_key_1]: + logger(f"Removing {season} from {show_key_dict['title']} because it is empty", 3) + del modified_watched_list_1[user_1][library_1][show_key_1][season] + + # Remove empty shows + if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0: + if show_key_1 in modified_watched_list_1[user_1][library_1]: + logger(f"Removing {show_key_dict['title']} because it is empty", 3) + del modified_watched_list_1[user_1][library_1][show_key_1] + + for user_1 in watched_list_1: + for library_1 in watched_list_1[user_1]: + if library_1 in modified_watched_list_1[user_1]: + # If library is empty then remove it + if len(modified_watched_list_1[user_1][library_1]) == 0: + logger(f"Removing {library_1} from {user_1} because it is empty", 1) + del modified_watched_list_1[user_1][library_1] + + if user_1 in modified_watched_list_1: + # If user is empty delete user + if len(modified_watched_list_1[user_1]) == 0: + logger(f"Removing {user_1} from watched list 1 because it is empty", 1) + del modified_watched_list_1[user_1] + + return modified_watched_list_1 + + +def get_other(watched_list_2, object_1, object_2): + if object_1 in watched_list_2: + return object_1 + elif object_2 in watched_list_2: + return object_2 + else: + logger(f"{object_1} and {object_2} not found in watched list 2", 1) + return None + + +def is_movie_in_dict(movie, movies_watched_list_2_keys_dict): + # Iterate through the keys and values of the movie dictionary + for movie_key, movie_value in movie.items(): + # If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary + if movie_key == "locations": + if "locations" in movies_watched_list_2_keys_dict.keys(): + # Iterate through the locations in the movie dictionary + for location in movie_value: + # If the location is in the movies_watched_list_2_keys_dict dictionary, return True + if location in movies_watched_list_2_keys_dict["locations"]: + return True + # If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary + else: + if movie_key in movies_watched_list_2_keys_dict.keys(): + # If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True + if movie_value in movies_watched_list_2_keys_dict[movie_key]: + return True + + # If the loop completes without finding a match, return False + return False + + +def is_episode_in_dict(episode, episode_watched_list_2_keys_dict): + # Iterate through the keys and values of the episode dictionary + for episode_key, episode_value in episode.items(): + # If the key is "locations", check if the "locations" key is present in the episode_watched_list_2_keys_dict dictionary + if episode_key == "locations": + if "locations" in episode_watched_list_2_keys_dict.keys(): + # Iterate through the locations in the episode dictionary + for location in episode_value: + # If the location is in the episode_watched_list_2_keys_dict dictionary, return True + if location in episode_watched_list_2_keys_dict["locations"]: + return True + # If the key is not "locations", check if the episode_key is present in the episode_watched_list_2_keys_dict dictionary + else: + if episode_key in episode_watched_list_2_keys_dict.keys(): + # If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True + if episode_value in episode_watched_list_2_keys_dict[episode_key]: + return True + + # If the loop completes without finding a match, return False + return False \ No newline at end of file diff --git a/src/main.py b/src/main.py index ade6a3d..b758d03 100644 --- a/src/main.py +++ b/src/main.py @@ -1,4 +1,4 @@ -import copy, os, traceback, json, asyncio +import os, traceback, json, asyncio from dotenv import load_dotenv from time import sleep, perf_counter @@ -6,212 +6,13 @@ from src.functions import ( logger, str_to_bool, search_mapping, - generate_library_guids_dict, + cleanup_watched, ) from src.plex import Plex from src.jellyfin import Jellyfin load_dotenv(override=True) - -def cleanup_watched( - watched_list_1, watched_list_2, user_mapping=None, library_mapping=None -): - modified_watched_list_1 = copy.deepcopy(watched_list_1) - - # remove entries from plex_watched that are in jellyfin_watched - for user_1 in watched_list_1: - user_other = None - if user_mapping: - user_other = search_mapping(user_mapping, user_1) - if user_1 in modified_watched_list_1: - if user_1 in watched_list_2: - user_2 = user_1 - elif user_other in watched_list_2: - user_2 = user_other - else: - logger(f"User {user_1} and {user_other} not found in watched list 2", 1) - continue - - for library_1 in watched_list_1[user_1]: - library_other = None - if library_mapping: - library_other = search_mapping(library_mapping, library_1) - if library_1 in modified_watched_list_1[user_1]: - if library_1 in watched_list_2[user_2]: - library_2 = library_1 - elif library_other in watched_list_2[user_2]: - library_2 = library_other - else: - logger( - f"library {library_1} and {library_other} not found in watched list 2", - 1, - ) - continue - - ( - _, - episode_watched_list_2_keys_dict, - movies_watched_list_2_keys_dict, - ) = generate_library_guids_dict(watched_list_2[user_2][library_2]) - - # Movies - if isinstance(watched_list_1[user_1][library_1], list): - for movie in watched_list_1[user_1][library_1]: - movie_found = False - for movie_key, movie_value in movie.items(): - if movie_key == "locations": - if ( - "locations" - in movies_watched_list_2_keys_dict.keys() - ): - for location in movie_value: - if ( - location - in movies_watched_list_2_keys_dict[ - "locations" - ] - ): - movie_found = True - break - else: - if ( - movie_key - in movies_watched_list_2_keys_dict.keys() - ): - if ( - movie_value - in movies_watched_list_2_keys_dict[ - movie_key - ] - ): - movie_found = True - - if movie_found: - logger(f"Removing {movie} from {library_1}", 3) - modified_watched_list_1[user_1][library_1].remove( - movie - ) - break - - # TV Shows - elif isinstance(watched_list_1[user_1][library_1], dict): - # Generate full list of provider ids for episodes in watch_list_2 to easily compare if they exist in watch_list_1 - - for show_key_1 in watched_list_1[user_1][library_1].keys(): - show_key_dict = dict(show_key_1) - for season in watched_list_1[user_1][library_1][show_key_1]: - for episode in watched_list_1[user_1][library_1][ - show_key_1 - ][season]: - episode_found = False - for episode_key, episode_value in episode.items(): - # If episode_key and episode_value are in episode_watched_list_2_keys_dict exactly, then remove from watch_list_1 - if episode_key == "locations": - if ( - "locations" - in episode_watched_list_2_keys_dict.keys() - ): - for location in episode_value: - if ( - location - in episode_watched_list_2_keys_dict[ - "locations" - ] - ): - episode_found = True - break - - else: - if ( - episode_key - in episode_watched_list_2_keys_dict.keys() - ): - if ( - episode_value - in episode_watched_list_2_keys_dict[ - episode_key - ] - ): - episode_found = True - - if episode_found: - if ( - episode - in modified_watched_list_1[user_1][ - library_1 - ][show_key_1][season] - ): - logger( - f"Removing {episode} from {show_key_dict['title']}", - 3, - ) - modified_watched_list_1[user_1][ - library_1 - ][show_key_1][season].remove(episode) - break - - # Remove empty seasons - if ( - len( - modified_watched_list_1[user_1][library_1][ - show_key_1 - ][season] - ) - == 0 - ): - if ( - season - in modified_watched_list_1[user_1][library_1][ - show_key_1 - ] - ): - logger( - f"Removing {season} from {show_key_dict['title']} because it is empty", - 3, - ) - del modified_watched_list_1[user_1][library_1][ - show_key_1 - ][season] - - # If the show is empty, remove the show - if ( - len( - modified_watched_list_1[user_1][library_1][ - show_key_1 - ] - ) - == 0 - ): - if ( - show_key_1 - in modified_watched_list_1[user_1][library_1] - ): - logger( - f"Removing {show_key_dict['title']} from {library_1} because it is empty", - 1, - ) - del modified_watched_list_1[user_1][library_1][ - show_key_1 - ] - - for user_1 in watched_list_1: - for library_1 in watched_list_1[user_1]: - if library_1 in modified_watched_list_1[user_1]: - # If library is empty then remove it - if len(modified_watched_list_1[user_1][library_1]) == 0: - logger(f"Removing {library_1} from {user_1} because it is empty", 1) - del modified_watched_list_1[user_1][library_1] - - if user_1 in modified_watched_list_1: - # If user is empty delete user - if len(modified_watched_list_1[user_1]) == 0: - logger(f"Removing {user_1} from watched list 1 because it is empty", 1) - del modified_watched_list_1[user_1] - - return modified_watched_list_1 - - def setup_black_white_lists( blacklist_library: str, whitelist_library: str,