Merge pull request #49 from luigi311/seperate_functions

Seperate functions
This commit is contained in:
Luigi311
2023-03-08 23:55:53 -07:00
committed by GitHub
14 changed files with 1491 additions and 802 deletions

View File

@@ -81,7 +81,6 @@ JELLYFIN_BASEURL = "http://localhost:8096, http://nas:8096"
JELLYFIN_TOKEN = "SuperSecretToken, SuperSecretToken2" JELLYFIN_TOKEN = "SuperSecretToken, SuperSecretToken2"
``` ```
## Installation ## Installation
### Baremetal ### Baremetal

139
src/black_white.py Normal file
View File

@@ -0,0 +1,139 @@
from src.functions import logger, search_mapping
def setup_black_white_lists(
blacklist_library: str,
whitelist_library: str,
blacklist_library_type: str,
whitelist_library_type: str,
blacklist_users: str,
whitelist_users: str,
library_mapping=None,
user_mapping=None,
):
blacklist_library, blacklist_library_type, blacklist_users = setup_black_lists(
blacklist_library,
blacklist_library_type,
blacklist_users,
library_mapping,
user_mapping,
)
whitelist_library, whitelist_library_type, whitelist_users = setup_white_lists(
whitelist_library,
whitelist_library_type,
whitelist_users,
library_mapping,
user_mapping,
)
return (
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
blacklist_users,
whitelist_users,
)
def setup_black_lists(
blacklist_library,
blacklist_library_type,
blacklist_users,
library_mapping=None,
user_mapping=None,
):
if blacklist_library:
if len(blacklist_library) > 0:
blacklist_library = blacklist_library.split(",")
blacklist_library = [x.strip() for x in blacklist_library]
if library_mapping:
temp_library = []
for library in blacklist_library:
library_other = search_mapping(library_mapping, library)
if library_other:
temp_library.append(library_other)
blacklist_library = blacklist_library + temp_library
else:
blacklist_library = []
logger(f"Blacklist Library: {blacklist_library}", 1)
if blacklist_library_type:
if len(blacklist_library_type) > 0:
blacklist_library_type = blacklist_library_type.split(",")
blacklist_library_type = [x.lower().strip() for x in blacklist_library_type]
else:
blacklist_library_type = []
logger(f"Blacklist Library Type: {blacklist_library_type}", 1)
if blacklist_users:
if len(blacklist_users) > 0:
blacklist_users = blacklist_users.split(",")
blacklist_users = [x.lower().strip() for x in blacklist_users]
if user_mapping:
temp_users = []
for user in blacklist_users:
user_other = search_mapping(user_mapping, user)
if user_other:
temp_users.append(user_other)
blacklist_users = blacklist_users + temp_users
else:
blacklist_users = []
logger(f"Blacklist Users: {blacklist_users}", 1)
return blacklist_library, blacklist_library_type, blacklist_users
def setup_white_lists(
whitelist_library,
whitelist_library_type,
whitelist_users,
library_mapping=None,
user_mapping=None,
):
if whitelist_library:
if len(whitelist_library) > 0:
whitelist_library = whitelist_library.split(",")
whitelist_library = [x.strip() for x in whitelist_library]
if library_mapping:
temp_library = []
for library in whitelist_library:
library_other = search_mapping(library_mapping, library)
if library_other:
temp_library.append(library_other)
whitelist_library = whitelist_library + temp_library
else:
whitelist_library = []
logger(f"Whitelist Library: {whitelist_library}", 1)
if whitelist_library_type:
if len(whitelist_library_type) > 0:
whitelist_library_type = whitelist_library_type.split(",")
whitelist_library_type = [x.lower().strip() for x in whitelist_library_type]
else:
whitelist_library_type = []
logger(f"Whitelist Library Type: {whitelist_library_type}", 1)
if whitelist_users:
if len(whitelist_users) > 0:
whitelist_users = whitelist_users.split(",")
whitelist_users = [x.lower().strip() for x in whitelist_users]
if user_mapping:
temp_users = []
for user in whitelist_users:
user_other = search_mapping(user_mapping, user)
if user_other:
temp_users.append(user_other)
whitelist_users = whitelist_users + temp_users
else:
whitelist_users = []
else:
whitelist_users = []
logger(f"Whitelist Users: {whitelist_users}", 1)
return whitelist_library, whitelist_library_type, whitelist_users

View File

