Merge pull request #51 from luigi311/dev
Add sync direction flags, seperate out functions, better logging for jellyfin queriespull/61/head v4.2.0
commit
fd64088bde
|
|
@ -55,6 +55,12 @@ PLEX_TOKEN = "SuperSecretToken, SuperSecretToken2"
|
|||
## Set to True if running into ssl certificate errors
|
||||
SSL_BYPASS = "False"
|
||||
|
||||
## control the direction of syncing. e.g. SYNC_FROM_PLEX_TO_JELLYFIN set to true will cause the updates from plex
|
||||
## to be updated in jellyfin. SYNC_FROM_PLEX_TO_PLEX set to true will sync updates between multiple plex servers
|
||||
SYNC_FROM_PLEX_TO_JELLYFIN = "True"
|
||||
SYNC_FROM_JELLYFIN_TO_PLEX = "True"
|
||||
SYNC_FROM_PLEX_TO_PLEX = "True"
|
||||
SYNC_FROM_JELLYFIN_TO_JELLYFIN = "True"
|
||||
|
||||
|
||||
# Jellyfin
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
---
|
||||
name: Bug report
|
||||
about: Create a report to help us improve
|
||||
title: "[BUG]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Describe the bug**
|
||||
A clear and concise description of what the bug is.
|
||||
|
||||
**To Reproduce**
|
||||
Steps to reproduce the behavior:
|
||||
1. Go to '...'
|
||||
2. Click on '....'
|
||||
3. Scroll down to '....'
|
||||
4. See error
|
||||
|
||||
**Expected behavior**
|
||||
A clear and concise description of what you expected to happen.
|
||||
|
||||
**Logs**
|
||||
If applicable, add logs to help explain your problem ideally with DEBUG set to true, be sure to remove sensitive information
|
||||
|
||||
**Type:**
|
||||
- [ ] Docker
|
||||
- [ ] Native
|
||||
|
||||
**Additional context**
|
||||
Add any other context about the problem here.
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
---
|
||||
name: Feature request
|
||||
about: Suggest an idea for this project
|
||||
title: "[Feature Request]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
---
|
||||
|
||||
**Is your feature request related to a problem? Please describe.**
|
||||
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
|
||||
|
||||
**Describe the solution you'd like**
|
||||
A clear and concise description of what you want to happen.
|
||||
|
||||
**Describe alternatives you've considered**
|
||||
A clear and concise description of any alternative solutions or features you've considered.
|
||||
|
||||
**Additional context**
|
||||
Add any other context or screenshots about the feature request here.
|
||||
|
|
@ -1,3 +1,7 @@
|
|||
{
|
||||
"python.formatting.provider": "black"
|
||||
"[python]" : {
|
||||
"editor.formatOnSave": true,
|
||||
},
|
||||
"python.formatting.provider": "black",
|
||||
|
||||
}
|
||||
|
|
@ -69,6 +69,13 @@ PLEX_TOKEN = "SuperSecretToken, SuperSecretToken2"
|
|||
SSL_BYPASS = "False"
|
||||
|
||||
|
||||
## control the direction of syncing. e.g. SYNC_FROM_PLEX_TO_JELLYFIN set to true will cause the updates from plex
|
||||
## to be updated in jellyfin. SYNC_FROM_PLEX_TO_PLEX set to true will sync updates between multiple plex servers
|
||||
SYNC_FROM_PLEX_TO_JELLYFIN = "True"
|
||||
SYNC_FROM_JELLYFIN_TO_PLEX = "True"
|
||||
SYNC_FROM_PLEX_TO_PLEX = "True"
|
||||
SYNC_FROM_JELLYFIN_TO_JELLYFIN = "True"
|
||||
|
||||
|
||||
# Jellyfin
|
||||
|
||||
|
|
@ -81,7 +88,6 @@ JELLYFIN_BASEURL = "http://localhost:8096, http://nas:8096"
|
|||
JELLYFIN_TOKEN = "SuperSecretToken, SuperSecretToken2"
|
||||
```
|
||||
|
||||
|
||||
## Installation
|
||||
|
||||
### Baremetal
|
||||
|
|
|
|||
|
|
@ -0,0 +1,139 @@
|
|||
from src.functions import logger, search_mapping
|
||||
|
||||
|
||||
def setup_black_white_lists(
|
||||
blacklist_library: str,
|
||||
whitelist_library: str,
|
||||
blacklist_library_type: str,
|
||||
whitelist_library_type: str,
|
||||
blacklist_users: str,
|
||||
whitelist_users: str,
|
||||
library_mapping=None,
|
||||
user_mapping=None,
|
||||
):
|
||||
blacklist_library, blacklist_library_type, blacklist_users = setup_black_lists(
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
blacklist_users,
|
||||
library_mapping,
|
||||
user_mapping,
|
||||
)
|
||||
|
||||
whitelist_library, whitelist_library_type, whitelist_users = setup_white_lists(
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
whitelist_users,
|
||||
library_mapping,
|
||||
user_mapping,
|
||||
)
|
||||
|
||||
return (
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
blacklist_users,
|
||||
whitelist_users,
|
||||
)
|
||||
|
||||
|
||||
def setup_black_lists(
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
blacklist_users,
|
||||
library_mapping=None,
|
||||
user_mapping=None,
|
||||
):
|
||||
if blacklist_library:
|
||||
if len(blacklist_library) > 0:
|
||||
blacklist_library = blacklist_library.split(",")
|
||||
blacklist_library = [x.strip() for x in blacklist_library]
|
||||
if library_mapping:
|
||||
temp_library = []
|
||||
for library in blacklist_library:
|
||||
library_other = search_mapping(library_mapping, library)
|
||||
if library_other:
|
||||
temp_library.append(library_other)
|
||||
|
||||
blacklist_library = blacklist_library + temp_library
|
||||
else:
|
||||
blacklist_library = []
|
||||
logger(f"Blacklist Library: {blacklist_library}", 1)
|
||||
|
||||
if blacklist_library_type:
|
||||
if len(blacklist_library_type) > 0:
|
||||
blacklist_library_type = blacklist_library_type.split(",")
|
||||
blacklist_library_type = [x.lower().strip() for x in blacklist_library_type]
|
||||
else:
|
||||
blacklist_library_type = []
|
||||
logger(f"Blacklist Library Type: {blacklist_library_type}", 1)
|
||||
|
||||
if blacklist_users:
|
||||
if len(blacklist_users) > 0:
|
||||
blacklist_users = blacklist_users.split(",")
|
||||
blacklist_users = [x.lower().strip() for x in blacklist_users]
|
||||
if user_mapping:
|
||||
temp_users = []
|
||||
for user in blacklist_users:
|
||||
user_other = search_mapping(user_mapping, user)
|
||||
if user_other:
|
||||
temp_users.append(user_other)
|
||||
|
||||
blacklist_users = blacklist_users + temp_users
|
||||
else:
|
||||
blacklist_users = []
|
||||
logger(f"Blacklist Users: {blacklist_users}", 1)
|
||||
|
||||
return blacklist_library, blacklist_library_type, blacklist_users
|
||||
|
||||
|
||||
def setup_white_lists(
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
whitelist_users,
|
||||
library_mapping=None,
|
||||
user_mapping=None,
|
||||
):
|
||||
if whitelist_library:
|
||||
if len(whitelist_library) > 0:
|
||||
whitelist_library = whitelist_library.split(",")
|
||||
whitelist_library = [x.strip() for x in whitelist_library]
|
||||
if library_mapping:
|
||||
temp_library = []
|
||||
for library in whitelist_library:
|
||||
library_other = search_mapping(library_mapping, library)
|
||||
if library_other:
|
||||
temp_library.append(library_other)
|
||||
|
||||
whitelist_library = whitelist_library + temp_library
|
||||
else:
|
||||
whitelist_library = []
|
||||
logger(f"Whitelist Library: {whitelist_library}", 1)
|
||||
|
||||
if whitelist_library_type:
|
||||
if len(whitelist_library_type) > 0:
|
||||
whitelist_library_type = whitelist_library_type.split(",")
|
||||
whitelist_library_type = [x.lower().strip() for x in whitelist_library_type]
|
||||
else:
|
||||
whitelist_library_type = []
|
||||
logger(f"Whitelist Library Type: {whitelist_library_type}", 1)
|
||||
|
||||
if whitelist_users:
|
||||
if len(whitelist_users) > 0:
|
||||
whitelist_users = whitelist_users.split(",")
|
||||
whitelist_users = [x.lower().strip() for x in whitelist_users]
|
||||
if user_mapping:
|
||||
temp_users = []
|
||||
for user in whitelist_users:
|
||||
user_other = search_mapping(user_mapping, user)
|
||||
if user_other:
|
||||
temp_users.append(user_other)
|
||||
|
||||
whitelist_users = whitelist_users + temp_users
|
||||
else:
|
||||
whitelist_users = []
|
||||
else:
|
||||
whitelist_users = []
|
||||
logger(f"Whitelist Users: {whitelist_users}", 1)
|
||||
|
||||
return whitelist_library, whitelist_library_type, whitelist_users
|
||||
402
src/functions.py
402
src/functions.py
|
|
@ -1,4 +1,4 @@
|
|||
import os, copy
|
||||
import os
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
|
@ -55,406 +55,6 @@ def search_mapping(dictionary: dict, key_value: str):
|
|||
return None
|
||||
|
||||
|
||||
def setup_black_white_lists(
|
||||
blacklist_library: str,
|
||||
whitelist_library: str,
|
||||
blacklist_library_type: str,
|
||||
whitelist_library_type: str,
|
||||
blacklist_users: str,
|
||||
whitelist_users: str,
|
||||
library_mapping=None,
|
||||
user_mapping=None,
|
||||
):
|
||||
if blacklist_library:
|
||||
if len(blacklist_library) > 0:
|
||||
blacklist_library = blacklist_library.split(",")
|
||||
blacklist_library = [x.strip() for x in blacklist_library]
|
||||
if library_mapping:
|
||||
temp_library = []
|
||||
for library in blacklist_library:
|
||||
library_other = search_mapping(library_mapping, library)
|
||||
if library_other:
|
||||
temp_library.append(library_other)
|
||||
|
||||
blacklist_library = blacklist_library + temp_library
|
||||
else:
|
||||
blacklist_library = []
|
||||
logger(f"Blacklist Library: {blacklist_library}", 1)
|
||||
|
||||
if whitelist_library:
|
||||
if len(whitelist_library) > 0:
|
||||
whitelist_library = whitelist_library.split(",")
|
||||
whitelist_library = [x.strip() for x in whitelist_library]
|
||||
if library_mapping:
|
||||
temp_library = []
|
||||
for library in whitelist_library:
|
||||
library_other = search_mapping(library_mapping, library)
|
||||
if library_other:
|
||||
temp_library.append(library_other)
|
||||
|
||||
whitelist_library = whitelist_library + temp_library
|
||||
else:
|
||||
whitelist_library = []
|
||||
logger(f"Whitelist Library: {whitelist_library}", 1)
|
||||
|
||||
if blacklist_library_type:
|
||||
if len(blacklist_library_type) > 0:
|
||||
blacklist_library_type = blacklist_library_type.split(",")
|
||||
blacklist_library_type = [x.lower().strip() for x in blacklist_library_type]
|
||||
else:
|
||||
blacklist_library_type = []
|
||||
logger(f"Blacklist Library Type: {blacklist_library_type}", 1)
|
||||
|
||||
if whitelist_library_type:
|
||||
if len(whitelist_library_type) > 0:
|
||||
whitelist_library_type = whitelist_library_type.split(",")
|
||||
whitelist_library_type = [x.lower().strip() for x in whitelist_library_type]
|
||||
else:
|
||||
whitelist_library_type = []
|
||||
logger(f"Whitelist Library Type: {whitelist_library_type}", 1)
|
||||
|
||||
if blacklist_users:
|
||||
if len(blacklist_users) > 0:
|
||||
blacklist_users = blacklist_users.split(",")
|
||||
blacklist_users = [x.lower().strip() for x in blacklist_users]
|
||||
if user_mapping:
|
||||
temp_users = []
|
||||
for user in blacklist_users:
|
||||
user_other = search_mapping(user_mapping, user)
|
||||
if user_other:
|
||||
temp_users.append(user_other)
|
||||
|
||||
blacklist_users = blacklist_users + temp_users
|
||||
else:
|
||||
blacklist_users = []
|
||||
logger(f"Blacklist Users: {blacklist_users}", 1)
|
||||
|
||||
if whitelist_users:
|
||||
if len(whitelist_users) > 0:
|
||||
whitelist_users = whitelist_users.split(",")
|
||||
whitelist_users = [x.lower().strip() for x in whitelist_users]
|
||||
if user_mapping:
|
||||
temp_users = []
|
||||
for user in whitelist_users:
|
||||
user_other = search_mapping(user_mapping, user)
|
||||
if user_other:
|
||||
temp_users.append(user_other)
|
||||
|
||||
whitelist_users = whitelist_users + temp_users
|
||||
else:
|
||||
whitelist_users = []
|
||||
else:
|
||||
whitelist_users = []
|
||||
logger(f"Whitelist Users: {whitelist_users}", 1)
|
||||
|
||||
return (
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
blacklist_users,
|
||||
whitelist_users,
|
||||
)
|
||||
|
||||
|
||||
def check_skip_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
library_mapping,
|
||||
):
|
||||
skip_reason = None
|
||||
|
||||
if isinstance(library_type, (list, tuple, set)):
|
||||
for library_type_item in library_type:
|
||||
if library_type_item.lower() in blacklist_library_type:
|
||||
skip_reason = "is blacklist_library_type"
|
||||
else:
|
||||
if library_type.lower() in blacklist_library_type:
|
||||
skip_reason = "is blacklist_library_type"
|
||||
|
||||
if library_title.lower() in [x.lower() for x in blacklist_library]:
|
||||
skip_reason = "is blacklist_library"
|
||||
|
||||
library_other = None
|
||||
if library_mapping:
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
if library_other:
|
||||
if library_other.lower() in [x.lower() for x in blacklist_library]:
|
||||
skip_reason = "is blacklist_library"
|
||||
|
||||
if len(whitelist_library_type) > 0:
|
||||
if isinstance(library_type, (list, tuple, set)):
|
||||
for library_type_item in library_type:
|
||||
if library_type_item.lower() not in whitelist_library_type:
|
||||
skip_reason = "is not whitelist_library_type"
|
||||
else:
|
||||
if library_type.lower() not in whitelist_library_type:
|
||||
skip_reason = "is not whitelist_library_type"
|
||||
|
||||
# if whitelist is not empty and library is not in whitelist
|
||||
if len(whitelist_library) > 0:
|
||||
if library_title.lower() not in [x.lower() for x in whitelist_library]:
|
||||
skip_reason = "is not whitelist_library"
|
||||
|
||||
if library_other:
|
||||
if library_other.lower() not in [x.lower() for x in whitelist_library]:
|
||||
skip_reason = "is not whitelist_library"
|
||||
|
||||
return skip_reason
|
||||
|
||||
|
||||
def generate_library_guids_dict(user_list: dict):
|
||||
show_output_dict = {}
|
||||
episode_output_dict = {}
|
||||
movies_output_dict = {}
|
||||
|
||||
# Handle the case where user_list is empty or does not contain the expected keys and values
|
||||
if not user_list:
|
||||
return show_output_dict, episode_output_dict, movies_output_dict
|
||||
|
||||
try:
|
||||
show_output_keys = user_list.keys()
|
||||
show_output_keys = [dict(x) for x in list(show_output_keys)]
|
||||
for show_key in show_output_keys:
|
||||
for provider_key, provider_value in show_key.items():
|
||||
# Skip title
|
||||
if provider_key.lower() == "title":
|
||||
continue
|
||||
if provider_key.lower() not in show_output_dict:
|
||||
show_output_dict[provider_key.lower()] = []
|
||||
if provider_key.lower() == "locations":
|
||||
for show_location in provider_value:
|
||||
show_output_dict[provider_key.lower()].append(show_location)
|
||||
else:
|
||||
show_output_dict[provider_key.lower()].append(
|
||||
provider_value.lower()
|
||||
)
|
||||
except Exception:
|
||||
logger("Generating show_output_dict failed, skipping", 1)
|
||||
|
||||
try:
|
||||
for show in user_list:
|
||||
for season in user_list[show]:
|
||||
for episode in user_list[show][season]:
|
||||
for episode_key, episode_value in episode.items():
|
||||
if episode_key.lower() not in episode_output_dict:
|
||||
episode_output_dict[episode_key.lower()] = []
|
||||
if episode_key == "locations":
|
||||
for episode_location in episode_value:
|
||||
episode_output_dict[episode_key.lower()].append(
|
||||
episode_location
|
||||
)
|
||||
else:
|
||||
episode_output_dict[episode_key.lower()].append(
|
||||
episode_value.lower()
|
||||
)
|
||||
except Exception:
|
||||
logger("Generating episode_output_dict failed, skipping", 1)
|
||||
|
||||
try:
|
||||
for movie in user_list:
|
||||
for movie_key, movie_value in movie.items():
|
||||
if movie_key.lower() not in movies_output_dict:
|
||||
movies_output_dict[movie_key.lower()] = []
|
||||
if movie_key == "locations":
|
||||
for movie_location in movie_value:
|
||||
movies_output_dict[movie_key.lower()].append(movie_location)
|
||||
else:
|
||||
movies_output_dict[movie_key.lower()].append(movie_value.lower())
|
||||
except Exception:
|
||||
logger("Generating movies_output_dict failed, skipping", 1)
|
||||
|
||||
return show_output_dict, episode_output_dict, movies_output_dict
|
||||
|
||||
|
||||
def combine_watched_dicts(dicts: list):
|
||||
combined_dict = {}
|
||||
for single_dict in dicts:
|
||||
for key, value in single_dict.items():
|
||||
if key not in combined_dict:
|
||||
combined_dict[key] = {}
|
||||
for subkey, subvalue in value.items():
|
||||
if subkey in combined_dict[key]:
|
||||
# If the subkey already exists in the combined dictionary,
|
||||
# check if the values are different and raise an exception if they are
|
||||
if combined_dict[key][subkey] != subvalue:
|
||||
raise ValueError(
|
||||
f"Conflicting values for subkey '{subkey}' under key '{key}'"
|
||||
)
|
||||
else:
|
||||
# If the subkey does not exist in the combined dictionary, add it
|
||||
combined_dict[key][subkey] = subvalue
|
||||
|
||||
return combined_dict
|
||||
|
||||
|
||||
def cleanup_watched(
|
||||
watched_list_1, watched_list_2, user_mapping=None, library_mapping=None
|
||||
):
|
||||
modified_watched_list_1 = copy.deepcopy(watched_list_1)
|
||||
|
||||
# remove entries from watched_list_1 that are in watched_list_2
|
||||
for user_1 in watched_list_1:
|
||||
user_other = None
|
||||
if user_mapping:
|
||||
user_other = search_mapping(user_mapping, user_1)
|
||||
user_2 = get_other(watched_list_2, user_1, user_other)
|
||||
if user_2 is None:
|
||||
continue
|
||||
|
||||
for library_1 in watched_list_1[user_1]:
|
||||
library_other = None
|
||||
if library_mapping:
|
||||
library_other = search_mapping(library_mapping, library_1)
|
||||
library_2 = get_other(watched_list_2[user_2], library_1, library_other)
|
||||
if library_2 is None:
|
||||
continue
|
||||
|
||||
(
|
||||
_,
|
||||
episode_watched_list_2_keys_dict,
|
||||
movies_watched_list_2_keys_dict,
|
||||
) = generate_library_guids_dict(watched_list_2[user_2][library_2])
|
||||
|
||||
# Movies
|
||||
if isinstance(watched_list_1[user_1][library_1], list):
|
||||
for movie in watched_list_1[user_1][library_1]:
|
||||
if is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
|
||||
logger(f"Removing {movie} from {library_1}", 3)
|
||||
modified_watched_list_1[user_1][library_1].remove(movie)
|
||||
|
||||
# TV Shows
|
||||
elif isinstance(watched_list_1[user_1][library_1], dict):
|
||||
for show_key_1 in watched_list_1[user_1][library_1].keys():
|
||||
show_key_dict = dict(show_key_1)
|
||||
for season in watched_list_1[user_1][library_1][show_key_1]:
|
||||
for episode in watched_list_1[user_1][library_1][show_key_1][
|
||||
season
|
||||
]:
|
||||
if is_episode_in_dict(
|
||||
episode, episode_watched_list_2_keys_dict
|
||||
):
|
||||
if (
|
||||
episode
|
||||
in modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
][season]
|
||||
):
|
||||
logger(
|
||||
f"Removing {episode} from {show_key_dict['title']}",
|
||||
3,
|
||||
)
|
||||
modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
][season].remove(episode)
|
||||
|
||||
# Remove empty seasons
|
||||
if (
|
||||
len(
|
||||
modified_watched_list_1[user_1][library_1][show_key_1][
|
||||
season
|
||||
]
|
||||
)
|
||||
== 0
|
||||
):
|
||||
if (
|
||||
season
|
||||
in modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
]
|
||||
):
|
||||
logger(
|
||||
f"Removing {season} from {show_key_dict['title']} because it is empty",
|
||||
3,
|
||||
)
|
||||
del modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
][season]
|
||||
|
||||
# Remove empty shows
|
||||
if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0:
|
||||
if show_key_1 in modified_watched_list_1[user_1][library_1]:
|
||||
logger(
|
||||
f"Removing {show_key_dict['title']} because it is empty",
|
||||
3,
|
||||
)
|
||||
del modified_watched_list_1[user_1][library_1][show_key_1]
|
||||
|
||||
for user_1 in watched_list_1:
|
||||
for library_1 in watched_list_1[user_1]:
|
||||
if library_1 in modified_watched_list_1[user_1]:
|
||||
# If library is empty then remove it
|
||||
if len(modified_watched_list_1[user_1][library_1]) == 0:
|
||||
logger(f"Removing {library_1} from {user_1} because it is empty", 1)
|
||||
del modified_watched_list_1[user_1][library_1]
|
||||
|
||||
if user_1 in modified_watched_list_1:
|
||||
# If user is empty delete user
|
||||
if len(modified_watched_list_1[user_1]) == 0:
|
||||
logger(f"Removing {user_1} from watched list 1 because it is empty", 1)
|
||||
del modified_watched_list_1[user_1]
|
||||
|
||||
return modified_watched_list_1
|
||||
|
||||
|
||||
def get_other(watched_list_2, object_1, object_2):
|
||||
if object_1 in watched_list_2:
|
||||
return object_1
|
||||
elif object_2 in watched_list_2:
|
||||
return object_2
|
||||
else:
|
||||
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
|
||||
return None
|
||||
|
||||
|
||||
def is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
|
||||
# Iterate through the keys and values of the movie dictionary
|
||||
for movie_key, movie_value in movie.items():
|
||||
# If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary
|
||||
if movie_key == "locations":
|
||||
if "locations" in movies_watched_list_2_keys_dict.keys():
|
||||
# Iterate through the locations in the movie dictionary
|
||||
for location in movie_value:
|
||||
# If the location is in the movies_watched_list_2_keys_dict dictionary, return True
|
||||
if location in movies_watched_list_2_keys_dict["locations"]:
|
||||
return True
|
||||
# If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary
|
||||
else:
|
||||
if movie_key in movies_watched_list_2_keys_dict.keys():
|
||||
# If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True
|
||||
if movie_value in movies_watched_list_2_keys_dict[movie_key]:
|
||||
return True
|
||||
|
||||
# If the loop completes without finding a match, return False
|
||||
return False
|
||||
|
||||
|
||||
def is_episode_in_dict(episode, episode_watched_list_2_keys_dict):
|
||||
# Iterate through the keys and values of the episode dictionary
|
||||
for episode_key, episode_value in episode.items():
|
||||
# If the key is "locations", check if the "locations" key is present in the episode_watched_list_2_keys_dict dictionary
|
||||
if episode_key == "locations":
|
||||
if "locations" in episode_watched_list_2_keys_dict.keys():
|
||||
# Iterate through the locations in the episode dictionary
|
||||
for location in episode_value:
|
||||
# If the location is in the episode_watched_list_2_keys_dict dictionary, return True
|
||||
if location in episode_watched_list_2_keys_dict["locations"]:
|
||||
return True
|
||||
# If the key is not "locations", check if the episode_key is present in the episode_watched_list_2_keys_dict dictionary
|
||||
else:
|
||||
if episode_key in episode_watched_list_2_keys_dict.keys():
|
||||
# If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True
|
||||
if episode_value in episode_watched_list_2_keys_dict[episode_key]:
|
||||
return True
|
||||
|
||||
# If the loop completes without finding a match, return False
|
||||
return False
|
||||
|
||||
|
||||
def future_thread_executor(args: list, workers: int = -1):
|
||||
futures_list = []
|
||||
results = []
|
||||
|
|
|
|||
|
|
@ -1,9 +1,14 @@
|
|||
import asyncio, aiohttp, traceback
|
||||
|
||||
from src.functions import (
|
||||
logger,
|
||||
search_mapping,
|
||||
)
|
||||
from src.library import (
|
||||
check_skip_logic,
|
||||
generate_library_guids_dict,
|
||||
)
|
||||
from src.watched import (
|
||||
combine_watched_dicts,
|
||||
)
|
||||
|
||||
|
|
@ -38,17 +43,24 @@ class Jellyfin:
|
|||
async with session.get(
|
||||
self.baseurl + query, headers=headers
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise Exception(
|
||||
f"Query failed with status {response.status} {response.reason}"
|
||||
)
|
||||
results = await response.json()
|
||||
|
||||
elif query_type == "post":
|
||||
async with session.post(
|
||||
self.baseurl + query, headers=headers
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise Exception(
|
||||
f"Query failed with status {response.status} {response.reason}"
|
||||
)
|
||||
results = await response.json()
|
||||
|
||||
if type(results) is str:
|
||||
logger(f"Jellyfin: Query {query_type} {query} {results}", 2)
|
||||
raise Exception(results)
|
||||
if not isinstance(results, list) and not isinstance(results, dict):
|
||||
raise Exception("Query result is not of type list or dict")
|
||||
|
||||
# append identifiers to results
|
||||
if identifiers:
|
||||
|
|
@ -57,7 +69,7 @@ class Jellyfin:
|
|||
return results
|
||||
|
||||
except Exception as e:
|
||||
logger(f"Jellyfin: Query failed {e}", 2)
|
||||
logger(f"Jellyfin: Query {query_type} {query}\nResults {results}\n{e}", 2)
|
||||
raise Exception(e)
|
||||
|
||||
async def get_users(self):
|
||||
|
|
@ -386,19 +398,14 @@ class Jellyfin:
|
|||
|
||||
if skip_reason:
|
||||
logger(
|
||||
f"Jellyfin: Skipping library {library_title} {skip_reason}",
|
||||
f"Jellyfin: Skipping library {library_title}: {skip_reason}",
|
||||
1,
|
||||
)
|
||||
continue
|
||||
|
||||
# If there are multiple types in library raise error
|
||||
if types is None or len(types) < 1:
|
||||
all_types = set(
|
||||
[
|
||||
x["Type"]
|
||||
for x in watched["Items"]
|
||||
]
|
||||
)
|
||||
all_types = set([x["Type"] for x in watched["Items"]])
|
||||
logger(
|
||||
f"Jellyfin: Skipping Library {library_title} found types: {types}, all types: {all_types}",
|
||||
1,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,212 @@
|
|||
from src.functions import (
|
||||
logger,
|
||||
search_mapping,
|
||||
)
|
||||
|
||||
|
||||
def check_skip_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
library_mapping=None,
|
||||
):
|
||||
skip_reason = None
|
||||
library_other = None
|
||||
if library_mapping:
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
|
||||
skip_reason_black = check_blacklist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
library_other,
|
||||
)
|
||||
skip_reason_white = check_whitelist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
# Combine skip reasons
|
||||
if skip_reason_black:
|
||||
skip_reason = skip_reason_black
|
||||
|
||||
if skip_reason_white:
|
||||
if skip_reason:
|
||||
skip_reason = skip_reason + " and " + skip_reason_white
|
||||
else:
|
||||
skip_reason = skip_reason_white
|
||||
|
||||
return skip_reason
|
||||
|
||||
|
||||
def check_blacklist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
library_other=None,
|
||||
):
|
||||
skip_reason = None
|
||||
if isinstance(library_type, (list, tuple, set)):
|
||||
for library_type_item in library_type:
|
||||
if library_type_item.lower() in blacklist_library_type:
|
||||
skip_reason = f"{library_type_item} is in blacklist_library_type"
|
||||
else:
|
||||
if library_type.lower() in blacklist_library_type:
|
||||
skip_reason = f"{library_type} is in blacklist_library_type"
|
||||
|
||||
if library_title.lower() in [x.lower() for x in blacklist_library]:
|
||||
if skip_reason:
|
||||
skip_reason = (
|
||||
skip_reason + " and " + f"{library_title} is in blacklist_library"
|
||||
)
|
||||
else:
|
||||
skip_reason = f"{library_title} is in blacklist_library"
|
||||
|
||||
if library_other:
|
||||
if library_other.lower() in [x.lower() for x in blacklist_library]:
|
||||
if skip_reason:
|
||||
skip_reason = (
|
||||
skip_reason + " and " + f"{library_other} is in blacklist_library"
|
||||
)
|
||||
else:
|
||||
skip_reason = f"{library_other} is in blacklist_library"
|
||||
|
||||
return skip_reason
|
||||
|
||||
|
||||
def check_whitelist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
library_other=None,
|
||||
):
|
||||
skip_reason = None
|
||||
if len(whitelist_library_type) > 0:
|
||||
if isinstance(library_type, (list, tuple, set)):
|
||||
for library_type_item in library_type:
|
||||
if library_type_item.lower() not in whitelist_library_type:
|
||||
skip_reason = (
|
||||
f"{library_type_item} is not in whitelist_library_type"
|
||||
)
|
||||
else:
|
||||
if library_type.lower() not in whitelist_library_type:
|
||||
skip_reason = f"{library_type} is not in whitelist_library_type"
|
||||
|
||||
# if whitelist is not empty and library is not in whitelist
|
||||
if len(whitelist_library) > 0:
|
||||
if library_other:
|
||||
if library_title.lower() not in [
|
||||
x.lower() for x in whitelist_library
|
||||
] and library_other.lower() not in [x.lower() for x in whitelist_library]:
|
||||
if skip_reason:
|
||||
skip_reason = (
|
||||
skip_reason
|
||||
+ " and "
|
||||
+ f"{library_title} is not in whitelist_library"
|
||||
)
|
||||
else:
|
||||
skip_reason = f"{library_title} is not in whitelist_library"
|
||||
else:
|
||||
if library_title.lower() not in [x.lower() for x in whitelist_library]:
|
||||
if skip_reason:
|
||||
skip_reason = (
|
||||
skip_reason
|
||||
+ " and "
|
||||
+ f"{library_title} is not in whitelist_library"
|
||||
)
|
||||
else:
|
||||
skip_reason = f"{library_title} is not in whitelist_library"
|
||||
|
||||
return skip_reason
|
||||
|
||||
|
||||
def show_title_dict(user_list: dict):
|
||||
try:
|
||||
show_output_dict = {}
|
||||
|
||||
show_output_keys = user_list.keys()
|
||||
show_output_keys = [dict(x) for x in list(show_output_keys)]
|
||||
for show_key in show_output_keys:
|
||||
for provider_key, provider_value in show_key.items():
|
||||
# Skip title
|
||||
if provider_key.lower() == "title":
|
||||
continue
|
||||
if provider_key.lower() not in show_output_dict:
|
||||
show_output_dict[provider_key.lower()] = []
|
||||
if provider_key.lower() == "locations":
|
||||
for show_location in provider_value:
|
||||
show_output_dict[provider_key.lower()].append(show_location)
|
||||
else:
|
||||
show_output_dict[provider_key.lower()].append(
|
||||
provider_value.lower()
|
||||
)
|
||||
|
||||
return show_output_dict
|
||||
except Exception:
|
||||
logger("Generating show_output_dict failed, skipping", 1)
|
||||
return {}
|
||||
|
||||
|
||||
def episode_title_dict(user_list: dict):
|
||||
try:
|
||||
episode_output_dict = {}
|
||||
for show in user_list:
|
||||
for season in user_list[show]:
|
||||
for episode in user_list[show][season]:
|
||||
for episode_key, episode_value in episode.items():
|
||||
if episode_key.lower() not in episode_output_dict:
|
||||
episode_output_dict[episode_key.lower()] = []
|
||||
if episode_key == "locations":
|
||||
for episode_location in episode_value:
|
||||
episode_output_dict[episode_key.lower()].append(
|
||||
episode_location
|
||||
)
|
||||
else:
|
||||
episode_output_dict[episode_key.lower()].append(
|
||||
episode_value.lower()
|
||||
)
|
||||
|
||||
return episode_output_dict
|
||||
except Exception:
|
||||
logger("Generating episode_output_dict failed, skipping", 1)
|
||||
return {}
|
||||
|
||||
|
||||
def movies_title_dict(user_list: dict):
|
||||
try:
|
||||
movies_output_dict = {}
|
||||
for movie in user_list:
|
||||
for movie_key, movie_value in movie.items():
|
||||
if movie_key.lower() not in movies_output_dict:
|
||||
movies_output_dict[movie_key.lower()] = []
|
||||
if movie_key == "locations":
|
||||
for movie_location in movie_value:
|
||||
movies_output_dict[movie_key.lower()].append(movie_location)
|
||||
else:
|
||||
movies_output_dict[movie_key.lower()].append(movie_value.lower())
|
||||
|
||||
return movies_output_dict
|
||||
except Exception:
|
||||
logger("Generating movies_output_dict failed, skipping", 1)
|
||||
return {}
|
||||
|
||||
|
||||
def generate_library_guids_dict(user_list: dict):
|
||||
# Handle the case where user_list is empty or does not contain the expected keys and values
|
||||
if not user_list:
|
||||
return {}, {}, {}
|
||||
|
||||
show_output_dict = show_title_dict(user_list)
|
||||
episode_output_dict = episode_title_dict(user_list)
|
||||
movies_output_dict = movies_title_dict(user_list)
|
||||
|
||||
return show_output_dict, episode_output_dict, movies_output_dict
|
||||
193
src/main.py
193
src/main.py
|
|
@ -5,10 +5,18 @@ from time import sleep, perf_counter
|
|||
from src.functions import (
|
||||
logger,
|
||||
str_to_bool,
|
||||
search_mapping,
|
||||
cleanup_watched,
|
||||
setup_black_white_lists,
|
||||
)
|
||||
from src.users import (
|
||||
generate_user_list,
|
||||
combine_user_lists,
|
||||
filter_user_lists,
|
||||
generate_server_users,
|
||||
)
|
||||
from src.watched import (
|
||||
cleanup_watched,
|
||||
)
|
||||
from src.black_white import setup_black_white_lists
|
||||
|
||||
from src.plex import Plex
|
||||
from src.jellyfin import Jellyfin
|
||||
|
||||
|
|
@ -18,107 +26,27 @@ load_dotenv(override=True)
|
|||
def setup_users(
|
||||
server_1, server_2, blacklist_users, whitelist_users, user_mapping=None
|
||||
):
|
||||
server_1_users = generate_user_list(server_1)
|
||||
server_2_users = generate_user_list(server_2)
|
||||
|
||||
# generate list of users from server 1 and server 2
|
||||
server_1_type = server_1[0]
|
||||
server_1_connection = server_1[1]
|
||||
server_2_type = server_2[0]
|
||||
server_2_connection = server_2[1]
|
||||
logger(f"Server 1: {server_1_type} {server_1_connection}", 0)
|
||||
logger(f"Server 2: {server_2_type} {server_2_connection}", 0)
|
||||
|
||||
server_1_users = []
|
||||
if server_1_type == "plex":
|
||||
server_1_users = [x.title.lower() for x in server_1_connection.users]
|
||||
elif server_1_type == "jellyfin":
|
||||
server_1_users = [key.lower() for key in server_1_connection.users.keys()]
|
||||
|
||||
server_2_users = []
|
||||
if server_2_type == "plex":
|
||||
server_2_users = [x.title.lower() for x in server_2_connection.users]
|
||||
elif server_2_type == "jellyfin":
|
||||
server_2_users = [key.lower() for key in server_2_connection.users.keys()]
|
||||
|
||||
# combined list of overlapping users from plex and jellyfin
|
||||
users = {}
|
||||
|
||||
for server_1_user in server_1_users:
|
||||
if user_mapping:
|
||||
jellyfin_plex_mapped_user = search_mapping(user_mapping, server_1_user)
|
||||
if jellyfin_plex_mapped_user:
|
||||
users[server_1_user] = jellyfin_plex_mapped_user
|
||||
continue
|
||||
|
||||
if server_1_user in server_2_users:
|
||||
users[server_1_user] = server_1_user
|
||||
|
||||
for server_2_user in server_2_users:
|
||||
if user_mapping:
|
||||
plex_jellyfin_mapped_user = search_mapping(user_mapping, server_2_user)
|
||||
if plex_jellyfin_mapped_user:
|
||||
users[plex_jellyfin_mapped_user] = server_2_user
|
||||
continue
|
||||
|
||||
if server_2_user in server_1_users:
|
||||
users[server_2_user] = server_2_user
|
||||
|
||||
users = combine_user_lists(server_1_users, server_2_users, user_mapping)
|
||||
logger(f"User list that exist on both servers {users}", 1)
|
||||
|
||||
users_filtered = {}
|
||||
for user in users:
|
||||
# whitelist_user is not empty and user lowercase is not in whitelist lowercase
|
||||
if len(whitelist_users) > 0:
|
||||
if user not in whitelist_users and users[user] not in whitelist_users:
|
||||
logger(f"{user} or {users[user]} is not in whitelist", 1)
|
||||
continue
|
||||
|
||||
if user not in blacklist_users and users[user] not in blacklist_users:
|
||||
users_filtered[user] = users[user]
|
||||
|
||||
users_filtered = filter_user_lists(users, blacklist_users, whitelist_users)
|
||||
logger(f"Filtered user list {users_filtered}", 1)
|
||||
|
||||
if server_1_type == "plex":
|
||||
output_server_1_users = []
|
||||
for plex_user in server_1_connection.users:
|
||||
if (
|
||||
plex_user.title.lower() in users_filtered.keys()
|
||||
or plex_user.title.lower() in users_filtered.values()
|
||||
):
|
||||
output_server_1_users.append(plex_user)
|
||||
elif server_1_type == "jellyfin":
|
||||
output_server_1_users = {}
|
||||
for jellyfin_user, jellyfin_id in server_1_connection.users.items():
|
||||
if (
|
||||
jellyfin_user.lower() in users_filtered.keys()
|
||||
or jellyfin_user.lower() in users_filtered.values()
|
||||
):
|
||||
output_server_1_users[jellyfin_user] = jellyfin_id
|
||||
output_server_1_users = generate_server_users(server_1, users_filtered)
|
||||
output_server_2_users = generate_server_users(server_2, users_filtered)
|
||||
|
||||
if server_2_type == "plex":
|
||||
output_server_2_users = []
|
||||
for plex_user in server_2_connection.users:
|
||||
if (
|
||||
plex_user.title.lower() in users_filtered.keys()
|
||||
or plex_user.title.lower() in users_filtered.values()
|
||||
):
|
||||
output_server_2_users.append(plex_user)
|
||||
elif server_2_type == "jellyfin":
|
||||
output_server_2_users = {}
|
||||
for jellyfin_user, jellyfin_id in server_2_connection.users.items():
|
||||
if (
|
||||
jellyfin_user.lower() in users_filtered.keys()
|
||||
or jellyfin_user.lower() in users_filtered.values()
|
||||
):
|
||||
output_server_2_users[jellyfin_user] = jellyfin_id
|
||||
|
||||
if len(output_server_1_users) == 0:
|
||||
# Check if users is none or empty
|
||||
if output_server_1_users is None or len(output_server_1_users) == 0:
|
||||
raise Exception(
|
||||
f"No users found for server 1 {server_1_type}, users found {users}, filtered users {users_filtered}, server 1 users {server_1_connection.users}"
|
||||
f"No users found for server 1 {server_1[0]}, users found {users}, filtered users {users_filtered}, server 1 users {server_1[1].users}"
|
||||
)
|
||||
|
||||
if len(output_server_2_users) == 0:
|
||||
if output_server_2_users is None or len(output_server_2_users) == 0:
|
||||
raise Exception(
|
||||
f"No users found for server 2 {server_2_type}, users found {users} filtered users {users_filtered}, server 2 users {server_2_connection.users}"
|
||||
f"No users found for server 2 {server_2[0]}, users found {users} filtered users {users_filtered}, server 2 users {server_2[1].users}"
|
||||
)
|
||||
|
||||
logger(f"Server 1 users: {output_server_1_users}", 1)
|
||||
|
|
@ -264,6 +192,53 @@ def update_server_watched(
|
|||
)
|
||||
|
||||
|
||||
def should_sync_server(server_1_type, server_2_type):
|
||||
sync_from_plex_to_jellyfin = str_to_bool(
|
||||
os.getenv("SYNC_FROM_PLEX_TO_JELLYFIN", "True")
|
||||
)
|
||||
sync_from_jelly_to_plex = str_to_bool(
|
||||
os.getenv("SYNC_FROM_JELLYFIN_TO_PLEX", "True")
|
||||
)
|
||||
sync_from_plex_to_plex = str_to_bool(os.getenv("SYNC_FROM_PLEX_TO_PLEX", "True"))
|
||||
sync_from_jelly_to_jellyfin = str_to_bool(
|
||||
os.getenv("SYNC_FROM_JELLYFIN_TO_JELLYFIN", "True")
|
||||
)
|
||||
|
||||
if (
|
||||
server_1_type == "plex"
|
||||
and server_2_type == "plex"
|
||||
and not sync_from_plex_to_plex
|
||||
):
|
||||
logger("Sync between plex and plex is disabled", 1)
|
||||
return False
|
||||
|
||||
if (
|
||||
server_1_type == "plex"
|
||||
and server_2_type == "jellyfin"
|
||||
and not sync_from_jelly_to_plex
|
||||
):
|
||||
logger("Sync from jellyfin to plex disabled", 1)
|
||||
return False
|
||||
|
||||
if (
|
||||
server_1_type == "jellyfin"
|
||||
and server_2_type == "jellyfin"
|
||||
and not sync_from_jelly_to_jellyfin
|
||||
):
|
||||
logger("Sync between jellyfin and jellyfin is disabled", 1)
|
||||
return False
|
||||
|
||||
if (
|
||||
server_1_type == "jellyfin"
|
||||
and server_2_type == "plex"
|
||||
and not sync_from_plex_to_jellyfin
|
||||
):
|
||||
logger("Sync from plex to jellyfin is disabled", 1)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def main_loop():
|
||||
logfile = os.getenv("LOGFILE", "log.log")
|
||||
# Delete logfile if it exists
|
||||
|
|
@ -370,21 +345,23 @@ def main_loop():
|
|||
1,
|
||||
)
|
||||
|
||||
update_server_watched(
|
||||
server_1,
|
||||
server_2_watched_filtered,
|
||||
user_mapping,
|
||||
library_mapping,
|
||||
dryrun,
|
||||
)
|
||||
if should_sync_server(server_1[0], server_2[0]):
|
||||
update_server_watched(
|
||||
server_1,
|
||||
server_2_watched_filtered,
|
||||
user_mapping,
|
||||
library_mapping,
|
||||
dryrun,
|
||||
)
|
||||
|
||||
update_server_watched(
|
||||
server_2,
|
||||
server_1_watched_filtered,
|
||||
user_mapping,
|
||||
library_mapping,
|
||||
dryrun,
|
||||
)
|
||||
if should_sync_server(server_2[0], server_1[0]):
|
||||
update_server_watched(
|
||||
server_2,
|
||||
server_1_watched_filtered,
|
||||
user_mapping,
|
||||
library_mapping,
|
||||
dryrun,
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
|
|
|
|||
113
src/plex.py
113
src/plex.py
|
|
@ -7,10 +7,13 @@ from plexapi.myplex import MyPlexAccount
|
|||
from src.functions import (
|
||||
logger,
|
||||
search_mapping,
|
||||
check_skip_logic,
|
||||
generate_library_guids_dict,
|
||||
future_thread_executor,
|
||||
)
|
||||
from src.library import (
|
||||
check_skip_logic,
|
||||
generate_library_guids_dict,
|
||||
)
|
||||
|
||||
|
||||
# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
|
||||
class HostNameIgnoringAdapter(requests.adapters.HTTPAdapter):
|
||||
|
|
@ -49,7 +52,7 @@ def get_user_library_watched_show(show):
|
|||
m = re.match(r"(.*)://(.*)", guid.id)
|
||||
guid_source, guid_id = m.group(1).lower(), m.group(2)
|
||||
episode_guids_temp[guid_source] = guid_id
|
||||
except:
|
||||
except Exception:
|
||||
logger(
|
||||
f"Plex: Failed to get guids for {episode.title} in {show.title}, Using location only",
|
||||
1,
|
||||
|
|
@ -66,7 +69,7 @@ def get_user_library_watched_show(show):
|
|||
|
||||
return show_guids, episode_guids
|
||||
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
return {}, {}
|
||||
|
||||
|
||||
|
|
@ -141,6 +144,26 @@ def get_user_library_watched(user, user_plex, library):
|
|||
return {}
|
||||
|
||||
|
||||
def find_video(plex_search, video_ids):
|
||||
try:
|
||||
for location in plex_search.locations:
|
||||
if location.split("/")[-1] in video_ids["locations"]:
|
||||
return True
|
||||
|
||||
for guid in plex_search.guids:
|
||||
guid_source = re.search(r"(.*)://", guid.id).group(1).lower()
|
||||
guid_id = re.search(r"://(.*)", guid.id).group(1)
|
||||
|
||||
# If show provider source and show provider id are in videos_shows_ids exactly, then the show is in the list
|
||||
if guid_source in video_ids.keys():
|
||||
if guid_id in video_ids[guid_source]:
|
||||
return True
|
||||
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def update_user_watched(user, user_plex, library, videos, dryrun):
|
||||
try:
|
||||
logger(f"Plex: Updating watched for {user.title} in library {library}", 1)
|
||||
|
|
@ -157,26 +180,7 @@ def update_user_watched(user, user_plex, library, videos, dryrun):
|
|||
library_videos = user_plex.library.section(library)
|
||||
if videos_movies_ids:
|
||||
for movies_search in library_videos.search(unwatched=True):
|
||||
movie_found = False
|
||||
for movie_location in movies_search.locations:
|
||||
if movie_location.split("/")[-1] in videos_movies_ids["locations"]:
|
||||
movie_found = True
|
||||
break
|
||||
|
||||
if not movie_found:
|
||||
for movie_guid in movies_search.guids:
|
||||
movie_guid_source = (
|
||||
re.search(r"(.*)://", movie_guid.id).group(1).lower()
|
||||
)
|
||||
movie_guid_id = re.search(r"://(.*)", movie_guid.id).group(1)
|
||||
|
||||
# If movie provider source and movie provider id are in videos_movie_ids exactly, then the movie is in the list
|
||||
if movie_guid_source in videos_movies_ids.keys():
|
||||
if movie_guid_id in videos_movies_ids[movie_guid_source]:
|
||||
movie_found = True
|
||||
break
|
||||
|
||||
if movie_found:
|
||||
if find_video(movies_search, videos_movies_ids):
|
||||
msg = f"{movies_search.title} as watched for {user.title} in {library} for Plex"
|
||||
if not dryrun:
|
||||
logger(f"Marked {msg}", 0)
|
||||
|
|
@ -191,64 +195,9 @@ def update_user_watched(user, user_plex, library, videos, dryrun):
|
|||
|
||||
if videos_shows_ids and videos_episodes_ids:
|
||||
for show_search in library_videos.search(unwatched=True):
|
||||
show_found = False
|
||||
for show_location in show_search.locations:
|
||||
if show_location.split("/")[-1] in videos_shows_ids["locations"]:
|
||||
show_found = True
|
||||
break
|
||||
|
||||
if not show_found:
|
||||
for show_guid in show_search.guids:
|
||||
show_guid_source = (
|
||||
re.search(r"(.*)://", show_guid.id).group(1).lower()
|
||||
)
|
||||
show_guid_id = re.search(r"://(.*)", show_guid.id).group(1)
|
||||
|
||||
# If show provider source and show provider id are in videos_shows_ids exactly, then the show is in the list
|
||||
if show_guid_source in videos_shows_ids.keys():
|
||||
if show_guid_id in videos_shows_ids[show_guid_source]:
|
||||
show_found = True
|
||||
break
|
||||
|
||||
if show_found:
|
||||
if find_video(show_search, videos_shows_ids):
|
||||
for episode_search in show_search.episodes():
|
||||
episode_found = False
|
||||
|
||||
for episode_location in episode_search.locations:
|
||||
if (
|
||||
episode_location.split("/")[-1]
|
||||
in videos_episodes_ids["locations"]
|
||||
):
|
||||
episode_found = True
|
||||
break
|
||||
|
||||
if not episode_found:
|
||||
try:
|
||||
for episode_guid in episode_search.guids:
|
||||
episode_guid_source = (
|
||||
re.search(r"(.*)://", episode_guid.id)
|
||||
.group(1)
|
||||
.lower()
|
||||
)
|
||||
episode_guid_id = re.search(
|
||||
r"://(.*)", episode_guid.id
|
||||
).group(1)
|
||||
|
||||
# If episode provider source and episode provider id are in videos_episodes_ids exactly, then the episode is in the list
|
||||
if episode_guid_source in videos_episodes_ids.keys():
|
||||
if (
|
||||
episode_guid_id
|
||||
in videos_episodes_ids[episode_guid_source]
|
||||
):
|
||||
episode_found = True
|
||||
break
|
||||
except Exception as e:
|
||||
logger(
|
||||
f"Plex: Failed to get episode guid for {episode_search.title}, Error: {e}",
|
||||
1,
|
||||
)
|
||||
|
||||
if episode_found:
|
||||
if find_video(episode_search, videos_episodes_ids):
|
||||
msg = f"{show_search.title} {episode_search.title} as watched for {user.title} in {library} for Plex"
|
||||
if not dryrun:
|
||||
logger(f"Marked {msg}", 0)
|
||||
|
|
@ -381,7 +330,7 @@ class Plex:
|
|||
|
||||
if skip_reason:
|
||||
logger(
|
||||
f"Plex: Skipping library {library_title} {skip_reason}", 1
|
||||
f"Plex: Skipping library {library_title}: {skip_reason}", 1
|
||||
)
|
||||
continue
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,83 @@
|
|||
from src.functions import (
|
||||
logger,
|
||||
search_mapping,
|
||||
)
|
||||
|
||||
|
||||
def generate_user_list(server):
|
||||
# generate list of users from server 1 and server 2
|
||||
server_type = server[0]
|
||||
server_connection = server[1]
|
||||
|
||||
server_users = []
|
||||
if server_type == "plex":
|
||||
server_users = [x.title.lower() for x in server_connection.users]
|
||||
elif server_type == "jellyfin":
|
||||
server_users = [key.lower() for key in server_connection.users.keys()]
|
||||
|
||||
return server_users
|
||||
|
||||
|
||||
def combine_user_lists(server_1_users, server_2_users, user_mapping):
|
||||
# combined list of overlapping users from plex and jellyfin
|
||||
users = {}
|
||||
|
||||
for server_1_user in server_1_users:
|
||||
if user_mapping:
|
||||
mapped_user = search_mapping(user_mapping, server_1_user)
|
||||
if mapped_user in server_2_users:
|
||||
users[server_1_user] = mapped_user
|
||||
continue
|
||||
|
||||
if server_1_user in server_2_users:
|
||||
users[server_1_user] = server_1_user
|
||||
|
||||
for server_2_user in server_2_users:
|
||||
if user_mapping:
|
||||
mapped_user = search_mapping(user_mapping, server_2_user)
|
||||
if mapped_user in server_1_users:
|
||||
users[mapped_user] = server_2_user
|
||||
continue
|
||||
|
||||
if server_2_user in server_1_users:
|
||||
users[server_2_user] = server_2_user
|
||||
|
||||
return users
|
||||
|
||||
|
||||
def filter_user_lists(users, blacklist_users, whitelist_users):
|
||||
users_filtered = {}
|
||||
for user in users:
|
||||
# whitelist_user is not empty and user lowercase is not in whitelist lowercase
|
||||
if len(whitelist_users) > 0:
|
||||
if user not in whitelist_users and users[user] not in whitelist_users:
|
||||
logger(f"{user} or {users[user]} is not in whitelist", 1)
|
||||
continue
|
||||
|
||||
if user not in blacklist_users and users[user] not in blacklist_users:
|
||||
users_filtered[user] = users[user]
|
||||
|
||||
return users_filtered
|
||||
|
||||
|
||||
def generate_server_users(server, users):
|
||||
server_users = None
|
||||
|
||||
if server[0] == "plex":
|
||||
server_users = []
|
||||
for plex_user in server[1].users:
|
||||
if (
|
||||
plex_user.title.lower() in users.keys()
|
||||
or plex_user.title.lower() in users.values()
|
||||
):
|
||||
server_users.append(plex_user)
|
||||
elif server[0] == "jellyfin":
|
||||
server_users = {}
|
||||
for jellyfin_user, jellyfin_id in server[1].users.items():
|
||||
if (
|
||||
jellyfin_user.lower() in users.keys()
|
||||
or jellyfin_user.lower() in users.values()
|
||||
):
|
||||
server_users[jellyfin_user] = jellyfin_id
|
||||
|
||||
return server_users
|
||||
|
|
@ -0,0 +1,192 @@
|
|||
import copy
|
||||
|
||||
from src.functions import (
|
||||
logger,
|
||||
search_mapping,
|
||||
)
|
||||
|
||||
from src.library import generate_library_guids_dict
|
||||
|
||||
|
||||
def combine_watched_dicts(dicts: list):
|
||||
combined_dict = {}
|
||||
for single_dict in dicts:
|
||||
for key, value in single_dict.items():
|
||||
if key not in combined_dict:
|
||||
combined_dict[key] = {}
|
||||
for subkey, subvalue in value.items():
|
||||
if subkey in combined_dict[key]:
|
||||
# If the subkey already exists in the combined dictionary,
|
||||
# check if the values are different and raise an exception if they are
|
||||
if combined_dict[key][subkey] != subvalue:
|
||||
raise ValueError(
|
||||
f"Conflicting values for subkey '{subkey}' under key '{key}'"
|
||||
)
|
||||
else:
|
||||
# If the subkey does not exist in the combined dictionary, add it
|
||||
combined_dict[key][subkey] = subvalue
|
||||
|
||||
return combined_dict
|
||||
|
||||
|
||||
def cleanup_watched(
|
||||
watched_list_1, watched_list_2, user_mapping=None, library_mapping=None
|
||||
):
|
||||
modified_watched_list_1 = copy.deepcopy(watched_list_1)
|
||||
|
||||
# remove entries from watched_list_1 that are in watched_list_2
|
||||
for user_1 in watched_list_1:
|
||||
user_other = None
|
||||
if user_mapping:
|
||||
user_other = search_mapping(user_mapping, user_1)
|
||||
user_2 = get_other(watched_list_2, user_1, user_other)
|
||||
if user_2 is None:
|
||||
continue
|
||||
|
||||
for library_1 in watched_list_1[user_1]:
|
||||
library_other = None
|
||||
if library_mapping:
|
||||
library_other = search_mapping(library_mapping, library_1)
|
||||
library_2 = get_other(watched_list_2[user_2], library_1, library_other)
|
||||
if library_2 is None:
|
||||
continue
|
||||
|
||||
(
|
||||
_,
|
||||
episode_watched_list_2_keys_dict,
|
||||
movies_watched_list_2_keys_dict,
|
||||
) = generate_library_guids_dict(watched_list_2[user_2][library_2])
|
||||
|
||||
# Movies
|
||||
if isinstance(watched_list_1[user_1][library_1], list):
|
||||
for movie in watched_list_1[user_1][library_1]:
|
||||
if is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
|
||||
logger(f"Removing {movie} from {library_1}", 3)
|
||||
modified_watched_list_1[user_1][library_1].remove(movie)
|
||||
|
||||
# TV Shows
|
||||
elif isinstance(watched_list_1[user_1][library_1], dict):
|
||||
for show_key_1 in watched_list_1[user_1][library_1].keys():
|
||||
show_key_dict = dict(show_key_1)
|
||||
for season in watched_list_1[user_1][library_1][show_key_1]:
|
||||
for episode in watched_list_1[user_1][library_1][show_key_1][
|
||||
season
|
||||
]:
|
||||
if is_episode_in_dict(
|
||||
episode, episode_watched_list_2_keys_dict
|
||||
):
|
||||
if (
|
||||
episode
|
||||
in modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
][season]
|
||||
):
|
||||
logger(
|
||||
f"Removing {episode} from {show_key_dict['title']}",
|
||||
3,
|
||||
)
|
||||
modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
][season].remove(episode)
|
||||
|
||||
# Remove empty seasons
|
||||
if (
|
||||
len(
|
||||
modified_watched_list_1[user_1][library_1][show_key_1][
|
||||
season
|
||||
]
|
||||
)
|
||||
== 0
|
||||
):
|
||||
if (
|
||||
season
|
||||
in modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
]
|
||||
):
|
||||
logger(
|
||||
f"Removing {season} from {show_key_dict['title']} because it is empty",
|
||||
3,
|
||||
)
|
||||
del modified_watched_list_1[user_1][library_1][
|
||||
show_key_1
|
||||
][season]
|
||||
|
||||
# Remove empty shows
|
||||
if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0:
|
||||
if show_key_1 in modified_watched_list_1[user_1][library_1]:
|
||||
logger(
|
||||
f"Removing {show_key_dict['title']} because it is empty",
|
||||
3,
|
||||
)
|
||||
del modified_watched_list_1[user_1][library_1][show_key_1]
|
||||
|
||||
for user_1 in watched_list_1:
|
||||
for library_1 in watched_list_1[user_1]:
|
||||
if library_1 in modified_watched_list_1[user_1]:
|
||||
# If library is empty then remove it
|
||||
if len(modified_watched_list_1[user_1][library_1]) == 0:
|
||||
logger(f"Removing {library_1} from {user_1} because it is empty", 1)
|
||||
del modified_watched_list_1[user_1][library_1]
|
||||
|
||||
if user_1 in modified_watched_list_1:
|
||||
# If user is empty delete user
|
||||
if len(modified_watched_list_1[user_1]) == 0:
|
||||
logger(f"Removing {user_1} from watched list 1 because it is empty", 1)
|
||||
del modified_watched_list_1[user_1]
|
||||
|
||||
return modified_watched_list_1
|
||||
|
||||
|
||||
def get_other(watched_list, object_1, object_2):
|
||||
if object_1 in watched_list:
|
||||
return object_1
|
||||
elif object_2 in watched_list:
|
||||
return object_2
|
||||
else:
|
||||
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
|
||||
return None
|
||||
|
||||
|
||||
def is_movie_in_dict(movie, movies_watched_list_2_keys_dict):
|
||||
# Iterate through the keys and values of the movie dictionary
|
||||
for movie_key, movie_value in movie.items():
|
||||
# If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary
|
||||
if movie_key == "locations":
|
||||
if "locations" in movies_watched_list_2_keys_dict.keys():
|
||||
# Iterate through the locations in the movie dictionary
|
||||
for location in movie_value:
|
||||
# If the location is in the movies_watched_list_2_keys_dict dictionary, return True
|
||||
if location in movies_watched_list_2_keys_dict["locations"]:
|
||||
return True
|
||||
# If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary
|
||||
else:
|
||||
if movie_key in movies_watched_list_2_keys_dict.keys():
|
||||
# If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True
|
||||
if movie_value in movies_watched_list_2_keys_dict[movie_key]:
|
||||
return True
|
||||
|
||||
# If the loop completes without finding a match, return False
|
||||
return False
|
||||
|
||||
|
||||
def is_episode_in_dict(episode, episode_watched_list_2_keys_dict):
|
||||
# Iterate through the keys and values of the episode dictionary
|
||||
for episode_key, episode_value in episode.items():
|
||||
# If the key is "locations", check if the "locations" key is present in the episode_watched_list_2_keys_dict dictionary
|
||||
if episode_key == "locations":
|
||||
if "locations" in episode_watched_list_2_keys_dict.keys():
|
||||
# Iterate through the locations in the episode dictionary
|
||||
for location in episode_value:
|
||||
# If the location is in the episode_watched_list_2_keys_dict dictionary, return True
|
||||
if location in episode_watched_list_2_keys_dict["locations"]:
|
||||
return True
|
||||
# If the key is not "locations", check if the episode_key is present in the episode_watched_list_2_keys_dict dictionary
|
||||
else:
|
||||
if episode_key in episode_watched_list_2_keys_dict.keys():
|
||||
# If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True
|
||||
if episode_value in episode_watched_list_2_keys_dict[episode_key]:
|
||||
return True
|
||||
|
||||
# If the loop completes without finding a match, return False
|
||||
return False
|
||||
|
|
@ -0,0 +1,78 @@
|
|||
import sys
|
||||
import os
|
||||
|
||||
# getting the name of the directory
|
||||
# where the this file is present.
|
||||
current = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
# Getting the parent directory name
|
||||
# where the current directory is present.
|
||||
parent = os.path.dirname(current)
|
||||
|
||||
# adding the parent directory to
|
||||
# the sys.path.
|
||||
sys.path.append(parent)
|
||||
|
||||
from src.black_white import setup_black_white_lists
|
||||
|
||||
|
||||
def test_setup_black_white_lists():
|
||||
# Simple
|
||||
blacklist_library = "library1, library2"
|
||||
whitelist_library = "library1, library2"
|
||||
blacklist_library_type = "library_type1, library_type2"
|
||||
whitelist_library_type = "library_type1, library_type2"
|
||||
blacklist_users = "user1, user2"
|
||||
whitelist_users = "user1, user2"
|
||||
|
||||
(
|
||||
results_blacklist_library,
|
||||
return_whitelist_library,
|
||||
return_blacklist_library_type,
|
||||
return_whitelist_library_type,
|
||||
return_blacklist_users,
|
||||
return_whitelist_users,
|
||||
) = setup_black_white_lists(
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
blacklist_users,
|
||||
whitelist_users,
|
||||
)
|
||||
|
||||
assert results_blacklist_library == ["library1", "library2"]
|
||||
assert return_whitelist_library == ["library1", "library2"]
|
||||
assert return_blacklist_library_type == ["library_type1", "library_type2"]
|
||||
assert return_whitelist_library_type == ["library_type1", "library_type2"]
|
||||
assert return_blacklist_users == ["user1", "user2"]
|
||||
assert return_whitelist_users == ["user1", "user2"]
|
||||
|
||||
# Library Mapping and user mapping
|
||||
library_mapping = {"library1": "library3"}
|
||||
user_mapping = {"user1": "user3"}
|
||||
|
||||
(
|
||||
results_blacklist_library,
|
||||
return_whitelist_library,
|
||||
return_blacklist_library_type,
|
||||
return_whitelist_library_type,
|
||||
return_blacklist_users,
|
||||
return_whitelist_users,
|
||||
) = setup_black_white_lists(
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
blacklist_users,
|
||||
whitelist_users,
|
||||
library_mapping,
|
||||
user_mapping,
|
||||
)
|
||||
|
||||
assert results_blacklist_library == ["library1", "library2", "library3"]
|
||||
assert return_whitelist_library == ["library1", "library2", "library3"]
|
||||
assert return_blacklist_library_type == ["library_type1", "library_type2"]
|
||||
assert return_whitelist_library_type == ["library_type1", "library_type2"]
|
||||
assert return_blacklist_users == ["user1", "user2", "user3"]
|
||||
assert return_whitelist_users == ["user1", "user2", "user3"]
|
||||
|
|
@ -0,0 +1,302 @@
|
|||
import sys
|
||||
import os
|
||||
|
||||
# getting the name of the directory
|
||||
# where the this file is present.
|
||||
current = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
# Getting the parent directory name
|
||||
# where the current directory is present.
|
||||
parent = os.path.dirname(current)
|
||||
|
||||
# adding the parent directory to
|
||||
# the sys.path.
|
||||
sys.path.append(parent)
|
||||
|
||||
from src.functions import (
|
||||
search_mapping,
|
||||
)
|
||||
|
||||
from src.library import (
|
||||
check_skip_logic,
|
||||
check_blacklist_logic,
|
||||
check_whitelist_logic,
|
||||
show_title_dict,
|
||||
episode_title_dict,
|
||||
movies_title_dict,
|
||||
generate_library_guids_dict,
|
||||
)
|
||||
|
||||
blacklist_library = ["TV Shows"]
|
||||
whitelist_library = ["Movies"]
|
||||
blacklist_library_type = ["episodes"]
|
||||
whitelist_library_type = ["movies"]
|
||||
library_mapping = {"Shows": "TV Shows", "Movie": "Movies"}
|
||||
|
||||
show_list = {
|
||||
frozenset(
|
||||
{
|
||||
("locations", ("The Last of Us",)),
|
||||
("tmdb", "100088"),
|
||||
("imdb", "tt3581920"),
|
||||
("tvdb", "392256"),
|
||||
("title", "The Last of Us"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt11957006",
|
||||
"tmdb": "2181581",
|
||||
"tvdb": "8444132",
|
||||
"locations": (
|
||||
"The Last of Us - S01E01 - When You're Lost in the Darkness WEBDL-1080p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
movie_list = [
|
||||
{
|
||||
"title": "Coco",
|
||||
"imdb": "tt2380307",
|
||||
"tmdb": "354912",
|
||||
"locations": ("Coco (2017) Remux-2160p.mkv", "Coco (2017) Remux-1080p.mkv"),
|
||||
}
|
||||
]
|
||||
|
||||
show_titles = {
|
||||
"imdb": ["tt3581920"],
|
||||
"locations": ["The Last of Us"],
|
||||
"tmdb": ["100088"],
|
||||
"tvdb": ["392256"],
|
||||
}
|
||||
episode_titles = {
|
||||
"imdb": ["tt11957006"],
|
||||
"locations": [
|
||||
"The Last of Us - S01E01 - When You're Lost in the Darkness WEBDL-1080p.mkv"
|
||||
],
|
||||
"tmdb": ["2181581"],
|
||||
"tvdb": ["8444132"],
|
||||
}
|
||||
movie_titles = {
|
||||
"imdb": ["tt2380307"],
|
||||
"locations": ["Coco (2017) Remux-2160p.mkv", "Coco (2017) Remux-1080p.mkv"],
|
||||
"title": ["coco"],
|
||||
"tmdb": ["354912"],
|
||||
}
|
||||
|
||||
|
||||
def test_check_skip_logic():
|
||||
# Failes
|
||||
library_title = "Test"
|
||||
library_type = "movies"
|
||||
skip_reason = check_skip_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
library_mapping,
|
||||
)
|
||||
|
||||
assert skip_reason == "Test is not in whitelist_library"
|
||||
|
||||
library_title = "Shows"
|
||||
library_type = "episodes"
|
||||
skip_reason = check_skip_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
library_mapping,
|
||||
)
|
||||
|
||||
assert (
|
||||
skip_reason
|
||||
== "episodes is in blacklist_library_type and TV Shows is in blacklist_library and "
|
||||
+ "episodes is not in whitelist_library_type and Shows is not in whitelist_library"
|
||||
)
|
||||
|
||||
# Passes
|
||||
library_title = "Movie"
|
||||
library_type = "movies"
|
||||
skip_reason = check_skip_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
whitelist_library,
|
||||
blacklist_library_type,
|
||||
whitelist_library_type,
|
||||
library_mapping,
|
||||
)
|
||||
|
||||
assert skip_reason == None
|
||||
|
||||
|
||||
def test_check_blacklist_logic():
|
||||
# Fails
|
||||
library_title = "Shows"
|
||||
library_type = "episodes"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_blacklist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert (
|
||||
skip_reason
|
||||
== "episodes is in blacklist_library_type and TV Shows is in blacklist_library"
|
||||
)
|
||||
|
||||
library_title = "TV Shows"
|
||||
library_type = "episodes"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_blacklist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert (
|
||||
skip_reason
|
||||
== "episodes is in blacklist_library_type and TV Shows is in blacklist_library"
|
||||
)
|
||||
|
||||
# Passes
|
||||
library_title = "Movie"
|
||||
library_type = "movies"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_blacklist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert skip_reason == None
|
||||
|
||||
library_title = "Movies"
|
||||
library_type = "movies"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_blacklist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
blacklist_library,
|
||||
blacklist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert skip_reason == None
|
||||
|
||||
|
||||
def test_check_whitelist_logic():
|
||||
# Fails
|
||||
library_title = "Shows"
|
||||
library_type = "episodes"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_whitelist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert (
|
||||
skip_reason
|
||||
== "episodes is not in whitelist_library_type and Shows is not in whitelist_library"
|
||||
)
|
||||
|
||||
library_title = "TV Shows"
|
||||
library_type = "episodes"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_whitelist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert (
|
||||
skip_reason
|
||||
== "episodes is not in whitelist_library_type and TV Shows is not in whitelist_library"
|
||||
)
|
||||
|
||||
# Passes
|
||||
library_title = "Movie"
|
||||
library_type = "movies"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_whitelist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert skip_reason == None
|
||||
|
||||
library_title = "Movies"
|
||||
library_type = "movies"
|
||||
library_other = search_mapping(library_mapping, library_title)
|
||||
skip_reason = check_whitelist_logic(
|
||||
library_title,
|
||||
library_type,
|
||||
whitelist_library,
|
||||
whitelist_library_type,
|
||||
library_other,
|
||||
)
|
||||
|
||||
assert skip_reason == None
|
||||
|
||||
|
||||
def test_show_title_dict():
|
||||
show_titles_dict = show_title_dict(show_list)
|
||||
|
||||
assert show_titles_dict == show_titles
|
||||
|
||||
|
||||
def test_episode_title_dict():
|
||||
episode_titles_dict = episode_title_dict(show_list)
|
||||
|
||||
assert episode_titles_dict == episode_titles
|
||||
|
||||
|
||||
def test_movies_title_dict():
|
||||
movies_titles_dict = movies_title_dict(movie_list)
|
||||
|
||||
assert movies_titles_dict == movie_titles
|
||||
|
||||
|
||||
def test_generate_library_guids_dict():
|
||||
# Test with shows
|
||||
(
|
||||
show_titles_dict,
|
||||
episode_titles_dict,
|
||||
movies_titles_dict,
|
||||
) = generate_library_guids_dict(show_list)
|
||||
|
||||
assert show_titles_dict == show_titles
|
||||
assert episode_titles_dict == episode_titles
|
||||
assert movies_titles_dict == {}
|
||||
|
||||
# Test with movies
|
||||
(
|
||||
show_titles_dict,
|
||||
episode_titles_dict,
|
||||
movies_titles_dict,
|
||||
) = generate_library_guids_dict(movie_list)
|
||||
|
||||
assert show_titles_dict == {}
|
||||
assert episode_titles_dict == {}
|
||||
assert movies_titles_dict == movie_titles
|
||||
|
|
@ -13,7 +13,7 @@ parent = os.path.dirname(current)
|
|||
# the sys.path.
|
||||
sys.path.append(parent)
|
||||
|
||||
from src.main import setup_black_white_lists
|
||||
from src.black_white import setup_black_white_lists
|
||||
|
||||
|
||||
def test_setup_black_white_lists():
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
import sys
|
||||
import os
|
||||
|
||||
# getting the name of the directory
|
||||
# where the this file is present.
|
||||
current = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
# Getting the parent directory name
|
||||
# where the current directory is present.
|
||||
parent = os.path.dirname(current)
|
||||
|
||||
# adding the parent directory to
|
||||
# the sys.path.
|
||||
sys.path.append(parent)
|
||||
|
||||
from src.users import (
|
||||
combine_user_lists,
|
||||
filter_user_lists,
|
||||
)
|
||||
|
||||
|
||||
def test_combine_user_lists():
|
||||
server_1_users = ["test", "test3", "luigi311"]
|
||||
server_2_users = ["luigi311", "test2", "test3"]
|
||||
user_mapping = {"test2": "test"}
|
||||
|
||||
combined = combine_user_lists(server_1_users, server_2_users, user_mapping)
|
||||
|
||||
assert combined == {"luigi311": "luigi311", "test": "test2", "test3": "test3"}
|
||||
|
||||
|
||||
def test_filter_user_lists():
|
||||
users = {"luigi311": "luigi311", "test": "test2", "test3": "test3"}
|
||||
blacklist_users = ["test3"]
|
||||
whitelist_users = ["test", "luigi311"]
|
||||
|
||||
filtered = filter_user_lists(users, blacklist_users, whitelist_users)
|
||||
|
||||
assert filtered == {"test": "test2", "luigi311": "luigi311"}
|
||||
|
|
@ -1,301 +1,410 @@
|
|||
import sys
|
||||
import os
|
||||
|
||||
# getting the name of the directory
|
||||
# where the this file is present.
|
||||
current = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
# Getting the parent directory name
|
||||
# where the current directory is present.
|
||||
parent = os.path.dirname(current)
|
||||
|
||||
# adding the parent directory to
|
||||
# the sys.path.
|
||||
sys.path.append(parent)
|
||||
|
||||
from src.main import cleanup_watched
|
||||
|
||||
tv_shows_watched_list_1 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550489",
|
||||
"tmdb": "282843",
|
||||
"tvdb": "176357",
|
||||
"locations": (
|
||||
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
|
||||
),
|
||||
},
|
||||
{
|
||||
"imdb": "tt0550487",
|
||||
"tmdb": "282861",
|
||||
"tvdb": "300385",
|
||||
"locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",),
|
||||
},
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [
|
||||
{"locations": ("Test S01E01.mkv",)},
|
||||
{"locations": ("Test S01E02.mkv",)},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
movies_watched_list_1 = [
|
||||
{
|
||||
"imdb": "tt2380307",
|
||||
"tmdb": "354912",
|
||||
"title": "Coco",
|
||||
"locations": ("Coco (2017) Remux-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"tmdbcollection": "448150",
|
||||
"imdb": "tt1431045",
|
||||
"tmdb": "293660",
|
||||
"title": "Deadpool",
|
||||
"locations": ("Deadpool (2016) Remux-1080p.mkv",),
|
||||
},
|
||||
]
|
||||
|
||||
tv_shows_watched_list_2 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550487",
|
||||
"tmdb": "282861",
|
||||
"tvdb": "300385",
|
||||
"locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",),
|
||||
},
|
||||
{
|
||||
"imdb": "tt0550498",
|
||||
"tmdb": "282865",
|
||||
"tvdb": "300474",
|
||||
"locations": (
|
||||
"Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv",
|
||||
),
|
||||
},
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [
|
||||
{"locations": ("Test S01E02.mkv",)},
|
||||
{"locations": ("Test S01E03.mkv",)},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
movies_watched_list_2 = [
|
||||
{
|
||||
"imdb": "tt2380307",
|
||||
"tmdb": "354912",
|
||||
"title": "Coco",
|
||||
"locations": ("Coco (2017) Remux-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"imdb": "tt0384793",
|
||||
"tmdb": "9788",
|
||||
"tvdb": "9103",
|
||||
"title": "Accepted",
|
||||
"locations": ("Accepted (2006) Remux-1080p.mkv",),
|
||||
},
|
||||
]
|
||||
|
||||
# Test to see if objects get deleted all the way up to the root.
|
||||
tv_shows_2_watched_list_1 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550489",
|
||||
"tmdb": "282843",
|
||||
"tvdb": "176357",
|
||||
"locations": (
|
||||
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
|
||||
),
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
expected_tv_show_watched_list_1 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550489",
|
||||
"tmdb": "282843",
|
||||
"tvdb": "176357",
|
||||
"locations": (
|
||||
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [{"locations": ("Test S01E01.mkv",)}]
|
||||
},
|
||||
}
|
||||
|
||||
expected_movie_watched_list_1 = [
|
||||
{
|
||||
"tmdbcollection": "448150",
|
||||
"imdb": "tt1431045",
|
||||
"tmdb": "293660",
|
||||
"title": "Deadpool",
|
||||
"locations": ("Deadpool (2016) Remux-1080p.mkv",),
|
||||
}
|
||||
]
|
||||
|
||||
expected_tv_show_watched_list_2 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550498",
|
||||
"tmdb": "282865",
|
||||
"tvdb": "300474",
|
||||
"locations": (
|
||||
"Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [{"locations": ("Test S01E03.mkv",)}]
|
||||
},
|
||||
}
|
||||
|
||||
expected_movie_watched_list_2 = [
|
||||
{
|
||||
"imdb": "tt0384793",
|
||||
"tmdb": "9788",
|
||||
"tvdb": "9103",
|
||||
"title": "Accepted",
|
||||
"locations": ("Accepted (2006) Remux-1080p.mkv",),
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_simple_cleanup_watched():
|
||||
user_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": tv_shows_watched_list_1,
|
||||
"Movies": movies_watched_list_1,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
},
|
||||
}
|
||||
user_watched_list_2 = {
|
||||
"user1": {
|
||||
"TV Shows": tv_shows_watched_list_2,
|
||||
"Movies": movies_watched_list_2,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": expected_tv_show_watched_list_1,
|
||||
"Movies": expected_movie_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_2 = {
|
||||
"user1": {
|
||||
"TV Shows": expected_tv_show_watched_list_2,
|
||||
"Movies": expected_movie_watched_list_2,
|
||||
}
|
||||
}
|
||||
|
||||
return_watched_list_1 = cleanup_watched(user_watched_list_1, user_watched_list_2)
|
||||
return_watched_list_2 = cleanup_watched(user_watched_list_2, user_watched_list_1)
|
||||
|
||||
assert return_watched_list_1 == expected_watched_list_1
|
||||
assert return_watched_list_2 == expected_watched_list_2
|
||||
|
||||
|
||||
def test_mapping_cleanup_watched():
|
||||
user_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": tv_shows_watched_list_1,
|
||||
"Movies": movies_watched_list_1,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
},
|
||||
}
|
||||
user_watched_list_2 = {
|
||||
"user2": {
|
||||
"Shows": tv_shows_watched_list_2,
|
||||
"Movies": movies_watched_list_2,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": expected_tv_show_watched_list_1,
|
||||
"Movies": expected_movie_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_2 = {
|
||||
"user2": {
|
||||
"Shows": expected_tv_show_watched_list_2,
|
||||
"Movies": expected_movie_watched_list_2,
|
||||
}
|
||||
}
|
||||
|
||||
user_mapping = {"user1": "user2"}
|
||||
library_mapping = {"TV Shows": "Shows"}
|
||||
|
||||
return_watched_list_1 = cleanup_watched(
|
||||
user_watched_list_1,
|
||||
user_watched_list_2,
|
||||
user_mapping=user_mapping,
|
||||
library_mapping=library_mapping,
|
||||
)
|
||||
return_watched_list_2 = cleanup_watched(
|
||||
user_watched_list_2,
|
||||
user_watched_list_1,
|
||||
user_mapping=user_mapping,
|
||||
library_mapping=library_mapping,
|
||||
)
|
||||
|
||||
assert return_watched_list_1 == expected_watched_list_1
|
||||
assert return_watched_list_2 == expected_watched_list_2
|
||||
import sys
|
||||
import os
|
||||
|
||||
# getting the name of the directory
|
||||
# where the this file is present.
|
||||
current = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
# Getting the parent directory name
|
||||
# where the current directory is present.
|
||||
parent = os.path.dirname(current)
|
||||
|
||||
# adding the parent directory to
|
||||
# the sys.path.
|
||||
sys.path.append(parent)
|
||||
|
||||
from src.watched import cleanup_watched, combine_watched_dicts
|
||||
|
||||
tv_shows_watched_list_1 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550489",
|
||||
"tmdb": "282843",
|
||||
"tvdb": "176357",
|
||||
"locations": (
|
||||
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
|
||||
),
|
||||
},
|
||||
{
|
||||
"imdb": "tt0550487",
|
||||
"tmdb": "282861",
|
||||
"tvdb": "300385",
|
||||
"locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",),
|
||||
},
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [
|
||||
{"locations": ("Test S01E01.mkv",)},
|
||||
{"locations": ("Test S01E02.mkv",)},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
movies_watched_list_1 = [
|
||||
{
|
||||
"imdb": "tt2380307",
|
||||
"tmdb": "354912",
|
||||
"title": "Coco",
|
||||
"locations": ("Coco (2017) Remux-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"tmdbcollection": "448150",
|
||||
"imdb": "tt1431045",
|
||||
"tmdb": "293660",
|
||||
"title": "Deadpool",
|
||||
"locations": ("Deadpool (2016) Remux-1080p.mkv",),
|
||||
},
|
||||
]
|
||||
|
||||
tv_shows_watched_list_2 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550487",
|
||||
"tmdb": "282861",
|
||||
"tvdb": "300385",
|
||||
"locations": ("Criminal Minds S01E02 Compulsion WEBDL-720p.mkv",),
|
||||
},
|
||||
{
|
||||
"imdb": "tt0550498",
|
||||
"tmdb": "282865",
|
||||
"tvdb": "300474",
|
||||
"locations": (
|
||||
"Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv",
|
||||
),
|
||||
},
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [
|
||||
{"locations": ("Test S01E02.mkv",)},
|
||||
{"locations": ("Test S01E03.mkv",)},
|
||||
]
|
||||
},
|
||||
}
|
||||
|
||||
movies_watched_list_2 = [
|
||||
{
|
||||
"imdb": "tt2380307",
|
||||
"tmdb": "354912",
|
||||
"title": "Coco",
|
||||
"locations": ("Coco (2017) Remux-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"imdb": "tt0384793",
|
||||
"tmdb": "9788",
|
||||
"tvdb": "9103",
|
||||
"title": "Accepted",
|
||||
"locations": ("Accepted (2006) Remux-1080p.mkv",),
|
||||
},
|
||||
]
|
||||
|
||||
# Test to see if objects get deleted all the way up to the root.
|
||||
tv_shows_2_watched_list_1 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550489",
|
||||
"tmdb": "282843",
|
||||
"tvdb": "176357",
|
||||
"locations": (
|
||||
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
|
||||
),
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
expected_tv_show_watched_list_1 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550489",
|
||||
"tmdb": "282843",
|
||||
"tvdb": "176357",
|
||||
"locations": (
|
||||
"Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [{"locations": ("Test S01E01.mkv",)}]
|
||||
},
|
||||
}
|
||||
|
||||
expected_movie_watched_list_1 = [
|
||||
{
|
||||
"tmdbcollection": "448150",
|
||||
"imdb": "tt1431045",
|
||||
"tmdb": "293660",
|
||||
"title": "Deadpool",
|
||||
"locations": ("Deadpool (2016) Remux-1080p.mkv",),
|
||||
}
|
||||
]
|
||||
|
||||
expected_tv_show_watched_list_2 = {
|
||||
frozenset(
|
||||
{
|
||||
("tvdb", "75710"),
|
||||
("title", "Criminal Minds"),
|
||||
("imdb", "tt0452046"),
|
||||
("locations", ("Criminal Minds",)),
|
||||
("tmdb", "4057"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt0550498",
|
||||
"tmdb": "282865",
|
||||
"tvdb": "300474",
|
||||
"locations": (
|
||||
"Criminal Minds S01E03 Won't Get Fooled Again WEBDL-720p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
},
|
||||
frozenset({("title", "Test"), ("locations", ("Test",))}): {
|
||||
"Season 1": [{"locations": ("Test S01E03.mkv",)}]
|
||||
},
|
||||
}
|
||||
|
||||
expected_movie_watched_list_2 = [
|
||||
{
|
||||
"imdb": "tt0384793",
|
||||
"tmdb": "9788",
|
||||
"tvdb": "9103",
|
||||
"title": "Accepted",
|
||||
"locations": ("Accepted (2006) Remux-1080p.mkv",),
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_simple_cleanup_watched():
|
||||
user_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": tv_shows_watched_list_1,
|
||||
"Movies": movies_watched_list_1,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
},
|
||||
}
|
||||
user_watched_list_2 = {
|
||||
"user1": {
|
||||
"TV Shows": tv_shows_watched_list_2,
|
||||
"Movies": movies_watched_list_2,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": expected_tv_show_watched_list_1,
|
||||
"Movies": expected_movie_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_2 = {
|
||||
"user1": {
|
||||
"TV Shows": expected_tv_show_watched_list_2,
|
||||
"Movies": expected_movie_watched_list_2,
|
||||
}
|
||||
}
|
||||
|
||||
return_watched_list_1 = cleanup_watched(user_watched_list_1, user_watched_list_2)
|
||||
return_watched_list_2 = cleanup_watched(user_watched_list_2, user_watched_list_1)
|
||||
|
||||
assert return_watched_list_1 == expected_watched_list_1
|
||||
assert return_watched_list_2 == expected_watched_list_2
|
||||
|
||||
|
||||
def test_mapping_cleanup_watched():
|
||||
user_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": tv_shows_watched_list_1,
|
||||
"Movies": movies_watched_list_1,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
},
|
||||
}
|
||||
user_watched_list_2 = {
|
||||
"user2": {
|
||||
"Shows": tv_shows_watched_list_2,
|
||||
"Movies": movies_watched_list_2,
|
||||
"Other Shows": tv_shows_2_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_1 = {
|
||||
"user1": {
|
||||
"TV Shows": expected_tv_show_watched_list_1,
|
||||
"Movies": expected_movie_watched_list_1,
|
||||
}
|
||||
}
|
||||
|
||||
expected_watched_list_2 = {
|
||||
"user2": {
|
||||
"Shows": expected_tv_show_watched_list_2,
|
||||
"Movies": expected_movie_watched_list_2,
|
||||
}
|
||||
}
|
||||
|
||||
user_mapping = {"user1": "user2"}
|
||||
library_mapping = {"TV Shows": "Shows"}
|
||||
|
||||
return_watched_list_1 = cleanup_watched(
|
||||
user_watched_list_1,
|
||||
user_watched_list_2,
|
||||
user_mapping=user_mapping,
|
||||
library_mapping=library_mapping,
|
||||
)
|
||||
return_watched_list_2 = cleanup_watched(
|
||||
user_watched_list_2,
|
||||
user_watched_list_1,
|
||||
user_mapping=user_mapping,
|
||||
library_mapping=library_mapping,
|
||||
)
|
||||
|
||||
assert return_watched_list_1 == expected_watched_list_1
|
||||
assert return_watched_list_2 == expected_watched_list_2
|
||||
|
||||
|
||||
def test_combine_watched_dicts():
|
||||
input_watched = [
|
||||
{
|
||||
"test3": {
|
||||
"Anime Movies": [
|
||||
{
|
||||
"title": "Ponyo",
|
||||
"tmdb": "12429",
|
||||
"imdb": "tt0876563",
|
||||
"locations": ("Ponyo (2008) Bluray-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"title": "Spirited Away",
|
||||
"tmdb": "129",
|
||||
"imdb": "tt0245429",
|
||||
"locations": ("Spirited Away (2001) Bluray-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"title": "Castle in the Sky",
|
||||
"tmdb": "10515",
|
||||
"imdb": "tt0092067",
|
||||
"locations": ("Castle in the Sky (1986) Bluray-1080p.mkv",),
|
||||
},
|
||||
]
|
||||
}
|
||||
},
|
||||
{"test3": {"Anime Shows": {}}},
|
||||
{"test3": {"Cartoon Shows": {}}},
|
||||
{
|
||||
"test3": {
|
||||
"Shows": {
|
||||
frozenset(
|
||||
{
|
||||
("tmdb", "64464"),
|
||||
("tvdb", "301824"),
|
||||
("tvrage", "45210"),
|
||||
("title", "11.22.63"),
|
||||
("locations", ("11.22.63",)),
|
||||
("imdb", "tt2879552"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt4460418",
|
||||
"title": "The Rabbit Hole",
|
||||
"locations": (
|
||||
"11.22.63 S01E01 The Rabbit Hole Bluray-1080p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{"test3": {"Subbed Anime": {}}},
|
||||
]
|
||||
expected = {
|
||||
"test3": {
|
||||
"Anime Movies": [
|
||||
{
|
||||
"title": "Ponyo",
|
||||
"tmdb": "12429",
|
||||
"imdb": "tt0876563",
|
||||
"locations": ("Ponyo (2008) Bluray-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"title": "Spirited Away",
|
||||
"tmdb": "129",
|
||||
"imdb": "tt0245429",
|
||||
"locations": ("Spirited Away (2001) Bluray-1080p.mkv",),
|
||||
},
|
||||
{
|
||||
"title": "Castle in the Sky",
|
||||
"tmdb": "10515",
|
||||
"imdb": "tt0092067",
|
||||
"locations": ("Castle in the Sky (1986) Bluray-1080p.mkv",),
|
||||
},
|
||||
],
|
||||
"Anime Shows": {},
|
||||
"Cartoon Shows": {},
|
||||
"Shows": {
|
||||
frozenset(
|
||||
{
|
||||
("tmdb", "64464"),
|
||||
("tvdb", "301824"),
|
||||
("tvrage", "45210"),
|
||||
("title", "11.22.63"),
|
||||
("locations", ("11.22.63",)),
|
||||
("imdb", "tt2879552"),
|
||||
}
|
||||
): {
|
||||
"Season 1": [
|
||||
{
|
||||
"imdb": "tt4460418",
|
||||
"title": "The Rabbit Hole",
|
||||
"locations": (
|
||||
"11.22.63 S01E01 The Rabbit Hole Bluray-1080p.mkv",
|
||||
),
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"Subbed Anime": {},
|
||||
}
|
||||
}
|
||||
|
||||
assert combine_watched_dicts(input_watched) == expected
|
||||
Loading…
Reference in New Issue