Merge pull request #224 from luigi311/pydantic

Use Pydantic for watch structure
pull/227/head
Luigi311 2025-02-19 13:20:45 -07:00 committed by GitHub
commit 196a49fca4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1341 additions and 1673 deletions

Binary file not shown.

View File

@ -66,21 +66,6 @@ def str_to_bool(value: str) -> bool:
return str(value).lower() in ("y", "yes", "t", "true", "on", "1")
# Search for nested element in list
def contains_nested(element: str, lst: list[tuple[str] | None] | tuple[str] | None):
if lst is None:
return None
for i, item in enumerate(lst):
if item is None:
continue
if element in item:
return i
elif element == item:
return i
return None
# Get mapped value
def search_mapping(dictionary: dict[str, str], key_value: str) -> str | None:
if key_value in dictionary.keys():

View File

@ -10,11 +10,18 @@ from packaging.version import parse, Version
from src.functions import (
logger,
search_mapping,
contains_nested,
log_marked,
str_to_bool,
)
from src.library import generate_library_guids_dict
from src.watched import (
LibraryData,
MediaIdentifiers,
MediaItem,
WatchedStatus,
Series,
UserData,
check_same_identifiers,
)
load_dotenv(override=True)
@ -22,52 +29,51 @@ generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True"))
generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True"))
def get_video_status(server_video, videos_ids, videos):
video_status = None
if generate_locations:
if "MediaSources" in server_video:
for video_location in server_video["MediaSources"]:
if "Path" in video_location:
if (
contains_nested(
video_location["Path"].split("/")[-1],
videos_ids["locations"],
)
is not None
):
for video in videos:
if (
contains_nested(
video_location["Path"].split("/")[-1],
video["locations"],
)
is not None
):
video_status = video["status"]
break
break
def extract_identifiers_from_item(server_type, item: dict) -> MediaIdentifiers:
title = item.get("Name", None)
id = None
if not title:
id = item.get("Id")
logger(f"{server_type}: Name not found in {id}", 1)
guids = {}
if generate_guids:
if not video_status:
for (
video_provider_source,
video_provider_id,
) in server_video["ProviderIds"].items():
if video_provider_source.lower() in videos_ids:
if (
video_provider_id.lower()
in videos_ids[video_provider_source.lower()]
):
for video in videos:
if video_provider_id.lower() in video.get(
video_provider_source.lower(), []
):
video_status = video["status"]
break
break
guids = {k.lower(): v for k, v in item["ProviderIds"].items()}
if not guids:
logger(
f"{server_type}: {title if title else id} has no guids",
1,
)
return video_status
locations = tuple()
if generate_locations:
if "Path" in item:
locations = tuple([item.get("Path").split("/")[-1]])
elif "MediaSources" in item:
locations = tuple(
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
)
if not locations:
logger(f"{server_type}: {title if title else id} has no locations", 1)
return MediaIdentifiers(
title=title,
locations=locations,
imdb_id=guids.get("imdb", None),
tvdb_id=guids.get("tvdb", None),
tmdb_id=guids.get("tmdb", None),
)
def get_mediaitem(server_type, item: dict) -> MediaItem:
return MediaItem(
identifiers=extract_identifiers_from_item(server_type, item),
status=WatchedStatus(
completed=item["UserData"]["Played"],
time=floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
),
)
class JellyfinEmby:
@ -194,44 +200,6 @@ class JellyfinEmby:
logger(f"{self.server_type}: Get users failed {e}", 2)
raise Exception(e)
def get_guids(self, item: dict):
guids: dict[str, str | tuple[str] | dict[str, bool | int]] = {}
if item.get("Name"):
guids["title"] = item.get("Name")
else:
logger(f"{self.server_type}: Name not found in {item.get('Id')}", 1)
guids["title"] = None
if "ProviderIds" in item:
guids.update({k.lower(): v for k, v in item["ProviderIds"].items()})
else:
logger(
f"{self.server_type}: ProviderIds not found in {item.get('Name')}", 1
)
if "MediaSources" in item:
guids["locations"] = tuple(
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
)
else:
logger(
f"{self.server_type}: MediaSources not found in {item.get('Name')}", 1
)
guids["locations"] = tuple()
if "UserData" in item:
guids["status"] = {
"completed": item["UserData"]["Played"],
# Convert ticks to milliseconds to match Plex
"time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
}
else:
logger(f"{self.server_type}: UserData not found in {item.get('Name')}", 1)
guids["status"] = {}
return guids
def get_libraries(self) -> dict[str, str]:
try:
libraries = {}
@ -276,32 +244,30 @@ class JellyfinEmby:
def get_user_library_watched(
self, user_name, user_id, library_type, library_id, library_title
):
try:
) -> LibraryData:
user_name = user_name.lower()
user_watched = {}
try:
logger(
f"{self.server_type}: Generating watched for {user_name} in library {library_title}",
0,
)
watched = LibraryData(title=library_title)
# Movies
if library_type == "Movie":
user_watched[library_title] = []
watched = self.query(
watched_items = self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&Filters=IsPlayed&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources",
"get",
).get("Items", [])
in_progress = self.query(
in_progress_items = self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&Filters=IsResumable&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources",
"get",
).get("Items", [])
for movie in watched + in_progress:
for movie in watched_items + in_progress_items:
# Skip if theres no user data which means the movie has not been watched
if "UserData" not in movie:
continue
@ -315,26 +281,10 @@ class JellyfinEmby:
movie["UserData"]["Played"] == True
or movie["UserData"]["PlaybackPositionTicks"] > 600000000
):
logger(
f"{self.server_type}: Adding {movie.get('Name')} to {user_name} watched list",
3,
)
# Get the movie's GUIDs
movie_guids = self.get_guids(movie)
# Append the movie dictionary to the list for the given user and library
user_watched[library_title].append(movie_guids)
logger(
f"{self.server_type}: Added {movie_guids} to {user_name} watched list",
3,
)
watched.movies.append(get_mediaitem(self.server_type, movie))
# TV Shows
if library_type in ["Series", "Episode"]:
# Initialize an empty dictionary for the given user and library
user_watched[library_title] = {}
# Retrieve a list of watched TV shows
watched_shows = self.query(
f"/Users/{user_id}/Items"
@ -354,20 +304,13 @@ class JellyfinEmby:
# Retrieve the watched/partially watched list of episodes of each watched show
for show in watched_shows_filtered:
logger(
f"{self.server_type}: Adding {show.get('Name')} to {user_name} watched list",
3,
)
show_guids = {k.lower(): v for k, v in show["ProviderIds"].items()}
show_guids["title"] = show["Name"]
show_guids["locations"] = (
show_locations = (
tuple([show["Path"].split("/")[-1]])
if "Path" in show
else tuple()
)
show_guids = frozenset(show_guids.items())
show_episodes = self.query(
f"/Shows/{show['Id']}/Episodes"
+ f"?userId={user_id}&isPlaceHolder=false&Fields=ProviderIds,MediaSources",
@ -376,7 +319,7 @@ class JellyfinEmby:
# Iterate through the episodes
# Create a list to store the episodes
mark_episodes_list = []
episode_mediaitem = []
for episode in show_episodes:
if "UserData" not in episode:
continue
@ -392,29 +335,30 @@ class JellyfinEmby:
episode["UserData"]["Played"] == True
or episode["UserData"]["PlaybackPositionTicks"] > 600000000
):
episode_guids = self.get_guids(episode)
mark_episodes_list.append(episode_guids)
episode_mediaitem.append(
get_mediaitem(self.server_type, episode)
)
if mark_episodes_list:
# Add the show dictionary to the user's watched list
if show_guids not in user_watched[library_title]:
user_watched[library_title][show_guids] = []
user_watched[library_title][show_guids] = mark_episodes_list
for episode in mark_episodes_list:
logger(
f"{self.server_type}: Added {episode} to {user_name} watched list",
3,
if episode_mediaitem:
watched.series.append(
Series(
identifiers=MediaIdentifiers(
title=show.get("Name"),
locations=show_locations,
imdb_id=show_guids.get("imdb", None),
tvdb_id=show_guids.get("tvdb", None),
tmdb_id=show_guids.get("tmdb", None),
),
episodes=episode_mediaitem,
)
)
logger(
f"{self.server_type}: Got watched for {user_name} in library {library_title}",
f"{self.server_type}: Finished getting watched for {user_name} in library {library_title}",
1,
)
if library_title in user_watched:
logger(f"{self.server_type}: {user_watched[library_title]}", 3)
return user_watched
return watched
except Exception as e:
logger(
f"{self.server_type}: Failed to get watched for {user_name} in library {library_title}, Error: {e}",
@ -426,10 +370,9 @@ class JellyfinEmby:
def get_watched(
self, users: dict[str, str], sync_libraries: list[str]
):
) -> dict[str, UserData]:
try:
users_watched = {}
watched = []
users_watched: dict[str, UserData] = {}
for user_name, user_id in users.items():
libraries = []
@ -473,7 +416,7 @@ class JellyfinEmby:
for library_type in types:
# Get watched for user
watched = self.get_user_library_watched(
library_data = self.get_user_library_watched(
user_name,
user_id,
library_type,
@ -482,8 +425,11 @@ class JellyfinEmby:
)
if user_name.lower() not in users_watched:
users_watched[user_name.lower()] = {}
users_watched[user_name.lower()].update(watched)
users_watched[user_name.lower()] = UserData()
users_watched[user_name.lower()].libraries[
library_title
] = library_data
return users_watched
except Exception as e:
@ -491,37 +437,27 @@ class JellyfinEmby:
raise Exception(e)
def update_user_watched(
self, user_name, user_id, library, library_id, videos, update_partial, dryrun
self,
user_name: str,
user_id: str,
library_data: LibraryData,
library_name: str,
library_id: str,
update_partial: bool,
dryrun: bool,
):
try:
logger(
f"{self.server_type}: Updating watched for {user_name} in library {library}",
1,
)
(
videos_shows_ids,
videos_episodes_ids,
videos_movies_ids,
) = generate_library_guids_dict(videos)
if (
not videos_movies_ids
and not videos_shows_ids
and not videos_episodes_ids
):
logger(
f"{self.server_type}: No videos to mark as watched for {user_name} in library {library}",
1,
)
# If there are no movies or shows to update, exit early.
if not library_data.series and not library_data.movies:
return
logger(
f"{self.server_type}: mark list\nShows: {videos_shows_ids}\nEpisodes: {videos_episodes_ids}\nMovies: {videos_movies_ids}",
f"{self.server_type}: Updating watched for {user_name} in library {library_name}",
1,
)
if videos_movies_ids:
# Update movies.
if library_data.movies:
jellyfin_search = self.query(
f"/Users/{user_id}/Items"
+ f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}"
@ -529,14 +465,17 @@ class JellyfinEmby:
"get",
)
for jellyfin_video in jellyfin_search["Items"]:
movie_status = get_video_status(
jellyfin_video, videos_movies_ids, videos
jelly_identifiers = extract_identifiers_from_item(
self.server_type, jellyfin_video
)
if movie_status:
# Check each stored movie for a match.
for stored_movie in library_data.movies:
if check_same_identifiers(
jelly_identifiers, stored_movie.identifiers
):
jellyfin_video_id = jellyfin_video["Id"]
if movie_status["completed"]:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as watched for {user_name} in {library}"
if stored_movie.status.completed:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as watched for {user_name} in {library_name}"
if not dryrun:
logger(msg, 5)
self.query(
@ -550,16 +489,16 @@ class JellyfinEmby:
self.server_type,
self.server_name,
user_name,
library,
library_name,
jellyfin_video.get("Name"),
)
elif update_partial:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as partially watched for {floor(movie_status['time'] / 60_000)} minutes for {user_name} in {library}"
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user_name} in {library_name}"
if not dryrun:
logger(msg, 5)
playback_position_payload = {
"PlaybackPositionTicks": movie_status["time"]
"PlaybackPositionTicks": stored_movie.status.time
* 10_000,
}
self.query(
@ -574,9 +513,9 @@ class JellyfinEmby:
self.server_type,
self.server_name,
user_name,
library,
library_name,
jellyfin_video.get("Name"),
duration=floor(movie_status["time"] / 60_000),
duration=floor(stored_movie.status.time / 60_000),
)
else:
logger(
@ -584,8 +523,8 @@ class JellyfinEmby:
3,
)
# TV Shows
if videos_shows_ids and videos_episodes_ids:
# Update TV Shows (series/episodes).
if library_data.series:
jellyfin_search = self.query(
f"/Users/{user_id}/Items"
+ f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}"
@ -595,61 +534,20 @@ class JellyfinEmby:
jellyfin_shows = [x for x in jellyfin_search["Items"]]
for jellyfin_show in jellyfin_shows:
show_found = False
episode_videos = []
if generate_locations:
if "Path" in jellyfin_show:
if (
contains_nested(
jellyfin_show["Path"].split("/")[-1],
videos_shows_ids["locations"],
jellyfin_show_identifiers = extract_identifiers_from_item(
self.server_type, jellyfin_show
)
is not None
# Try to find a matching series in your stored library.
for stored_series in library_data.series:
if check_same_identifiers(
jellyfin_show_identifiers, stored_series.identifiers
):
show_found = True
for shows, episodes in videos.items():
show = {k: v for k, v in shows}
if (
contains_nested(
jellyfin_show["Path"].split("/")[-1],
show["locations"],
)
is not None
):
for episode in episodes:
episode_videos.append(episode)
break
if generate_guids:
if not show_found:
for show_provider_source, show_provider_id in jellyfin_show[
"ProviderIds"
].items():
if show_provider_source.lower() in videos_shows_ids:
if (
show_provider_id.lower()
in videos_shows_ids[
show_provider_source.lower()
]
):
show_found = True
for show, episodes in videos.items():
show = {k: v for k, v in show}
if show_provider_id.lower() in show.get(
show_provider_source.lower(), []
):
for episode in episodes:
episode_videos.append(episode)
break
if show_found:
logger(
f"{self.server_type}: Updating watched for {user_name} in library {library} for show {jellyfin_show.get('Name')}",
f"Found matching show for '{jellyfin_show.get('Name')}'",
1,
)
# Now update episodes.
# Get the list of Plex episodes for this show.
jellyfin_show_id = jellyfin_show["Id"]
jellyfin_episodes = self.query(
f"/Shows/{jellyfin_show_id}/Episodes"
@ -658,16 +556,21 @@ class JellyfinEmby:
)
for jellyfin_episode in jellyfin_episodes["Items"]:
episode_status = get_video_status(
jellyfin_episode, videos_episodes_ids, episode_videos
jellyfin_episode_identifiers = (
extract_identifiers_from_item(
self.server_type, jellyfin_episode
)
if episode_status:
)
for stored_ep in stored_series.episodes:
if check_same_identifiers(
jellyfin_episode_identifiers,
stored_ep.identifiers,
):
jellyfin_episode_id = jellyfin_episode["Id"]
if episode_status["completed"]:
if stored_ep.status.completed:
msg = (
f"{self.server_type}: {jellyfin_episode['SeriesName']} {jellyfin_episode['SeasonName']} Episode {jellyfin_episode.get('IndexNumber')} {jellyfin_episode.get('Name')}"
+ f" as watched for {user_name} in {library}"
+ f" as watched for {user_name} in {library_name}"
)
if not dryrun:
logger(msg, 5)
@ -682,22 +585,20 @@ class JellyfinEmby:
self.server_type,
self.server_name,
user_name,
library,
library_name,
jellyfin_episode.get("SeriesName"),
jellyfin_episode.get("Name"),
)
elif update_partial:
msg = (
f"{self.server_type}: {jellyfin_episode['SeriesName']} {jellyfin_episode['SeasonName']} Episode {jellyfin_episode.get('IndexNumber')} {jellyfin_episode.get('Name')}"
+ f" as partially watched for {floor(episode_status['time'] / 60_000)} minutes for {user_name} in {library}"
+ f" as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user_name} in {library_name}"
)
if not dryrun:
logger(msg, 5)
playback_position_payload = {
"PlaybackPositionTicks": episode_status[
"time"
]
"PlaybackPositionTicks": stored_ep.status.time
* 10_000,
}
self.query(
@ -712,10 +613,12 @@ class JellyfinEmby:
self.server_type,
self.server_name,
user_name,
library,
library_name,
jellyfin_episode.get("SeriesName"),
jellyfin_episode.get("Name"),
duration=floor(episode_status["time"] / 60_000),
duration=floor(
stored_ep.status.time / 60_000
),
)
else:
logger(
@ -730,14 +633,18 @@ class JellyfinEmby:
except Exception as e:
logger(
f"{self.server_type}: Error updating watched for {user_name} in library {library}, {e}",
f"{self.server_type}: Error updating watched for {user_name} in library {library_name}, {e}",
2,
)
logger(traceback.format_exc(), 2)
raise Exception(e)
def update_watched(
self, watched_list, user_mapping=None, library_mapping=None, dryrun=False
self,
watched_list: dict[str, UserData],
user_mapping=None,
library_mapping=None,
dryrun=False,
):
try:
server_version = self.info(version_only=True)
@ -749,8 +656,7 @@ class JellyfinEmby:
2,
)
for user, libraries in watched_list.items():
logger(f"{self.server_type}: Updating for entry {user}, {libraries}", 1)
for user, user_data in watched_list.items():
user_other = None
user_name = None
if user_mapping:
@ -780,15 +686,20 @@ class JellyfinEmby:
)
jellyfin_libraries = [x for x in jellyfin_libraries["Items"]]
for library, videos in libraries.items():
for library_name in user_data.libraries:
if library_name == "Custom TV Shows":
print("test")
library_data = user_data.libraries[library_name]
library_other = None
if library_mapping:
if library in library_mapping.keys():
library_other = library_mapping[library]
elif library in library_mapping.values():
library_other = search_mapping(library_mapping, library)
if library_name in library_mapping.keys():
library_other = library_mapping[library_name]
elif library_name in library_mapping.values():
library_other = search_mapping(
library_mapping, library_name
)
if library.lower() not in [
if library_name.lower() not in [
x["Name"].lower() for x in jellyfin_libraries
]:
if library_other:
@ -796,26 +707,26 @@ class JellyfinEmby:
x["Name"].lower() for x in jellyfin_libraries
]:
logger(
f"{self.server_type}: Library {library} not found, but {library_other} found, using {library_other}",
f"{self.server_type}: Library {library_name} not found, but {library_other} found, using {library_other}",
1,
)
library = library_other
library_name = library_other
else:
logger(
f"{self.server_type}: Library {library} or {library_other} not found in library list",
f"{self.server_type}: Library {library_name} or {library_other} not found in library list",
1,
)
continue
else:
logger(
f"{self.server_type}: Library {library} not found in library list",
f"{self.server_type}: Library {library_name} not found in library list",
1,
)
continue
library_id = None
for jellyfin_library in jellyfin_libraries:
if jellyfin_library["Name"] == library:
if jellyfin_library["Name"] == library_name:
library_id = jellyfin_library["Id"]
continue
@ -823,9 +734,9 @@ class JellyfinEmby:
self.update_user_watched(
user_name,
user_id,
library,
library_data,
library_name,
library_id,
videos,
update_partial,
dryrun,
)

View File

@ -4,6 +4,7 @@ from src.functions import (
search_mapping,
)
def check_skip_logic(
library_title: str,
library_type: str,
@ -198,161 +199,3 @@ def setup_libraries(
)
return output_server_1_libaries, output_server_2_libaries
def show_title_dict(user_list) -> dict[str, list[tuple[str] | None]]:
try:
if not isinstance(user_list, dict):
return {}
show_output_dict: dict[str, list[tuple[str] | None]] = {}
show_output_dict["locations"] = []
show_counter = 0 # Initialize a counter for the current show position
show_output_keys = [dict(x) for x in list(user_list.keys())]
for show_key in show_output_keys:
for provider_key, provider_value in show_key.items():
# Skip title
if provider_key.lower() == "title":
continue
if provider_key.lower() not in show_output_dict:
show_output_dict[provider_key.lower()] = [None] * show_counter
if provider_key.lower() == "locations":
show_output_dict[provider_key.lower()].append(provider_value)
else:
show_output_dict[provider_key.lower()].append(
provider_value.lower()
)
show_counter += 1
for key in show_output_dict:
if len(show_output_dict[key]) < show_counter:
show_output_dict[key].append(None)
return show_output_dict
except Exception:
return {}
def episode_title_dict(
user_list,
) -> dict[
str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]
]:
try:
if not isinstance(user_list, dict):
return {}
episode_output_dict: dict[
str,
list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None],
] = {}
episode_output_dict["completed"] = []
episode_output_dict["time"] = []
episode_output_dict["locations"] = []
episode_output_dict["show"] = []
episode_counter = 0 # Initialize a counter for the current episode position
# Iterate through the shows and episodes in user_list
for show in user_list:
for episode in user_list[show]:
# Add the show title to the episode_output_dict if it doesn't exist
if "show" not in episode_output_dict:
episode_output_dict["show"] = [None] * episode_counter
# Add the show title to the episode_output_dict
episode_output_dict["show"].append(dict(show))
# Iterate through the keys and values in each episode
for episode_key, episode_value in episode.items():
# If the key is not "status", add the key to episode_output_dict if it doesn't exist
if episode_key != "status":
if episode_key.lower() not in episode_output_dict:
# Initialize the list with None values up to the current episode position
episode_output_dict[episode_key.lower()] = [
None
] * episode_counter
# If the key is "locations", append each location to the list
if episode_key == "locations":
episode_output_dict[episode_key.lower()].append(episode_value)
# If the key is "status", append the "completed" and "time" values
elif episode_key == "status":
episode_output_dict["completed"].append(
episode_value["completed"]
)
episode_output_dict["time"].append(episode_value["time"])
# For other keys, append the value to the list
else:
episode_output_dict[episode_key.lower()].append(
episode_value.lower()
)
# Increment the episode_counter
episode_counter += 1
# Extend the lists in episode_output_dict with None values to match the current episode_counter
for key in episode_output_dict:
if len(episode_output_dict[key]) < episode_counter:
episode_output_dict[key].append(None)
return episode_output_dict
except Exception:
return {}
def movies_title_dict(
user_list,
) -> dict[str, list[str | bool | int | tuple[str] | None]]:
try:
if not isinstance(user_list, list):
return {}
movies_output_dict: dict[str, list[str | bool | int | tuple[str] | None]] = {
"completed": [],
"time": [],
"locations": [],
}
movie_counter = 0 # Initialize a counter for the current movie position
for movie in user_list:
for movie_key, movie_value in movie.items():
if movie_key != "status":
if movie_key.lower() not in movies_output_dict:
movies_output_dict[movie_key.lower()] = []
if movie_key == "locations":
movies_output_dict[movie_key.lower()].append(movie_value)
elif movie_key == "status":
movies_output_dict["completed"].append(movie_value["completed"])
movies_output_dict["time"].append(movie_value["time"])
else:
movies_output_dict[movie_key.lower()].append(movie_value.lower())
movie_counter += 1
for key in movies_output_dict:
if len(movies_output_dict[key]) < movie_counter:
movies_output_dict[key].append(None)
return movies_output_dict
except Exception:
return {}
def generate_library_guids_dict(user_list) -> tuple[
dict[str, list[tuple[str] | None]],
dict[str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]],
dict[str, list[str | bool | int | tuple[str] | None]],
]:
# Handle the case where user_list is empty or does not contain the expected keys and values
if not user_list:
return {}, {}, {}
show_output_dict = show_title_dict(user_list)
episode_output_dict = episode_title_dict(user_list)
movies_output_dict = movies_title_dict(user_list)
return show_output_dict, episode_output_dict, movies_output_dict

View File

@ -1,6 +1,5 @@
import os, requests, traceback
import os, requests
from dotenv import load_dotenv
from typing import Dict, Union, FrozenSet
from urllib3.poolmanager import PoolManager
from math import floor
@ -14,13 +13,18 @@ from plexapi.myplex import MyPlexAccount
from src.functions import (
logger,
search_mapping,
future_thread_executor,
contains_nested,
log_marked,
str_to_bool,
)
from src.library import generate_library_guids_dict
from src.watched import (
LibraryData,
MediaIdentifiers,
MediaItem,
WatchedStatus,
Series,
UserData,
check_same_identifiers,
)
load_dotenv(override=True)
@ -40,287 +44,72 @@ class HostNameIgnoringAdapter(RequestsHTTPAdapter):
)
def extract_guids_from_item(item: Union[Movie, Show, Episode]) -> Dict[str, str]:
def extract_guids_from_item(item: Movie | Show | Episode) -> dict[str, str]:
# If GENERATE_GUIDS is set to False, then return an empty dict
if not generate_guids:
return {}
guids: Dict[str, str] = dict(
guids: dict[str, str] = dict(
guid.id.split("://")
for guid in item.guids
if guid.id is not None and len(guid.id.strip()) > 0
)
if len(guids) == 0:
logger(
f"Plex: Failed to get any guids for {item.title}",
1,
)
return guids
def get_guids(item: Union[Movie, Episode], completed=True):
if not item.locations:
logger(
f"Plex: {item.title} has no locations",
1,
)
def extract_identifiers_from_item(item: Movie | Show | Episode) -> MediaIdentifiers:
guids = extract_guids_from_item(item)
if not item.guids:
logger(
f"Plex: {item.title} has no guids",
1,
)
return {
"title": item.title,
"locations": (
return MediaIdentifiers(
title=item.title,
locations=(
tuple([location.split("/")[-1] for location in item.locations])
if generate_locations
else tuple()
),
"status": {
"completed": completed,
"time": item.viewOffset,
},
} | extract_guids_from_item(
item
) # Merge the metadata and guid dictionaries
imdb_id=guids.get("imdb", None),
tvdb_id=guids.get("tvdb", None),
tmdb_id=guids.get("tmdb", None),
)
def get_user_library_watched_show(show, process_episodes, threads=None):
def get_mediaitem(item: Movie | Episode, completed=True) -> MediaItem:
return MediaItem(
identifiers=extract_identifiers_from_item(item),
status=WatchedStatus(completed=completed, time=item.viewOffset),
)
def update_user_watched(
user: MyPlexAccount,
user_plex: PlexServer,
library_data: LibraryData,
library_name: str,
dryrun: bool,
):
try:
show_guids: FrozenSet = frozenset(
(
{
"title": show.title,
"locations": (
tuple([location.split("/")[-1] for location in show.locations])
if generate_locations
else tuple()
),
}
| extract_guids_from_item(show)
).items() # Merge the metadata and guid dictionaries
)
episode_guids_args = []
for episode in process_episodes:
episode_guids_args.append([get_guids, episode, episode.isWatched])
episode_guids_results = future_thread_executor(
episode_guids_args, threads=threads
)
episode_guids = []
for index, episode in enumerate(process_episodes):
episode_guids.append(episode_guids_results[index])
return show_guids, episode_guids
except Exception:
return {}, {}
def get_user_library_watched(user, user_plex, library):
user_name: str = user.username.lower() if user.username else user.title.lower()
try:
logger(
f"Plex: Generating watched for {user_name} in library {library.title}",
0,
)
library_videos = user_plex.library.section(library.title)
if library.type == "movie":
watched = []
args = [
[get_guids, video, video.isWatched]
for video in library_videos.search(unwatched=False)
+ library_videos.search(inProgress=True)
if video.isWatched or video.viewOffset >= 60000
]
for guid in future_thread_executor(args, threads=len(args)):
logger(f"Plex: Adding {guid['title']} to {user_name} watched list", 3)
watched.append(guid)
elif library.type == "show":
watched = {}
# Get all watched shows and partially watched shows
parallel_show_task = []
parallel_episodes_task = []
for show in library_videos.search(unwatched=False) + library_videos.search(
inProgress=True
):
process_episodes = []
for episode in show.episodes():
if episode.isWatched or episode.viewOffset >= 60000:
process_episodes.append(episode)
# Shows with more than 24 episodes has its episodes processed in parallel
# Shows with less than 24 episodes has its episodes processed in serial but the shows are processed in parallel
if len(process_episodes) >= 24:
parallel_episodes_task.append(
[
get_user_library_watched_show,
show,
process_episodes,
len(process_episodes),
]
)
else:
parallel_show_task.append(
[get_user_library_watched_show, show, process_episodes, 1]
)
for show_guids, episode_guids in future_thread_executor(
parallel_show_task, threads=len(parallel_show_task)
) + future_thread_executor(parallel_episodes_task, threads=1):
if show_guids and episode_guids:
watched[show_guids] = episode_guids
logger(
f"Plex: Added {episode_guids} to {user_name} watched list",
3,
)
else:
watched = None
logger(f"Plex: Got watched for {user_name} in library {library.title}", 1)
logger(f"Plex: {watched}", 3)
return {user_name: {library.title: watched} if watched is not None else {}}
except Exception as e:
logger(
f"Plex: Failed to get watched for {user_name} in library {library.title}, Error: {e}",
2,
)
return {}
def find_video(plex_search, video_ids, videos=None):
try:
if not generate_guids and not generate_locations:
return None
if generate_locations:
for location in plex_search.locations:
if (
contains_nested(location.split("/")[-1], video_ids["locations"])
is not None
):
episode_videos = []
if videos:
for show, episodes in videos.items():
show = {k: v for k, v in show}
if (
contains_nested(
location.split("/")[-1], show["locations"]
)
is not None
):
for episode in episodes:
episode_videos.append(episode)
return episode_videos
if generate_guids:
for guid in plex_search.guids:
guid_source, guid_id = guid.id.split("://")
# If show provider source and show provider id are in videos_shows_ids exactly, then the show is in the list
if guid_source in video_ids.keys():
if guid_id in video_ids[guid_source]:
episode_videos = []
if videos:
for show, episodes in videos.items():
show = {k: v for k, v in show}
if guid_source in show.keys():
if guid_id == show[guid_source]:
for episode in episodes:
episode_videos.append(episode)
return episode_videos
return None
except Exception:
return None
def get_video_status(plex_search, video_ids, videos):
try:
if not generate_guids and not generate_locations:
return None
if generate_locations:
for location in plex_search.locations:
if (
contains_nested(location.split("/")[-1], video_ids["locations"])
is not None
):
for video in videos:
if (
contains_nested(location.split("/")[-1], video["locations"])
is not None
):
return video["status"]
if generate_guids:
for guid in plex_search.guids:
guid_source, guid_id = guid.id.split("://")
# If show provider source and show provider id are in videos_shows_ids exactly, then the show is in the list
if guid_source in video_ids.keys():
if guid_id in video_ids[guid_source]:
for video in videos:
if guid_source in video.keys():
if guid_id == video[guid_source]:
return video["status"]
return None
except Exception:
return None
def update_user_watched(user, user_plex, library, watched_videos, dryrun):
try:
logger(f"Plex: Updating watched for {user.title} in library {library}", 1)
(
watched_shows_ids,
watched_episodes_ids,
watched_movies_ids,
) = generate_library_guids_dict(watched_videos)
if (
not watched_movies_ids
and not watched_shows_ids
and not watched_episodes_ids
):
logger(
f"Jellyfin: No videos to mark as watched for {user.title} in library {library}",
1,
)
# If there are no movies or shows to update, exit early.
if not library_data.series and not library_data.movies:
return
logger(
f"Plex: mark list\nShows: {watched_shows_ids}\nEpisodes: {watched_episodes_ids}\nMovies: {watched_movies_ids}",
1,
)
logger(f"Plex: Updating watched for {user.title} in library {library_name}", 1)
library_section = user_plex.library.section(library_name)
library_videos = user_plex.library.section(library)
if watched_movies_ids:
for plex_movie in library_videos.search(unwatched=True):
watched_movie_status = get_video_status(
plex_movie, watched_movies_ids, watched_videos
)
if watched_movie_status:
if watched_movie_status["completed"]:
msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library}"
# Update movies.
if library_data.movies:
# Search for Plex movies that are currently marked as unwatched.
for plex_movie in library_section.search(unwatched=True):
plex_identifiers = extract_identifiers_from_item(plex_movie)
# Check each stored movie for a match.
for stored_movie in library_data.movies:
if check_same_identifiers(
plex_identifiers, stored_movie.identifiers
):
# If the stored movie is marked as watched (or has enough progress),
# update the Plex movie accordingly.
if stored_movie.status.completed:
msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_movie.markWatched()
@ -331,16 +120,16 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
"Plex",
user_plex.friendlyName,
user.title,
library,
library_name,
plex_movie.title,
None,
None,
)
elif watched_movie_status["time"] > 60_000:
msg = f"Plex: {plex_movie.title} as partially watched for {floor(watched_movie_status['time'] / 60_000)} minutes for {user.title} in {library}"
else:
msg = f"Plex: {plex_movie.title} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_movie.updateTimeline(watched_movie_status["time"])
plex_movie.updateTimeline(stored_movie.status.time)
else:
logger(msg, 6)
@ -348,31 +137,39 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
"Plex",
user_plex.friendlyName,
user.title,
library,
library_name,
plex_movie.title,
duration=watched_movie_status["time"],
)
else:
logger(
f"Plex: Skipping movie {plex_movie.title} as it is not in mark list for {user.title}",
1,
duration=stored_movie.status.time,
)
# Once matched, no need to check further.
break
if watched_shows_ids and watched_episodes_ids:
for plex_show in library_videos.search(unwatched=True):
watched_show_episodes_status = find_video(
plex_show, watched_shows_ids, watched_videos
# Update TV Shows (series/episodes).
if library_data.series:
# For each Plex show in the library section:
plex_shows = library_section.search(unwatched=True)
for plex_show in plex_shows:
# Extract identifiers from the Plex show.
plex_show_identifiers = extract_identifiers_from_item(plex_show)
# Try to find a matching series in your stored library.
for stored_series in library_data.series:
if check_same_identifiers(
plex_show_identifiers, stored_series.identifiers
):
logger(f"Found matching show for '{plex_show.title}'", 1)
# Now update episodes.
# Get the list of Plex episodes for this show.
plex_episodes = plex_show.episodes()
for plex_episode in plex_episodes:
plex_episode_identifiers = extract_identifiers_from_item(
plex_episode
)
if watched_show_episodes_status:
for plex_episode in plex_show.episodes():
watched_episode_status = get_video_status(
plex_episode,
watched_episodes_ids,
watched_show_episodes_status,
)
if watched_episode_status:
if watched_episode_status["completed"]:
msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library}"
for stored_ep in stored_series.episodes:
if check_same_identifiers(
plex_episode_identifiers, stored_ep.identifiers
):
if stored_ep.status.completed:
msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_episode.markWatched()
@ -383,16 +180,16 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
"Plex",
user_plex.friendlyName,
user.title,
library,
library_name,
plex_show.title,
plex_episode.title,
)
else:
msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(watched_episode_status['time'] / 60_000)} minutes for {user.title} in {library}"
msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_episode.updateTimeline(
watched_episode_status["time"]
stored_ep.status.time
)
else:
logger(msg, 6)
@ -401,28 +198,19 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
"Plex",
user_plex.friendlyName,
user.title,
library,
library_name,
plex_show.title,
plex_episode.title,
watched_episode_status["time"],
stored_ep.status.time,
)
else:
logger(
f"Plex: Skipping episode {plex_episode.title} as it is not in mark list for {user.title}",
3,
)
else:
logger(
f"Plex: Skipping show {plex_show.title} as it is not in mark list for {user.title}",
3,
)
break # Found a matching episode.
break # Found a matching show.
except Exception as e:
logger(
f"Plex: Failed to update watched for {user.title} in library {library}, Error: {e}",
f"Plex: Failed to update watched for {user.title} in library {library_name}, Error: {e}",
2,
)
logger(traceback.format_exc(), 2)
raise e
# class plex accept base url and token and username and password but default with none
@ -505,10 +293,77 @@ class Plex:
logger(f"Plex: Failed to get libraries, Error: {e}", 2)
raise Exception(e)
def get_watched(self, users, sync_libraries):
def get_user_library_watched(self, user, user_plex, library) -> LibraryData:
user_name: str = user.username.lower() if user.username else user.title.lower()
try:
# Get all libraries
users_watched = {}
logger(
f"Plex: Generating watched for {user_name} in library {library.title}",
0,
)
watched = LibraryData(title=library.title)
library_videos = user_plex.library.section(library.title)
if library.type == "movie":
for video in library_videos.search(
unwatched=False
) + library_videos.search(inProgress=True):
if video.isWatched or video.viewOffset >= 60000:
watched.movies.append(get_mediaitem(video, video.isWatched))
elif library.type == "show":
# Keep track of processed shows to reduce duplicate shows
processed_shows = []
for show in library_videos.search(
unwatched=False
) + library_videos.search(inProgress=True):
if show.key in processed_shows:
continue
processed_shows.append(show.key)
show_guids = extract_guids_from_item(show)
episode_mediaitem = []
for episode in show.episodes():
if episode.isWatched or episode.viewOffset >= 60000:
episode_mediaitem.append(
get_mediaitem(episode, episode.isWatched)
)
if episode_mediaitem:
watched.series.append(
Series(
identifiers=MediaIdentifiers(
title=show.title,
locations=(
tuple(
[
location.split("/")[-1]
for location in show.locations
]
)
if generate_locations
else tuple()
),
imdb_id=show_guids.get("imdb", None),
tvdb_id=show_guids.get("tvdb", None),
tmdb_id=show_guids.get("tmdb", None),
),
episodes=episode_mediaitem,
)
)
return watched
except Exception as e:
logger(
f"Plex: Failed to get watched for {user_name} in library {library.title}, Error: {e}",
2,
)
return LibraryData(title=library.title)
def get_watched(self, users, sync_libraries) -> dict[str, UserData]:
try:
users_watched: dict[str, UserData] = {}
for user in users:
if self.admin_user == user:
@ -525,7 +380,6 @@ class Plex:
f"Plex: Failed to get token for {user.title}, skipping",
2,
)
users_watched[user.title] = {}
continue
libraries = user_plex.library.sections()
@ -534,12 +388,16 @@ class Plex:
if library.title not in sync_libraries:
continue
user_watched = get_user_library_watched(user, user_plex, library)
library_data = self.get_user_library_watched(
user, user_plex, library
)
for user_watched, user_watched_temp in user_watched.items():
if user_watched not in users_watched:
users_watched[user_watched] = {}
users_watched[user_watched].update(user_watched_temp)
if user.title.lower() not in users_watched:
users_watched[user.title.lower()] = UserData()
users_watched[user.title.lower()].libraries[
library.title
] = library_data
return users_watched
except Exception as e:
@ -547,12 +405,14 @@ class Plex:
raise Exception(e)
def update_watched(
self, watched_list, user_mapping=None, library_mapping=None, dryrun=False
self,
watched_list: dict[str, UserData],
user_mapping=None,
library_mapping=None,
dryrun=False,
):
try:
args = []
for user, libraries in watched_list.items():
for user, user_data in watched_list.items():
user_other = None
# If type of user is dict
if user_mapping:
@ -596,48 +456,46 @@ class Plex:
)
continue
for library, watched_videos in libraries.items():
for library_name in user_data.libraries:
library_data = user_data.libraries[library_name]
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library)
library_other = search_mapping(library_mapping, library_name)
# if library in plex library list
library_list = user_plex.library.sections()
if library.lower() not in [x.title.lower() for x in library_list]:
if library_name.lower() not in [
x.title.lower() for x in library_list
]:
if library_other:
if library_other.lower() in [
x.title.lower() for x in library_list
]:
logger(
f"Plex: Library {library} not found, but {library_other} found, using {library_other}",
f"Plex: Library {library_name} not found, but {library_other} found, using {library_other}",
1,
)
library = library_other
library_name = library_other
else:
logger(
f"Plex: Library {library} or {library_other} not found in library list",
f"Plex: Library {library_name} or {library_other} not found in library list",
1,
)
continue
else:
logger(
f"Plex: Library {library} not found in library list",
f"Plex: Library {library_name} not found in library list",
1,
)
continue
args.append(
[
update_user_watched,
update_user_watched(
user,
user_plex,
library,
watched_videos,
library_data,
library_name,
dryrun,
]
)
future_thread_executor(args)
except Exception as e:
logger(f"Plex: Failed to update watched, Error: {e}", 2)
raise Exception(e)

View File

@ -1,55 +1,110 @@
import copy
from pydantic import BaseModel
from src.functions import logger, search_mapping, contains_nested
from src.library import generate_library_guids_dict
from src.functions import logger, search_mapping
def check_remove_entry(video, library, video_index, library_watched_list_2):
if video_index is not None:
class MediaIdentifiers(BaseModel):
title: str
# File information, will be folder for series and media file for episode/movie
locations: tuple[str, ...] = tuple()
# Guids
imdb_id: str | None = None
tvdb_id: str | None = None
tmdb_id: str | None = None
class WatchedStatus(BaseModel):
completed: bool
time: int
class MediaItem(BaseModel):
identifiers: MediaIdentifiers
status: WatchedStatus
class Series(BaseModel):
identifiers: MediaIdentifiers
episodes: list[MediaItem] = []
class LibraryData(BaseModel):
title: str
movies: list[MediaItem] = []
series: list[Series] = []
class UserData(BaseModel):
libraries: dict[str, LibraryData] = {}
def check_same_identifiers(item1: MediaIdentifiers, item2: MediaIdentifiers) -> bool:
# Check for duplicate based on file locations:
if item1.locations and item2.locations:
if set(item1.locations) & set(item2.locations):
return True
# Check for duplicate based on GUIDs:
if (
library_watched_list_2["completed"][video_index]
== video["status"]["completed"]
) and (library_watched_list_2["time"][video_index] == video["status"]["time"]):
logger(
f"Removing {video['title']} from {library} due to exact match",
3,
)
return True
elif (
library_watched_list_2["completed"][video_index] == True
and video["status"]["completed"] == False
(item1.imdb_id and item2.imdb_id and item1.imdb_id == item2.imdb_id)
or (item1.tvdb_id and item2.tvdb_id and item1.tvdb_id == item2.tvdb_id)
or (item1.tmdb_id and item2.tmdb_id and item1.tmdb_id == item2.tmdb_id)
):
logger(
f"Removing {video['title']} from {library} due to being complete in one library and not the other",
3,
)
return True
elif (
library_watched_list_2["completed"][video_index] == False
and video["status"]["completed"] == False
) and (video["status"]["time"] < library_watched_list_2["time"][video_index]):
logger(
f"Removing {video['title']} from {library} due to more time watched in one library than the other",
3,
)
return True
elif (
library_watched_list_2["completed"][video_index] == True
and video["status"]["completed"] == True
):
logger(
f"Removing {video['title']} from {library} due to being complete in both libraries",
3,
)
return True
return False
def check_remove_entry(item1: MediaItem, item2: MediaItem) -> bool:
"""
Returns True if item1 (from watched_list_1) should be removed
in favor of item2 (from watched_list_2), based on:
- Duplicate criteria:
* They match if any file location is shared OR
at least one of imdb_id, tvdb_id, or tmdb_id matches.
- Watched status:
* If one is complete and the other is not, remove the incomplete one.
* If both are incomplete, remove the one with lower progress (time).
* If both are complete, remove item1 as duplicate.
"""
if not check_same_identifiers(item1.identifiers, item2.identifiers):
return False
# Compare watched statuses.
status1 = item1.status
status2 = item2.status
# If one is complete and the other isn't, remove the one that's not complete.
if status1.completed != status2.completed:
if not status1.completed and status2.completed:
return True # Remove item1 since it's not complete.
else:
return False # Do not remove item1; it's complete.
# Both have the same completed status.
if not status1.completed and not status2.completed:
# Both incomplete: remove the one with lower progress (time)
if status1.time < status2.time:
return True # Remove item1 because it has watched less.
elif status1.time > status2.time:
return False # Keep item1 because it has more progress.
else:
# Same progress; Remove duplicate
return True
# If both are complete, consider item1 the duplicate and remove it.
return True
def cleanup_watched(
watched_list_1, watched_list_2, user_mapping=None, library_mapping=None
):
watched_list_1: dict[str, UserData],
watched_list_2: dict[str, UserData],
user_mapping=None,
library_mapping=None,
) -> dict[str, UserData]:
modified_watched_list_1 = copy.deepcopy(watched_list_1)
# remove entries from watched_list_1 that are in watched_list_2
@ -61,84 +116,86 @@ def cleanup_watched(
if user_2 is None:
continue
for library_1 in watched_list_1[user_1]:
for library_1_key in watched_list_1[user_1].libraries:
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library_1)
library_2 = get_other(watched_list_2[user_2], library_1, library_other)
if library_2 is None:
library_other = search_mapping(library_mapping, library_1_key)
library_2_key = get_other(
watched_list_2[user_2].libraries, library_1_key, library_other
)
if library_2_key is None:
continue
(
_,
episode_watched_list_2_keys_dict,
movies_watched_list_2_keys_dict,
) = generate_library_guids_dict(watched_list_2[user_2][library_2])
library_1 = watched_list_1[user_1].libraries[library_1_key]
library_2 = watched_list_2[user_2].libraries[library_2_key]
# Movies
if isinstance(watched_list_1[user_1][library_1], list):
for movie in watched_list_1[user_1][library_1]:
movie_index = get_movie_index_in_dict(
movie, movies_watched_list_2_keys_dict
)
if movie_index is not None:
if check_remove_entry(
movie,
library_1,
movie_index,
movies_watched_list_2_keys_dict,
):
modified_watched_list_1[user_1][library_1].remove(movie)
filtered_movies = []
for movie in library_1.movies:
remove_flag = False
for movie2 in library_2.movies:
if check_remove_entry(movie, movie2):
logger(f"Removing movie: {movie.identifiers.title}", 3)
remove_flag = True
break
if not remove_flag:
filtered_movies.append(movie)
modified_watched_list_1[user_1].libraries[
library_1_key
].movies = filtered_movies
# TV Shows
elif isinstance(watched_list_1[user_1][library_1], dict):
for show_key_1 in watched_list_1[user_1][library_1].keys():
show_key_dict = dict(show_key_1)
filtered_series_list = []
for series1 in library_1.series:
matching_series = None
for series2 in library_2.series:
if check_same_identifiers(series1.identifiers, series2.identifiers):
matching_series = series2
break
# Filter the episode_watched_list_2_keys_dict dictionary to handle cases
# where episode location names are not unique such as S01E01.mkv
filtered_episode_watched_list_2_keys_dict = (
filter_episode_watched_list_2_keys_dict(
episode_watched_list_2_keys_dict, show_key_dict
)
)
for episode in watched_list_1[user_1][library_1][show_key_1]:
episode_index = get_episode_index_in_dict(
episode, filtered_episode_watched_list_2_keys_dict
)
if episode_index is not None:
if check_remove_entry(
episode,
library_1,
episode_index,
episode_watched_list_2_keys_dict,
):
modified_watched_list_1[user_1][library_1][
show_key_1
].remove(episode)
# Remove empty shows
if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0:
if show_key_1 in modified_watched_list_1[user_1][library_1]:
if matching_series is None:
# No matching show in watched_list_2; keep the series as is.
filtered_series_list.append(series1)
else:
# We have a matching show; now clean up the episodes.
filtered_episodes = []
for ep1 in series1.episodes:
remove_flag = False
for ep2 in matching_series.episodes:
if check_remove_entry(ep1, ep2):
logger(
f"Removing {show_key_dict['title']} because it is empty",
f"Removing episode '{ep1.identifiers.title}' from show '{series1.identifiers.title}'",
3,
)
del modified_watched_list_1[user_1][library_1][show_key_1]
remove_flag = True
break
if not remove_flag:
filtered_episodes.append(ep1)
for user_1 in watched_list_1:
for library_1 in watched_list_1[user_1]:
if library_1 in modified_watched_list_1[user_1]:
# If library is empty then remove it
if len(modified_watched_list_1[user_1][library_1]) == 0:
logger(f"Removing {library_1} from {user_1} because it is empty", 1)
del modified_watched_list_1[user_1][library_1]
# Only keep the series if there are remaining episodes.
if filtered_episodes:
modified_series1 = copy.deepcopy(series1)
modified_series1.episodes = filtered_episodes
filtered_series_list.append(modified_series1)
else:
logger(
f"Removing entire show '{series1.identifiers.title}' as no episodes remain after cleanup.",
3,
)
modified_watched_list_1[user_1].libraries[
library_1_key
].series = filtered_series_list
if user_1 in modified_watched_list_1:
# If user is empty delete user
if len(modified_watched_list_1[user_1]) == 0:
logger(f"Removing {user_1} from watched list 1 because it is empty", 1)
del modified_watched_list_1[user_1]
# After processing, remove any library that is completely empty.
for user, user_data in modified_watched_list_1.items():
new_libraries = {}
for lib_key, library in user_data.libraries.items():
if library.movies or library.series:
new_libraries[lib_key] = library
else:
logger(f"Removing empty library '{lib_key}' for user '{user}'", 3)
user_data.libraries = new_libraries
return modified_watched_list_1
@ -151,105 +208,3 @@ def get_other(watched_list, object_1, object_2):
else:
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
return None
def get_movie_index_in_dict(movie, movies_watched_list_2_keys_dict):
# Iterate through the keys and values of the movie dictionary
for movie_key, movie_value in movie.items():
# If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary
if movie_key == "locations":
if "locations" in movies_watched_list_2_keys_dict.keys():
# Iterate through the locations in the movie dictionary
for location in movie_value:
# If the location is in the movies_watched_list_2_keys_dict dictionary, return index of the key
return contains_nested(
location, movies_watched_list_2_keys_dict["locations"]
)
# If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary
else:
if movie_key in movies_watched_list_2_keys_dict.keys():
# If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True
if movie_value in movies_watched_list_2_keys_dict[movie_key]:
return movies_watched_list_2_keys_dict[movie_key].index(movie_value)
# If the loop completes without finding a match, return False
return None
def filter_episode_watched_list_2_keys_dict(
episode_watched_list_2_keys_dict, show_key_dict
):
# If the episode_watched_list_2_keys_dict dictionary is empty, missing show then return an empty dictionary
if (
len(episode_watched_list_2_keys_dict) == 0
or "show" not in episode_watched_list_2_keys_dict.keys()
):
return {}
# Filter the episode_watched_list_2_keys_dict dictionary to only include values for the correct show
filtered_episode_watched_list_2_keys_dict = {}
show_indecies = []
# Iterate through episode_watched_list_2_keys_dict["show"] and find the indecies that match show_key_dict
for show_index, show_value in enumerate(episode_watched_list_2_keys_dict["show"]):
# Iterate through the keys and values of the show_value dictionary and check if they match show_key_dict
for show_key, show_key_value in show_value.items():
if show_key == "locations":
# Iterate through the locations in the show_value dictionary
for location in show_key_value:
# If the location is in the episode_watched_list_2_keys_dict dictionary, return index of the key
if (
contains_nested(location, show_key_dict["locations"])
is not None
):
show_indecies.append(show_index)
break
else:
if show_key in show_key_dict.keys():
if show_key_value == show_key_dict[show_key]:
show_indecies.append(show_index)
break
# lists
indecies = list(set(show_indecies))
# If there are no indecies that match the show, return an empty dictionary
if len(indecies) == 0:
return {}
# Create a copy of the dictionary with indecies that match the show and none that don't
for key, value in episode_watched_list_2_keys_dict.items():
if key not in filtered_episode_watched_list_2_keys_dict:
filtered_episode_watched_list_2_keys_dict[key] = []
for index, _ in enumerate(value):
if index in indecies:
filtered_episode_watched_list_2_keys_dict[key].append(value[index])
else:
filtered_episode_watched_list_2_keys_dict[key].append(None)
return filtered_episode_watched_list_2_keys_dict
def get_episode_index_in_dict(episode, episode_watched_list_2_keys_dict):
# Iterate through the keys and values of the episode dictionary
for episode_key, episode_value in episode.items():
if episode_key in episode_watched_list_2_keys_dict.keys():
if episode_key == "locations":
# Iterate through the locations in the episode dictionary
for location in episode_value:
# If the location is in the episode_watched_list_2_keys_dict dictionary, return index of the key
return contains_nested(
location, episode_watched_list_2_keys_dict["locations"]
)
else:
# If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True
if episode_value in episode_watched_list_2_keys_dict[episode_key]:
return episode_watched_list_2_keys_dict[episode_key].index(
episode_value
)
# If the loop completes without finding a match, return False
return None