@@ -1,4 +1,4 @@
import os, copy import os
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -55,406 +55,6 @@ def search_mapping(dictionary: dict, key_value: str):
return None return None
def setup_black_white_lists(
blacklist_library: str,
whitelist_library: str,
blacklist_library_type: str,
whitelist_library_type: str,
blacklist_users: str,
whitelist_users: str,
library_mapping=None,
user_mapping=None,
):
if blacklist_library:
if len(blacklist_library) > 0:
blacklist_library = blacklist_library.split(",")
blacklist_library = [x.strip() for x in blacklist_library]
if library_mapping:
temp_library = []
for library in blacklist_library:
library_other = search_mapping(library_mapping, library)
if library_other:
temp_library.append(library_other)
blacklist_library = blacklist_library + temp_library
else:
blacklist_library = []
logger(f"Blacklist Library: {blacklist_library}", 1)
if whitelist_library:
if len(whitelist_library) > 0:
whitelist_library = whitelist_library.split(",")
whitelist_library = [x.strip() for x in whitelist_library]
if library_mapping:
temp_library = []
for library in whitelist_library:
library_other = search_mapping(library_mapping, library)
if library_other:
temp_library.append(library_other)
whitelist_library = whitelist_library + temp_library
else:
whitelist_library = []
logger(f"Whitelist Library: {whitelist_library}", 1)
if blacklist_library_type:
if len(blacklist_library_type) > 0:
blacklist_library_type = blacklist_library_type.split(",")
blacklist_library_type = [x.lower().strip() for x in blacklist_library_type]
else:
blacklist_library_type = []
logger(f"Blacklist Library Type: {blacklist_library_type}", 1)
if whitelist_library_type:
if len(whitelist_library_type) > 0:
whitelist_library_type = whitelist_library_type.split(",")
whitelist_library_type = [x.lower().strip() for x in whitelist_library_type]
else:
whitelist_library_type = []
logger(f"Whitelist Library Type: {whitelist_library_type}", 1)
if blacklist_users:
if len(blacklist_users) > 0:
blacklist_users = blacklist_users.split(",")
blacklist_users = [x.lower().strip() for x in blacklist_users]
if user_mapping:
temp_users = []
for user in blacklist_users:
user_other = search_mapping(user_mapping, user)
if user_other:
temp_users.append(user_other)
blacklist_users = blacklist_users + temp_users
else:
blacklist_users = []
logger(f"Blacklist Users: {blacklist_users}", 1)
if whitelist_users:
if len(whitelist_users) > 0:
whitelist_users = whitelist_users.split(",")
whitelist_users = [x.lower().strip() for x in whitelist_users]
if user_mapping:
temp_users = []
for user in whitelist_users:
user_other = search_mapping(user_mapping, user)
if user_other:
temp_users.append(user_other)
whitelist_users = whitelist_users + temp_users
else:
whitelist_users = []
else:
whitelist_users = []
logger(f"Whitelist Users: {whitelist_users}", 1)
return (
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
blacklist_users,
whitelist_users,
)
def check_skip_logic(
library_title,
library_type,
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
library_mapping,
):
skip_reason = None
if isinstance(library_type, (list, tuple, set)):
for library_type_item in library_type:
if library_type_item.lower() in blacklist_library_type:
skip_reason = "is blacklist_library_type"
else:
if library_type.lower() in blacklist_library_type:
skip_reason = "is blacklist_library_type"
if library_title.lower() in [x.lower() for x in blacklist_library]:
skip_reason = "is blacklist_library"
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library_title)
if library_other:
if library_other.lower() in [x.lower() for x in blacklist_library]:
skip_reason = "is blacklist_library"
if len(whitelist_library_type) > 0:
if isinstance(library_type, (list, tuple, set)):
for library_type_item in library_type:
if library_type_item.lower() not in whitelist_library_type:
skip_reason = "is not whitelist_library_type"
else:
if library_type.lower() not in whitelist_library_type:
skip_reason = "is not whitelist_library_type"
# if whitelist is not empty and library is not in whitelist
if len(whitelist_library) > 0:
if library_title.lower() not in [x.lower() for x in whitelist_library]:
skip_reason = "is not whitelist_library"
if library_other:
if library_other.lower() not in [x.lower() for x in whitelist_library]:
skip_reason = "is not whitelist_library"
return skip_reason
def generate_library_guids_dict(user_list: dict):
show_output_dict = {}
episode_output_dict = {}
movies_output_dict = {}
# Handle the case where user_list is empty or does not contain the expected keys and values
if not user_list:
return show_output_dict, episode_output_dict, movies_output_dict
try:
show_output_keys = user_list.keys()
show_output_keys = [dict(x) for x in list(show_output_keys)]
for show_key in show_output_keys:
for provider_key, provider_value in show_key.items():
# Skip title
if provider_key.lower() == "title":
continue
if provider_key.lower() not in show_output_dict:
show_output_dict[provider_key.lower()] = []
if provider_key.lower() == "locations":
for show_location in provider_value:
show_output_dict[provider_key.lower()].append(show_location)
else:
show_output_dict[provider_key.lower()].append(
provider_value.lower()
)
except Exception:
logger("Generating show_output_dict failed, skipping", 1)
try:
for show in user_list:
for season in user_list[show]:
for episode in user_list[show][season]:
for episode_key, episode_value in episode.items():
if episode_key.lower() not in episode_output_dict:
episode_output_dict[episode_key.lower()] = []
if episode_key == "locations":
for episode_location in episode_value:
episode_output_dict[episode_key.lower()].append(
episode_location
)
else:
episode_output_dict[episode_key.lower()].append(
episode_value.lower()
)
except Exception:
logger("Generating episode_output_dict failed, skipping", 1)
try:
for movie in user_list:
for movie_key, movie_value in movie.items():
if movie_key.lower() not in movies_output_dict:
movies_output_dict[movie_key.lower()] = []
if movie_key == "locations":
for movie_location in movie_value:
movies_output_dict[movie_key.lower()].append(movie_location)
else:
movies_output_dict[movie_key.lower()].append(movie_value.lower())
except Exception:
logger("Generating movies_output_dict failed, skipping", 1)
return show_output_dict, episode_output_dict, movies_output_dict
def combine_watched_dicts(dicts: list):
combined_dict = {}
for single_dict in dicts:
for key, value in single_dict.items():
if key not in combined_dict:
combined_dict[key] = {}
for subkey, subvalue in value.items():
if subkey in combined_dict[key]:
# If the subkey already exists in the combined dictionary,
# check if the values are different and raise an exception if they are
if combined_dict[key][subkey] != subvalue:
raise ValueError(
f"Conflicting values for subkey '{subkey}' under key '{key}'"
)
else:
# If the subkey does not exist in the combined dictionary, add it
combined_dict[key][subkey] = subvalue
return combined_dict
def cleanup_watched(
watched_list_1, watched_list_2, user_mapping=None, library_mapping=None
):
modified_watched_list_1 = copy.deepcopy(watched_list_1)
# remove entries from watched_list_1 that are in watched_list_2
for user_1 in watched_list_1:
user_other = None
if user_mapping:
user_other = search_mapping(user_mapping, user_1)
user_2 = get_other(watched_list_2, user_1, user_other)
if user_2 is None:
continue
for library_1 in watched_list_1[user_1]:
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library_1)
library_2 = get_other(watched_list_2[user_2], library_1, library_other)
if library_2 is None:
continue
(
_,
episode_watched_list_2_keys_dict,
movies_watched_list_2_keys_dict,
) = generate_library_guids_dict(watched_list_2[user_2][library_2])
# Movies
if isinstance(watched_list_1[user_1][library_1], list):
for movie in watched_list_1[user_1][library_1]:
if is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
logger(f"Removing {movie} from {library_1}", 3)
modified_watched_list_1[user_1][library_1].remove(movie)
# TV Shows
elif isinstance(watched_list_1[user_1][library_1], dict):
for show_key_1 in watched_list_1[user_1][library_1].keys():
show_key_dict = dict(show_key_1)
for season in watched_list_1[user_1][library_1][show_key_1]:
for episode in watched_list_1[user_1][library_1][show_key_1][
season
]:
if is_episode_in_dict(
episode, episode_watched_list_2_keys_dict
):
if (
episode
in modified_watched_list_1[user_1][library_1][
show_key_1
][season]
):
logger(
f"Removing {episode} from {show_key_dict['title']}",
3,
)
modified_watched_list_1[user_1][library_1][
show_key_1
][season].remove(episode)
# Remove empty seasons
if (
len(
modified_watched_list_1[user_1][library_1][show_key_1][
season
]
)
== 0
):
if (
season
in modified_watched_list_1[user_1][library_1][
show_key_1
]
):
logger(
f"Removing {season} from {show_key_dict['title']} because it is empty",
3,
)
del modified_watched_list_1[user_1][library_1][
show_key_1
][season]
# Remove empty shows
if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0:
if show_key_1 in modified_watched_list_1[user_1][library_1]:
logger(
f"Removing {show_key_dict['title']} because it is empty",
3,
)
del modified_watched_list_1[user_1][library_1][show_key_1]
for user_1 in watched_list_1:
for library_1 in watched_list_1[user_1]:
if library_1 in modified_watched_list_1[user_1]:
# If library is empty then remove it
if len(modified_watched_list_1[user_1][library_1]) == 0:
logger(f"Removing {library_1} from {user_1} because it is empty", 1)
del modified_watched_list_1[user_1][library_1]
if user_1 in modified_watched_list_1:
# If user is empty delete user
if len(modified_watched_list_1[user_1]) == 0:
logger(f"Removing {user_1} from watched list 1 because it is empty", 1)
del modified_watched_list_1[user_1]
return modified_watched_list_1
def get_other(watched_list_2, object_1, object_2):
if object_1 in watched_list_2:
return object_1
elif object_2 in watched_list_2:
return object_2
else:
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
return None
def is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
# Iterate through the keys and values of the movie dictionary
for movie_key, movie_value in movie.items():
# If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary
if movie_key == "locations":
if "locations" in movies_watched_list_2_keys_dict.keys():
# Iterate through the locations in the movie dictionary
for location in movie_value:
# If the location is in the movies_watched_list_2_keys_dict dictionary, return True
if location in movies_watched_list_2_keys_dict["locations"]:
return True
# If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary
else:
if movie_key in movies_watched_list_2_keys_dict.keys():
# If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True
if movie_value in movies_watched_list_2_keys_dict[movie_key]:
return True
# If the loop completes without finding a match, return False
return False
def is_episode_in_dict(episode, episode_watched_list_2_keys_dict):
# Iterate through the keys and values of the episode dictionary
for episode_key, episode_value in episode.items():
# If the key is "locations", check if the "locations" key is present in the episode_watched_list_2_keys_dict dictionary
if episode_key == "locations":
if "locations" in episode_watched_list_2_keys_dict.keys():
# Iterate through the locations in the episode dictionary
for location in episode_value:
# If the location is in the episode_watched_list_2_keys_dict dictionary, return True
if location in episode_watched_list_2_keys_dict["locations"]:
return True
# If the key is not "locations", check if the episode_key is present in the episode_watched_list_2_keys_dict dictionary
else:
if episode_key in episode_watched_list_2_keys_dict.keys():
# If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True
if episode_value in episode_watched_list_2_keys_dict[episode_key]:
return True
# If the loop completes without finding a match, return False
return False
def future_thread_executor(args: list, workers: int = -1): def future_thread_executor(args: list, workers: int = -1):
futures_list = [] futures_list = []
results = [] results = []

View File

@@ -1,9 +1,14 @@
import asyncio, aiohttp, traceback import asyncio, aiohttp, traceback
from src.functions import ( from src.functions import (
logger, logger,
search_mapping, search_mapping,
)
from src.library import (
check_skip_logic, check_skip_logic,
generate_library_guids_dict, generate_library_guids_dict,
)
from src.watched import (
combine_watched_dicts, combine_watched_dicts,
) )
@@ -393,7 +398,7 @@ class Jellyfin:
if skip_reason: if skip_reason:
logger( logger(
f"Jellyfin: Skipping library {library_title} {skip_reason}", f"Jellyfin: Skipping library {library_title}: {skip_reason}",
1, 1,
) )
continue continue

212
src/library.py Normal file
View File

@@ -0,0 +1,212 @@
from src.functions import (
logger,
search_mapping,
)
def check_skip_logic(
library_title,
library_type,
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
library_mapping=None,
):
skip_reason = None
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library_title)
skip_reason_black = check_blacklist_logic(
library_title,
library_type,
blacklist_library,
blacklist_library_type,
library_other,
)
skip_reason_white = check_whitelist_logic(
library_title,
library_type,
whitelist_library,
whitelist_library_type,
library_other,
)
# Combine skip reasons
if skip_reason_black:
skip_reason = skip_reason_black
if skip_reason_white:
if skip_reason:
skip_reason = skip_reason + " and " + skip_reason_white
else:
skip_reason = skip_reason_white
return skip_reason
def check_blacklist_logic(
library_title,
library_type,
blacklist_library,
blacklist_library_type,
library_other=None,
):
skip_reason = None
if isinstance(library_type, (list, tuple, set)):
for library_type_item in library_type:
if library_type_item.lower() in blacklist_library_type:
skip_reason = f"{library_type_item} is in blacklist_library_type"
else:
if library_type.lower() in blacklist_library_type:
skip_reason = f"{library_type} is in blacklist_library_type"
if library_title.lower() in [x.lower() for x in blacklist_library]:
if skip_reason:
skip_reason = (
skip_reason + " and " + f"{library_title} is in blacklist_library"
)
else:
skip_reason = f"{library_title} is in blacklist_library"
if library_other:
if library_other.lower() in [x.lower() for x in blacklist_library]:
if skip_reason:
skip_reason = (
skip_reason + " and " + f"{library_other} is in blacklist_library"
)
else:
skip_reason = f"{library_other} is in blacklist_library"
return skip_reason
def check_whitelist_logic(
library_title,
library_type,
whitelist_library,
whitelist_library_type,
library_other=None,
):
skip_reason = None
if len(whitelist_library_type) > 0:
if isinstance(library_type, (list, tuple, set)):
for library_type_item in library_type:
if library_type_item.lower() not in whitelist_library_type:
skip_reason = (
f"{library_type_item} is not in whitelist_library_type"
)
else:
if library_type.lower() not in whitelist_library_type:
skip_reason = f"{library_type} is not in whitelist_library_type"
# if whitelist is not empty and library is not in whitelist
if len(whitelist_library) > 0:
if library_other:
if library_title.lower() not in [
x.lower() for x in whitelist_library
] and library_other.lower() not in [x.lower() for x in whitelist_library]:
if skip_reason:
skip_reason = (
skip_reason
+ " and "
+ f"{library_title} is not in whitelist_library"
)
else:
skip_reason = f"{library_title} is not in whitelist_library"
else:
if library_title.lower() not in [x.lower() for x in whitelist_library]:
if skip_reason:
skip_reason = (
skip_reason
+ " and "
+ f"{library_title} is not in whitelist_library"
)
else:
skip_reason = f"{library_title} is not in whitelist_library"
return skip_reason
def show_title_dict(user_list: dict):
try:
show_output_dict = {}
show_output_keys = user_list.keys()
show_output_keys = [dict(x) for x in list(show_output_keys)]
for show_key in show_output_keys:
for provider_key, provider_value in show_key.items():
# Skip title
if provider_key.lower() == "title":
continue
if provider_key.lower() not in show_output_dict:
show_output_dict[provider_key.lower()] = []
if provider_key.lower() == "locations":
for show_location in provider_value:
show_output_dict[provider_key.lower()].append(show_location)
else:
show_output_dict[provider_key.lower()].append(
provider_value.lower()
)
return show_output_dict
except Exception:
logger("Generating show_output_dict failed, skipping", 1)
return {}
def episode_title_dict(user_list: dict):
try:
episode_output_dict = {}
for show in user_list:
for season in user_list[show]:
for episode in user_list[show][season]:
for episode_key, episode_value in episode.items():
if episode_key.lower() not in episode_output_dict:
episode_output_dict[episode_key.lower()] = []
if episode_key == "locations":
for episode_location in episode_value:
episode_output_dict[episode_key.lower()].append(
episode_location
)
else:
episode_output_dict[episode_key.lower()].append(
episode_value.lower()
)
return episode_output_dict
except Exception:
logger("Generating episode_output_dict failed, skipping", 1)
return {}
def movies_title_dict(user_list: dict):
try:
movies_output_dict = {}
for movie in user_list:
for movie_key, movie_value in movie.items():
if movie_key.lower() not in movies_output_dict:
movies_output_dict[movie_key.lower()] = []
if movie_key == "locations":
for movie_location in movie_value:
movies_output_dict[movie_key.lower()].append(movie_location)
else:
movies_output_dict[movie_key.lower()].append(movie_value.lower())
return movies_output_dict
except Exception:
logger("Generating movies_output_dict failed, skipping", 1)
return {}
def generate_library_guids_dict(user_list: dict):
# Handle the case where user_list is empty or does not contain the expected keys and values
if not user_list:
return {}, {}, {}
show_output_dict = show_title_dict(user_list)
episode_output_dict = episode_title_dict(user_list)
movies_output_dict = movies_title_dict(user_list)
return show_output_dict, episode_output_dict, movies_output_dict

View File