View File

@ -21,10 +21,6 @@ from src.library import (
check_skip_logic,
check_blacklist_logic,
check_whitelist_logic,
show_title_dict,
episode_title_dict,
movies_title_dict,
generate_library_guids_dict,
)
blacklist_library = ["TV Shows"]
@ -280,45 +276,3 @@ def test_check_whitelist_logic():
)
assert skip_reason is None
def test_show_title_dict():
show_titles_dict = show_title_dict(show_list)
assert show_titles_dict == show_titles
def test_episode_title_dict():
episode_titles_dict = episode_title_dict(show_list)
assert episode_titles_dict == episode_titles
def test_movies_title_dict():
movies_titles_dict = movies_title_dict(movie_list)
assert movies_titles_dict == movie_titles
def test_generate_library_guids_dict():
# Test with shows
(
show_titles_dict,
episode_titles_dict,
movies_titles_dict,
) = generate_library_guids_dict(show_list)
assert show_titles_dict == show_titles
assert episode_titles_dict == episode_titles
assert movies_titles_dict == {}
# Test with movies
(
show_titles_dict,
episode_titles_dict,
movies_titles_dict,
) = generate_library_guids_dict(movie_list)
assert show_titles_dict == {}
assert episode_titles_dict == {}
assert movies_titles_dict == movie_titles

File diff suppressed because it is too large Load Diff