@@ -5,10 +5,18 @@ from time import sleep, perf_counter
from src.functions import ( from src.functions import (
logger, logger,
str_to_bool, str_to_bool,
search_mapping,
cleanup_watched,
setup_black_white_lists,
) )
from src.users import (
generate_user_list,
combine_user_lists,
filter_user_lists,
generate_server_users,
)
from src.watched import (
cleanup_watched,
)
from src.black_white import setup_black_white_lists
from src.plex import Plex from src.plex import Plex
from src.jellyfin import Jellyfin from src.jellyfin import Jellyfin
@@ -18,106 +26,27 @@ load_dotenv(override=True)
def setup_users( def setup_users(
server_1, server_2, blacklist_users, whitelist_users, user_mapping=None server_1, server_2, blacklist_users, whitelist_users, user_mapping=None
): ):
# generate list of users from server 1 and server 2 server_1_users = generate_user_list(server_1)
server_1_type = server_1[0] server_2_users = generate_user_list(server_2)
server_1_connection = server_1[1]
server_2_type = server_2[0]
server_2_connection = server_2[1]
logger(f"Server 1: {server_1_type} {server_1_connection}", 0)
logger(f"Server 2: {server_2_type} {server_2_connection}", 0)
server_1_users = []
if server_1_type == "plex":
server_1_users = [x.title.lower() for x in server_1_connection.users]
elif server_1_type == "jellyfin":
server_1_users = [key.lower() for key in server_1_connection.users.keys()]
server_2_users = []
if server_2_type == "plex":
server_2_users = [x.title.lower() for x in server_2_connection.users]
elif server_2_type == "jellyfin":
server_2_users = [key.lower() for key in server_2_connection.users.keys()]
# combined list of overlapping users from plex and jellyfin
users = {}
for server_1_user in server_1_users:
if user_mapping:
jellyfin_plex_mapped_user = search_mapping(user_mapping, server_1_user)
if jellyfin_plex_mapped_user:
users[server_1_user] = jellyfin_plex_mapped_user
continue
if server_1_user in server_2_users:
users[server_1_user] = server_1_user
for server_2_user in server_2_users:
if user_mapping:
plex_jellyfin_mapped_user = search_mapping(user_mapping, server_2_user)
if plex_jellyfin_mapped_user:
users[plex_jellyfin_mapped_user] = server_2_user
continue
if server_2_user in server_1_users:
users[server_2_user] = server_2_user
users = combine_user_lists(server_1_users, server_2_users, user_mapping)
logger(f"User list that exist on both servers {users}", 1) logger(f"User list that exist on both servers {users}", 1)
users_filtered = {} users_filtered = filter_user_lists(users, blacklist_users, whitelist_users)
for user in users:
# whitelist_user is not empty and user lowercase is not in whitelist lowercase
if len(whitelist_users) > 0:
if user not in whitelist_users and users[user] not in whitelist_users:
logger(f"{user} or {users[user]} is not in whitelist", 1)
continue
if user not in blacklist_users and users[user] not in blacklist_users:
users_filtered[user] = users[user]
logger(f"Filtered user list {users_filtered}", 1) logger(f"Filtered user list {users_filtered}", 1)
if server_1_type == "plex": output_server_1_users = generate_server_users(server_1, users_filtered)
output_server_1_users = [] output_server_2_users = generate_server_users(server_2, users_filtered)
for plex_user in server_1_connection.users:
if (
plex_user.title.lower() in users_filtered.keys()
or plex_user.title.lower() in users_filtered.values()
):
output_server_1_users.append(plex_user)
elif server_1_type == "jellyfin":
output_server_1_users = {}
for jellyfin_user, jellyfin_id in server_1_connection.users.items():
if (
jellyfin_user.lower() in users_filtered.keys()
or jellyfin_user.lower() in users_filtered.values()
):
output_server_1_users[jellyfin_user] = jellyfin_id
if server_2_type == "plex": # Check if users is none or empty
output_server_2_users = [] if output_server_1_users is None or len(output_server_1_users) == 0:
for plex_user in server_2_connection.users:
if (
plex_user.title.lower() in users_filtered.keys()
or plex_user.title.lower() in users_filtered.values()
):
output_server_2_users.append(plex_user)
elif server_2_type == "jellyfin":
output_server_2_users = {}
for jellyfin_user, jellyfin_id in server_2_connection.users.items():
if (
jellyfin_user.lower() in users_filtered.keys()
or jellyfin_user.lower() in users_filtered.values()
):
output_server_2_users[jellyfin_user] = jellyfin_id
if len(output_server_1_users) == 0:
raise Exception( raise Exception(
f"No users found for server 1 {server_1_type}, users found {users}, filtered users {users_filtered}, server 1 users {server_1_connection.users}" f"No users found for server 1 {server_1[0]}, users found {users}, filtered users {users_filtered}, server 1 users {server_1[1].users}"
) )
if len(output_server_2_users) == 0: if output_server_2_users is None or len(output_server_2_users) == 0:
raise Exception( raise Exception(
f"No users found for server 2 {server_2_type}, users found {users} filtered users {users_filtered}, server 2 users {server_2_connection.users}" f"No users found for server 2 {server_2[0]}, users found {users} filtered users {users_filtered}, server 2 users {server_2[1].users}"
) )
logger(f"Server 1 users: {output_server_1_users}", 1) logger(f"Server 1 users: {output_server_1_users}", 1)

View File

@@ -7,9 +7,11 @@ from plexapi.myplex import MyPlexAccount
from src.functions import ( from src.functions import (
logger, logger,
search_mapping, search_mapping,
future_thread_executor,
)
from src.library import (
check_skip_logic, check_skip_logic,
generate_library_guids_dict, generate_library_guids_dict,
future_thread_executor,
) )
@@ -50,7 +52,7 @@ def get_user_library_watched_show(show):
m = re.match(r"(.*)://(.*)", guid.id) m = re.match(r"(.*)://(.*)", guid.id)
guid_source, guid_id = m.group(1).lower(), m.group(2) guid_source, guid_id = m.group(1).lower(), m.group(2)
episode_guids_temp[guid_source] = guid_id episode_guids_temp[guid_source] = guid_id
except: except Exception:
logger( logger(
f"Plex: Failed to get guids for {episode.title} in {show.title}, Using location only", f"Plex: Failed to get guids for {episode.title} in {show.title}, Using location only",
1, 1,
@@ -67,7 +69,7 @@ def get_user_library_watched_show(show):
return show_guids, episode_guids return show_guids, episode_guids
except Exception as e: except Exception:
return {}, {} return {}, {}
@@ -385,7 +387,7 @@ class Plex:
if skip_reason: if skip_reason:
logger( logger(
f"Plex: Skipping library {library_title} {skip_reason}", 1 f"Plex: Skipping library {library_title}: {skip_reason}", 1
) )
continue continue

83
src/users.py Normal file
View File

@@ -0,0 +1,83 @@
from src.functions import (
logger,
search_mapping,
)
def generate_user_list(server):
# generate list of users from server 1 and server 2
server_type = server[0]
server_connection = server[1]
server_users = []
if server_type == "plex":
server_users = [x.title.lower() for x in server_connection.users]
elif server_type == "jellyfin":
server_users = [key.lower() for key in server_connection.users.keys()]
return server_users
def combine_user_lists(server_1_users, server_2_users, user_mapping):
# combined list of overlapping users from plex and jellyfin
users = {}
for server_1_user in server_1_users:
if user_mapping:
mapped_user = search_mapping(user_mapping, server_1_user)
if mapped_user in server_2_users:
users[server_1_user] = mapped_user
continue
if server_1_user in server_2_users:
users[server_1_user] = server_1_user
for server_2_user in server_2_users:
if user_mapping:
mapped_user = search_mapping(user_mapping, server_2_user)
if mapped_user in server_1_users:
users[mapped_user] = server_2_user
continue
if server_2_user in server_1_users:
users[server_2_user] = server_2_user
return users
def filter_user_lists(users, blacklist_users, whitelist_users):
users_filtered = {}
for user in users:
# whitelist_user is not empty and user lowercase is not in whitelist lowercase
if len(whitelist_users) > 0:
if user not in whitelist_users and users[user] not in whitelist_users:
logger(f"{user} or {users[user]} is not in whitelist", 1)
continue
if user not in blacklist_users and users[user] not in blacklist_users:
users_filtered[user] = users[user]
return users_filtered
def generate_server_users(server, users):
server_users = None
if server[0] == "plex":
server_users = []
for plex_user in server[1].users:
if (
plex_user.title.lower() in users.keys()
or plex_user.title.lower() in users.values()
):
server_users.append(plex_user)
elif server[0] == "jellyfin":
server_users = {}
for jellyfin_user, jellyfin_id in server[1].users.items():
if (
jellyfin_user.lower() in users.keys()
or jellyfin_user.lower() in users.values()
):
server_users[jellyfin_user] = jellyfin_id
return server_users

192
src/watched.py Normal file
View File

@@ -0,0 +1,192 @@
import copy
from src.functions import (
logger,
search_mapping,
)
from src.library import generate_library_guids_dict
def combine_watched_dicts(dicts: list):
combined_dict = {}
for single_dict in dicts:
for key, value in single_dict.items():
if key not in combined_dict:
combined_dict[key] = {}
for subkey, subvalue in value.items():
if subkey in combined_dict[key]:
# If the subkey already exists in the combined dictionary,
# check if the values are different and raise an exception if they are
if combined_dict[key][subkey] != subvalue:
raise ValueError(
f"Conflicting values for subkey '{subkey}' under key '{key}'"
)
else:
# If the subkey does not exist in the combined dictionary, add it
combined_dict[key][subkey] = subvalue
return combined_dict
def cleanup_watched(
watched_list_1, watched_list_2, user_mapping=None, library_mapping=None
):
modified_watched_list_1 = copy.deepcopy(watched_list_1)
# remove entries from watched_list_1 that are in watched_list_2
for user_1 in watched_list_1:
user_other = None
if user_mapping:
user_other = search_mapping(user_mapping, user_1)
user_2 = get_other(watched_list_2, user_1, user_other)
if user_2 is None:
continue
for library_1 in watched_list_1[user_1]:
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library_1)
library_2 = get_other(watched_list_2[user_2], library_1, library_other)
if library_2 is None:
continue
(
_,
episode_watched_list_2_keys_dict,
movies_watched_list_2_keys_dict,
) = generate_library_guids_dict(watched_list_2[user_2][library_2])
# Movies
if isinstance(watched_list_1[user_1][library_1], list):
for movie in watched_list_1[user_1][library_1]:
if is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
logger(f"Removing {movie} from {library_1}", 3)
modified_watched_list_1[user_1][library_1].remove(movie)
# TV Shows
elif isinstance(watched_list_1[user_1][library_1], dict):
for show_key_1 in watched_list_1[user_1][library_1].keys():
show_key_dict = dict(show_key_1)
for season in watched_list_1[user_1][library_1][show_key_1]:
for episode in watched_list_1[user_1][library_1][show_key_1][
season
]:
if is_episode_in_dict(
episode, episode_watched_list_2_keys_dict
):
if (
episode
in modified_watched_list_1[user_1][library_1][
show_key_1
][season]
):
logger(
f"Removing {episode} from {show_key_dict['title']}",
3,
)
modified_watched_list_1[user_1][library_1][
show_key_1
][season].remove(episode)
# Remove empty seasons
if (
len(
modified_watched_list_1[user_1][library_1][show_key_1][
season
]
)
== 0
):
if (
season
in modified_watched_list_1[user_1][library_1][
show_key_1
]
):
logger(
f"Removing {season} from {show_key_dict['title']} because it is empty",
3,
)
del modified_watched_list_1[user_1][library_1][
show_key_1
][season]
# Remove empty shows
if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0:
if show_key_1 in modified_watched_list_1[user_1][library_1]:
logger(
f"Removing {show_key_dict['title']} because it is empty",
3,
)
del modified_watched_list_1[user_1][library_1][show_key_1]
for user_1 in watched_list_1:
for library_1 in watched_list_1[user_1]:
if library_1 in modified_watched_list_1[user_1]:
# If library is empty then remove it
if len(modified_watched_list_1[user_1][library_1]) == 0:
logger(f"Removing {library_1} from {user_1} because it is empty", 1)
del modified_watched_list_1[user_1][library_1]
if user_1 in modified_watched_list_1:
# If user is empty delete user
if len(modified_watched_list_1[user_1]) == 0:
logger(f"Removing {user_1} from watched list 1 because it is empty", 1)
del modified_watched_list_1[user_1]
return modified_watched_list_1
def get_other(watched_list, object_1, object_2):
if object_1 in watched_list:
return object_1
elif object_2 in watched_list:
return object_2
else:
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
return None
def is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
# Iterate through the keys and values of the movie dictionary
for movie_key, movie_value in movie.items():
# If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary
if movie_key == "locations":
if "locations" in movies_watched_list_2_keys_dict.keys():
# Iterate through the locations in the movie dictionary
for location in movie_value:
# If the location is in the movies_watched_list_2_keys_dict dictionary, return True
if location in movies_watched_list_2_keys_dict["locations"]:
return True
# If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary
else:
if movie_key in movies_watched_list_2_keys_dict.keys():
# If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True
if movie_value in movies_watched_list_2_keys_dict[movie_key]:
return True
# If the loop completes without finding a match, return False
return False
def is_episode_in_dict(episode, episode_watched_list_2_keys_dict):
# Iterate through the keys and values of the episode dictionary
for episode_key, episode_value in episode.items():
# If the key is "locations", check if the "locations" key is present in the episode_watched_list_2_keys_dict dictionary
if episode_key == "locations":
if "locations" in episode_watched_list_2_keys_dict.keys():
# Iterate through the locations in the episode dictionary
for location in episode_value:
# If the location is in the episode_watched_list_2_keys_dict dictionary, return True
if location in episode_watched_list_2_keys_dict["locations"]:
return True
# If the key is not "locations", check if the episode_key is present in the episode_watched_list_2_keys_dict dictionary
else:
if episode_key in episode_watched_list_2_keys_dict.keys():
# If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True
if episode_value in episode_watched_list_2_keys_dict[episode_key]:
return True
# If the loop completes without finding a match, return False
return False

78
test/test_black_white.py Normal file
View File

@@ -0,0 +1,78 @@
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
from src.black_white import setup_black_white_lists
def test_setup_black_white_lists():
# Simple
blacklist_library = "library1, library2"
whitelist_library = "library1, library2"
blacklist_library_type = "library_type1, library_type2"
whitelist_library_type = "library_type1, library_type2"
blacklist_users = "user1, user2"
whitelist_users = "user1, user2"
(
results_blacklist_library,
return_whitelist_library,
return_blacklist_library_type,
return_whitelist_library_type,
return_blacklist_users,
return_whitelist_users,
) = setup_black_white_lists(
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
blacklist_users,
whitelist_users,
)
assert results_blacklist_library == ["library1", "library2"]
assert return_whitelist_library == ["library1", "library2"]
assert return_blacklist_library_type == ["library_type1", "library_type2"]
assert return_whitelist_library_type == ["library_type1", "library_type2"]
assert return_blacklist_users == ["user1", "user2"]
assert return_whitelist_users == ["user1", "user2"]
# Library Mapping and user mapping
library_mapping = {"library1": "library3"}
user_mapping = {"user1": "user3"}
(
results_blacklist_library,
return_whitelist_library,
return_blacklist_library_type,
return_whitelist_library_type,
return_blacklist_users,
return_whitelist_users,
) = setup_black_white_lists(
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
blacklist_users,
whitelist_users,
library_mapping,
user_mapping,
)
assert results_blacklist_library == ["library1", "library2", "library3"]
assert return_whitelist_library == ["library1", "library2", "library3"]
assert return_blacklist_library_type == ["library_type1", "library_type2"]
assert return_whitelist_library_type == ["library_type1", "library_type2"]
assert return_blacklist_users == ["user1", "user2", "user3"]
assert return_whitelist_users == ["user1", "user2", "user3"]

302
test/test_library.py Normal file
View File

@@ -0,0 +1,302 @@
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
from src.functions import (
search_mapping,
)
from src.library import (
check_skip_logic,
check_blacklist_logic,
check_whitelist_logic,
show_title_dict,
episode_title_dict,
movies_title_dict,
generate_library_guids_dict,
)
blacklist_library = ["TV Shows"]
whitelist_library = ["Movies"]
blacklist_library_type = ["episodes"]
whitelist_library_type = ["movies"]
library_mapping = {"Shows": "TV Shows", "Movie": "Movies"}
show_list = {
frozenset(
{
("locations", ("The Last of Us",)),
("tmdb", "100088"),
("imdb", "tt3581920"),
("tvdb", "392256"),
("title", "The Last of Us"),
}
): {
"Season 1": [
{
"imdb": "tt11957006",
"tmdb": "2181581",
"tvdb": "8444132",
"locations": (
"The Last of Us - S01E01 - When You're Lost in the Darkness WEBDL-1080p.mkv",
),
}
]
}
}
movie_list = [
{
"title": "Coco",
"imdb": "tt2380307",
"tmdb": "354912",
"locations": ("Coco (2017) Remux-2160p.mkv", "Coco (2017) Remux-1080p.mkv"),
}
]
show_titles = {
"imdb": ["tt3581920"],
"locations": ["The Last of Us"],
"tmdb": ["100088"],
"tvdb": ["392256"],
}
episode_titles = {
"imdb": ["tt11957006"],
"locations": [
"The Last of Us - S01E01 - When You're Lost in the Darkness WEBDL-1080p.mkv"
],
"tmdb": ["2181581"],
"tvdb": ["8444132"],
}
movie_titles = {
"imdb": ["tt2380307"],
"locations": ["Coco (2017) Remux-2160p.mkv", "Coco (2017) Remux-1080p.mkv"],
"title": ["coco"],
"tmdb": ["354912"],
}
def test_check_skip_logic():
# Failes
library_title = "Test"
library_type = "movies"
skip_reason = check_skip_logic(
library_title,
library_type,
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
library_mapping,
)
assert skip_reason == "Test is not in whitelist_library"
library_title = "Shows"
library_type = "episodes"
skip_reason = check_skip_logic(
library_title,
library_type,
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
library_mapping,
)
assert (
skip_reason
== "episodes is in blacklist_library_type and TV Shows is in blacklist_library and "
+ "episodes is not in whitelist_library_type and Shows is not in whitelist_library"
)
# Passes
library_title = "Movie"
library_type = "movies"
skip_reason = check_skip_logic(
library_title,
library_type,
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
library_mapping,
)
assert skip_reason == None
def test_check_blacklist_logic():
# Fails
library_title = "Shows"
library_type = "episodes"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_blacklist_logic(
library_title,
library_type,
blacklist_library,
blacklist_library_type,
library_other,
)
assert (
skip_reason
== "episodes is in blacklist_library_type and TV Shows is in blacklist_library"
)
library_title = "TV Shows"
library_type = "episodes"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_blacklist_logic(
library_title,
library_type,
blacklist_library,
blacklist_library_type,
library_other,
)
assert (
skip_reason
== "episodes is in blacklist_library_type and TV Shows is in blacklist_library"
)
# Passes
library_title = "Movie"
library_type = "movies"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_blacklist_logic(
library_title,
library_type,
blacklist_library,
blacklist_library_type,
library_other,
)
assert skip_reason == None
library_title = "Movies"
library_type = "movies"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_blacklist_logic(
library_title,
library_type,
blacklist_library,
blacklist_library_type,
library_other,
)
assert skip_reason == None
def test_check_whitelist_logic():
# Fails
library_title = "Shows"
library_type = "episodes"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_whitelist_logic(
library_title,
library_type,
whitelist_library,
whitelist_library_type,
library_other,
)
assert (
skip_reason
== "episodes is not in whitelist_library_type and Shows is not in whitelist_library"
)
library_title = "TV Shows"
library_type = "episodes"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_whitelist_logic(
library_title,
library_type,
whitelist_library,
whitelist_library_type,
library_other,
)
assert (
skip_reason
== "episodes is not in whitelist_library_type and TV Shows is not in whitelist_library"
)
# Passes
library_title = "Movie"
library_type = "movies"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_whitelist_logic(
library_title,
library_type,
whitelist_library,
whitelist_library_type,
library_other,
)
assert skip_reason == None
library_title = "Movies"
library_type = "movies"
library_other = search_mapping(library_mapping, library_title)
skip_reason = check_whitelist_logic(
library_title,
library_type,
whitelist_library,
whitelist_library_type,
library_other,
)
assert skip_reason == None
def test_show_title_dict():
show_titles_dict = show_title_dict(show_list)
assert show_titles_dict == show_titles
def test_episode_title_dict():
episode_titles_dict = episode_title_dict(show_list)
assert episode_titles_dict == episode_titles
def test_movies_title_dict():
movies_titles_dict = movies_title_dict(movie_list)
assert movies_titles_dict == movie_titles
def test_generate_library_guids_dict():
# Test with shows
(
show_titles_dict,
episode_titles_dict,
movies_titles_dict,
) = generate_library_guids_dict(show_list)
assert show_titles_dict == show_titles
assert episode_titles_dict == episode_titles
assert movies_titles_dict == {}
# Test with movies
(
show_titles_dict,
episode_titles_dict,
movies_titles_dict,
) = generate_library_guids_dict(movie_list)
assert show_titles_dict == {}
assert episode_titles_dict == {}
assert movies_titles_dict == movie_titles

View File

@@ -13,7 +13,7 @@ parent = os.path.dirname(current)
# the sys.path. # the sys.path.
sys.path.append(parent) sys.path.append(parent)
from src.main import setup_black_white_lists from src.black_white import setup_black_white_lists
def test_setup_black_white_lists(): def test_setup_black_white_lists():

39
test/test_users.py Normal file
View File

@@ -0,0 +1,39 @@
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
from src.users import (
combine_user_lists,
filter_user_lists,
)
def test_combine_user_lists():
server_1_users = ["test", "test3", "luigi311"]
server_2_users = ["luigi311", "test2", "test3"]
user_mapping = {"test2": "test"}
combined = combine_user_lists(server_1_users, server_2_users, user_mapping)
assert combined == {"luigi311": "luigi311", "test": "test2", "test3": "test3"}
def test_filter_user_lists():
users = {"luigi311": "luigi311", "test": "test2", "test3": "test3"}
blacklist_users = ["test3"]
whitelist_users = ["test", "luigi311"]
filtered = filter_user_lists(users, blacklist_users, whitelist_users)
assert filtered == {"test": "test2", "luigi311": "luigi311"}

View File

@@ -1,301 +1,410 @@
import sys import sys
import os import os
# getting the name of the directory # getting the name of the directory
# where the this file is present. # where the this file is present.
current = os.path.dirname(os.path.realpath(__file__)) current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name # Getting the parent directory name
# where the current directory is present. # where the current directory is present.
parent = os.path.dirname(current) parent = os.path.dirname(current)
# adding the parent directory to # adding the parent directory to
# the sys.path. # the sys.path.
sys.path.append(parent) sys.path.append(parent)
from src.main import cleanup_watched from src.watched import cleanup_watched, combine_watched_dicts
tv_shows_watched_list_1 = { tv_shows_watched_list_1 = {
frozenset( frozenset(
{ {
("tvdb", "75710"), ("tvdb", "75710"),
("title", "Criminal Minds"), ("title", "Criminal Minds"),
("imdb", "tt0452046"), ("imdb", "tt0452046"),
("locations", ("Criminal Minds",)), ("locations", ("Criminal Minds",)),
("tmdb", "4057"), ("tmdb", "4057"),
} }
): { ): {
"Season 1": [ "Season 1": [
{ {
"imdb": "tt0550489", "imdb": "tt0550489",
"tmdb": "282843", "tmdb": "282843",
"tvdb": "176357", "tvdb": "176357",
"locations": ( "locations": (
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv", "Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
), ),
}, },
{ {
"imdb": "tt0550487", "imdb": "tt0550487",
"tmdb": "282861", "tmdb": "282861",
"tvdb": "300385", "tvdb": "300385",
"locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",), "locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",),
}, },
] ]
}, },
frozenset({("title", "Test"), ("locations", ("Test",))}): { frozenset({("title", "Test"), ("locations", ("Test",))}): {
"Season 1": [ "Season 1": [
{"locations": ("Test S01E01.mkv",)}, {"locations": ("Test S01E01.mkv",)},
{"locations": ("Test S01E02.mkv",)}, {"locations": ("Test S01E02.mkv",)},
] ]
}, },
} }
movies_watched_list_1 = [ movies_watched_list_1 = [
{ {
"imdb": "tt2380307", "imdb": "tt2380307",
"tmdb": "354912", "tmdb": "354912",
"title": "Coco", "title": "Coco",
"locations": ("Coco (2017) Remux-1080p.mkv",), "locations": ("Coco (2017) Remux-1080p.mkv",),
}, },
{ {
"tmdbcollection": "448150", "tmdbcollection": "448150",
"imdb": "tt1431045", "imdb": "tt1431045",
"tmdb": "293660", "tmdb": "293660",
"title": "Deadpool", "title": "Deadpool",
"locations": ("Deadpool (2016) Remux-1080p.mkv",), "locations": ("Deadpool (2016) Remux-1080p.mkv",),
}, },
] ]
tv_shows_watched_list_2 = { tv_shows_watched_list_2 = {
frozenset( frozenset(
{ {
("tvdb", "75710"), ("tvdb", "75710"),
("title", "Criminal Minds"), ("title", "Criminal Minds"),
("imdb", "tt0452046"), ("imdb", "tt0452046"),
("locations", ("Criminal Minds",)), ("locations", ("Criminal Minds",)),
("tmdb", "4057"), ("tmdb", "4057"),
} }
): { ): {
"Season 1": [ "Season 1": [
{ {
"imdb": "tt0550487", "imdb": "tt0550487",
"tmdb": "282861", "tmdb": "282861",
"tvdb": "300385", "tvdb": "300385",
"locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",), "locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",),
}, },
{ {
"imdb": "tt0550498", "imdb": "tt0550498",
"tmdb": "282865", "tmdb": "282865",
"tvdb": "300474", "tvdb": "300474",
"locations": ( "locations": (
"Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv", "Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv",
), ),
}, },
] ]
}, },
frozenset({("title", "Test"), ("locations", ("Test",))}): { frozenset({("title", "Test"), ("locations", ("Test",))}): {
"Season 1": [ "Season 1": [
{"locations": ("Test S01E02.mkv",)}, {"locations": ("Test S01E02.mkv",)},
{"locations": ("Test S01E03.mkv",)}, {"locations": ("Test S01E03.mkv",)},
] ]
}, },
} }
movies_watched_list_2 = [ movies_watched_list_2 = [
{ {
"imdb": "tt2380307", "imdb": "tt2380307",
"tmdb": "354912", "tmdb": "354912",
"title": "Coco", "title": "Coco",
"locations": ("Coco (2017) Remux-1080p.mkv",), "locations": ("Coco (2017) Remux-1080p.mkv",),
}, },
{ {
"imdb": "tt0384793", "imdb": "tt0384793",
"tmdb": "9788", "tmdb": "9788",
"tvdb": "9103", "tvdb": "9103",
"title": "Accepted", "title": "Accepted",
"locations": ("Accepted (2006) Remux-1080p.mkv",), "locations": ("Accepted (2006) Remux-1080p.mkv",),
}, },
] ]
# Test to see if objects get deleted all the way up to the root. # Test to see if objects get deleted all the way up to the root.
tv_shows_2_watched_list_1 = { tv_shows_2_watched_list_1 = {
frozenset( frozenset(
{ {
("tvdb", "75710"), ("tvdb", "75710"),
("title", "Criminal Minds"), ("title", "Criminal Minds"),
("imdb", "tt0452046"), ("imdb", "tt0452046"),
("locations", ("Criminal Minds",)), ("locations", ("Criminal Minds",)),
("tmdb", "4057"), ("tmdb", "4057"),
} }
): { ): {
"Season 1": [ "Season 1": [
{ {
"imdb": "tt0550489", "imdb": "tt0550489",
"tmdb": "282843", "tmdb": "282843",
"tvdb": "176357", "tvdb": "176357",
"locations": ( "locations": (
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv", "Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
), ),
}, },
] ]
} }
} }
expected_tv_show_watched_list_1 = { expected_tv_show_watched_list_1 = {
frozenset( frozenset(
{ {
("tvdb", "75710"), ("tvdb", "75710"),
("title", "Criminal Minds"), ("title", "Criminal Minds"),
("imdb", "tt0452046"), ("imdb", "tt0452046"),
("locations", ("Criminal Minds",)), ("locations", ("Criminal Minds",)),
("tmdb", "4057"), ("tmdb", "4057"),
} }
): { ): {
"Season 1": [ "Season 1": [
{ {
"imdb": "tt0550489", "imdb": "tt0550489",
"tmdb": "282843", "tmdb": "282843",
"tvdb": "176357", "tvdb": "176357",
"locations": ( "locations": (
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv", "Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
), ),
} }
] ]
}, },
frozenset({("title", "Test"), ("locations", ("Test",))}): { frozenset({("title", "Test"), ("locations", ("Test",))}): {
"Season 1": [{"locations": ("Test S01E01.mkv",)}] "Season 1": [{"locations": ("Test S01E01.mkv",)}]
}, },
} }
expected_movie_watched_list_1 = [ expected_movie_watched_list_1 = [
{ {
"tmdbcollection": "448150", "tmdbcollection": "448150",
"imdb": "tt1431045", "imdb": "tt1431045",
"tmdb": "293660", "tmdb": "293660",
"title": "Deadpool", "title": "Deadpool",
"locations": ("Deadpool (2016) Remux-1080p.mkv",), "locations": ("Deadpool (2016) Remux-1080p.mkv",),
} }
] ]
expected_tv_show_watched_list_2 = { expected_tv_show_watched_list_2 = {
frozenset( frozenset(
{ {
("tvdb", "75710"), ("tvdb", "75710"),
("title", "Criminal Minds"), ("title", "Criminal Minds"),
("imdb", "tt0452046"), ("imdb", "tt0452046"),
("locations", ("Criminal Minds",)), ("locations", ("Criminal Minds",)),
("tmdb", "4057"), ("tmdb", "4057"),
} }
): { ): {
"Season 1": [ "Season 1": [
{ {
"imdb": "tt0550498", "imdb": "tt0550498",
"tmdb": "282865", "tmdb": "282865",
"tvdb": "300474", "tvdb": "300474",
"locations": ( "locations": (
"Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv", "Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv",
), ),
} }
] ]
}, },
frozenset({("title", "Test"), ("locations", ("Test",))}): { frozenset({("title", "Test"), ("locations", ("Test",))}): {
"Season 1": [{"locations": ("Test S01E03.mkv",)}] "Season 1": [{"locations": ("Test S01E03.mkv",)}]
}, },
} }
expected_movie_watched_list_2 = [ expected_movie_watched_list_2 = [
{ {
"imdb": "tt0384793", "imdb": "tt0384793",
"tmdb": "9788", "tmdb": "9788",
"tvdb": "9103", "tvdb": "9103",
"title": "Accepted", "title": "Accepted",
"locations": ("Accepted (2006) Remux-1080p.mkv",), "locations": ("Accepted (2006) Remux-1080p.mkv",),
} }
] ]
def test_simple_cleanup_watched(): def test_simple_cleanup_watched():
user_watched_list_1 = { user_watched_list_1 = {
"user1": { "user1": {
"TV Shows": tv_shows_watched_list_1, "TV Shows": tv_shows_watched_list_1,
"Movies": movies_watched_list_1, "Movies": movies_watched_list_1,
"Other Shows": tv_shows_2_watched_list_1, "Other Shows": tv_shows_2_watched_list_1,
}, },
} }
user_watched_list_2 = { user_watched_list_2 = {
"user1": { "user1": {
"TV Shows": tv_shows_watched_list_2, "TV Shows": tv_shows_watched_list_2,
"Movies": movies_watched_list_2, "Movies": movies_watched_list_2,
"Other Shows": tv_shows_2_watched_list_1, "Other Shows": tv_shows_2_watched_list_1,
} }
} }
expected_watched_list_1 = { expected_watched_list_1 = {
"user1": { "user1": {
"TV Shows": expected_tv_show_watched_list_1, "TV Shows": expected_tv_show_watched_list_1,
"Movies": expected_movie_watched_list_1, "Movies": expected_movie_watched_list_1,
} }
} }
expected_watched_list_2 = { expected_watched_list_2 = {
"user1": { "user1": {
"TV Shows": expected_tv_show_watched_list_2, "TV Shows": expected_tv_show_watched_list_2,
"Movies": expected_movie_watched_list_2, "Movies": expected_movie_watched_list_2,
} }
} }
return_watched_list_1 = cleanup_watched(user_watched_list_1, user_watched_list_2) return_watched_list_1 = cleanup_watched(user_watched_list_1, user_watched_list_2)
return_watched_list_2 = cleanup_watched(user_watched_list_2, user_watched_list_1) return_watched_list_2 = cleanup_watched(user_watched_list_2, user_watched_list_1)
assert return_watched_list_1 == expected_watched_list_1 assert return_watched_list_1 == expected_watched_list_1
assert return_watched_list_2 == expected_watched_list_2 assert return_watched_list_2 == expected_watched_list_2
def test_mapping_cleanup_watched(): def test_mapping_cleanup_watched():
user_watched_list_1 = { user_watched_list_1 = {
"user1": { "user1": {
"TV Shows": tv_shows_watched_list_1, "TV Shows": tv_shows_watched_list_1,
"Movies": movies_watched_list_1, "Movies": movies_watched_list_1,
"Other Shows": tv_shows_2_watched_list_1, "Other Shows": tv_shows_2_watched_list_1,
}, },
} }
user_watched_list_2 = { user_watched_list_2 = {
"user2": { "user2": {
"Shows": tv_shows_watched_list_2, "Shows": tv_shows_watched_list_2,
"Movies": movies_watched_list_2, "Movies": movies_watched_list_2,
"Other Shows": tv_shows_2_watched_list_1, "Other Shows": tv_shows_2_watched_list_1,
} }
} }
expected_watched_list_1 = { expected_watched_list_1 = {
"user1": { "user1": {
"TV Shows": expected_tv_show_watched_list_1, "TV Shows": expected_tv_show_watched_list_1,
"Movies": expected_movie_watched_list_1, "Movies": expected_movie_watched_list_1,
} }
} }
expected_watched_list_2 = { expected_watched_list_2 = {
"user2": { "user2": {
"Shows": expected_tv_show_watched_list_2, "Shows": expected_tv_show_watched_list_2,
"Movies": expected_movie_watched_list_2, "Movies": expected_movie_watched_list_2,
} }
} }
user_mapping = {"user1": "user2"} user_mapping = {"user1": "user2"}
library_mapping = {"TV Shows": "Shows"} library_mapping = {"TV Shows": "Shows"}
return_watched_list_1 = cleanup_watched( return_watched_list_1 = cleanup_watched(
user_watched_list_1, user_watched_list_1,
user_watched_list_2, user_watched_list_2,
user_mapping=user_mapping, user_mapping=user_mapping,
library_mapping=library_mapping, library_mapping=library_mapping,
) )
return_watched_list_2 = cleanup_watched( return_watched_list_2 = cleanup_watched(
user_watched_list_2, user_watched_list_2,
user_watched_list_1, user_watched_list_1,
user_mapping=user_mapping, user_mapping=user_mapping,
library_mapping=library_mapping, library_mapping=library_mapping,
) )
assert return_watched_list_1 == expected_watched_list_1 assert return_watched_list_1 == expected_watched_list_1
assert return_watched_list_2 == expected_watched_list_2 assert return_watched_list_2 == expected_watched_list_2
def test_combine_watched_dicts():
input_watched = [
{
"test3": {
"Anime Movies": [
{
"title": "Ponyo",
"tmdb": "12429",
"imdb": "tt0876563",
"locations": ("Ponyo (2008) Bluray-1080p.mkv",),
},
{
"title": "Spirited Away",
"tmdb": "129",
"imdb": "tt0245429",
"locations": ("Spirited Away (2001) Bluray-1080p.mkv",),
},
{
"title": "Castle in the Sky",
"tmdb": "10515",
"imdb": "tt0092067",
"locations": ("Castle in the Sky (1986) Bluray-1080p.mkv",),
},
]
}
},
{"test3": {"Anime Shows": {}}},
{"test3": {"Cartoon Shows": {}}},
{
"test3": {
"Shows": {
frozenset(
{
("tmdb", "64464"),
("tvdb", "301824"),
("tvrage", "45210"),
("title", "11.22.63"),
("locations", ("11.22.63",)),
("imdb", "tt2879552"),
}
): {
"Season 1": [
{
"imdb": "tt4460418",
"title": "The Rabbit Hole",
"locations": (
"11.22.63 S01E01 The Rabbit Hole Bluray-1080p.mkv",
),
}
]
}
}
}
},
{"test3": {"Subbed Anime": {}}},
]
expected = {
"test3": {
"Anime Movies": [
{
"title": "Ponyo",
"tmdb": "12429",
"imdb": "tt0876563",
"locations": ("Ponyo (2008) Bluray-1080p.mkv",),
},
{
"title": "Spirited Away",
"tmdb": "129",
"imdb": "tt0245429",
"locations": ("Spirited Away (2001) Bluray-1080p.mkv",),
},
{
"title": "Castle in the Sky",
"tmdb": "10515",
"imdb": "tt0092067",
"locations": ("Castle in the Sky (1986) Bluray-1080p.mkv",),
},
],
"Anime Shows": {},
"Cartoon Shows": {},
"Shows": {
frozenset(
{
("tmdb", "64464"),
("tvdb", "301824"),
("tvrage", "45210"),
("title", "11.22.63"),
("locations", ("11.22.63",)),
("imdb", "tt2879552"),
}
): {
"Season 1": [
{
"imdb": "tt4460418",
"title": "The Rabbit Hole",
"locations": (
"11.22.63 S01E01 The Rabbit Hole Bluray-1080p.mkv",
),
}
]
}
},
"Subbed Anime": {},
}
}
assert combine_watched_dicts(input_watched) == expected