Use pydantic for structure

Complete redesign of everything using pydantic to create the
watched structure. This brings in type checking support and
simplifies a lot of things

Signed-off-by: Luis Garcia <git@luigi311.com>
This commit is contained in:
Luis Garcia
2025-02-19 00:51:21 -07:00
parent a1e1ccde42
commit ce5b810a5b
8 changed files with 1342 additions and 1662 deletions

View File

@@ -1,55 +1,110 @@
import copy
from pydantic import BaseModel
from src.functions import logger, search_mapping, contains_nested
from src.library import generate_library_guids_dict
from src.functions import logger, search_mapping
def check_remove_entry(video, library, video_index, library_watched_list_2):
if video_index is not None:
if (
library_watched_list_2["completed"][video_index]
== video["status"]["completed"]
) and (library_watched_list_2["time"][video_index] == video["status"]["time"]):
logger(
f"Removing {video['title']} from {library} due to exact match",
3,
)
return True
elif (
library_watched_list_2["completed"][video_index] == True
and video["status"]["completed"] == False
):
logger(
f"Removing {video['title']} from {library} due to being complete in one library and not the other",
3,
)
return True
elif (
library_watched_list_2["completed"][video_index] == False
and video["status"]["completed"] == False
) and (video["status"]["time"] < library_watched_list_2["time"][video_index]):
logger(
f"Removing {video['title']} from {library} due to more time watched in one library than the other",
3,
)
return True
elif (
library_watched_list_2["completed"][video_index] == True
and video["status"]["completed"] == True
):
logger(
f"Removing {video['title']} from {library} due to being complete in both libraries",
3,
)
class MediaIdentifiers(BaseModel):
title: str
# File information, will be folder for series and media file for episode/movie
locations: tuple[str, ...] = tuple()
# Guids
imdb_id: str | None = None
tvdb_id: str | None = None
tmdb_id: str | None = None
class WatchedStatus(BaseModel):
completed: bool
time: int
class MediaItem(BaseModel):
identifiers: MediaIdentifiers
status: WatchedStatus
class Series(BaseModel):
identifiers: MediaIdentifiers
episodes: list[MediaItem] = []
class LibraryData(BaseModel):
title: str
movies: list[MediaItem] = []
series: list[Series] = []
class UserData(BaseModel):
libraries: dict[str, LibraryData] = {}
def check_same_identifiers(item1: MediaIdentifiers, item2: MediaIdentifiers) -> bool:
# Check for duplicate based on file locations:
if item1.locations and item2.locations:
if set(item1.locations) & set(item2.locations):
return True
# Check for duplicate based on GUIDs:
if (
(item1.imdb_id and item2.imdb_id and item1.imdb_id == item2.imdb_id)
or (item1.tvdb_id and item2.tvdb_id and item1.tvdb_id == item2.tvdb_id)
or (item1.tmdb_id and item2.tmdb_id and item1.tmdb_id == item2.tmdb_id)
):
return True
return False
def check_remove_entry(item1: MediaItem, item2: MediaItem) -> bool:
"""
Returns True if item1 (from watched_list_1) should be removed
in favor of item2 (from watched_list_2), based on:
- Duplicate criteria:
* They match if any file location is shared OR
at least one of imdb_id, tvdb_id, or tmdb_id matches.
- Watched status:
* If one is complete and the other is not, remove the incomplete one.
* If both are incomplete, remove the one with lower progress (time).
* If both are complete, remove item1 as duplicate.
"""
if not check_same_identifiers(item1.identifiers, item2.identifiers):
return False
# Compare watched statuses.
status1 = item1.status
status2 = item2.status
# If one is complete and the other isn't, remove the one that's not complete.
if status1.completed != status2.completed:
if not status1.completed and status2.completed:
return True # Remove item1 since it's not complete.
else:
return False # Do not remove item1; it's complete.
# Both have the same completed status.
if not status1.completed and not status2.completed:
# Both incomplete: remove the one with lower progress (time)
if status1.time < status2.time:
return True # Remove item1 because it has watched less.
elif status1.time > status2.time:
return False # Keep item1 because it has more progress.
else:
# Same progress; Remove duplicate
return True
# If both are complete, consider item1 the duplicate and remove it.
return True
def cleanup_watched(
watched_list_1, watched_list_2, user_mapping=None, library_mapping=None
):
watched_list_1: dict[str, UserData],
watched_list_2: dict[str, UserData],
user_mapping=None,
library_mapping=None,
) -> dict[str, UserData]:
modified_watched_list_1 = copy.deepcopy(watched_list_1)
# remove entries from watched_list_1 that are in watched_list_2
@@ -61,84 +116,86 @@ def cleanup_watched(
if user_2 is None:
continue
for library_1 in watched_list_1[user_1]:
for library_1_key in watched_list_1[user_1].libraries:
library_other = None
if library_mapping:
library_other = search_mapping(library_mapping, library_1)
library_2 = get_other(watched_list_2[user_2], library_1, library_other)
if library_2 is None:
library_other = search_mapping(library_mapping, library_1_key)
library_2_key = get_other(
watched_list_2[user_2].libraries, library_1_key, library_other
)
if library_2_key is None:
continue
(
_,
episode_watched_list_2_keys_dict,
movies_watched_list_2_keys_dict,
) = generate_library_guids_dict(watched_list_2[user_2][library_2])
library_1 = watched_list_1[user_1].libraries[library_1_key]
library_2 = watched_list_2[user_2].libraries[library_2_key]
# Movies
if isinstance(watched_list_1[user_1][library_1], list):
for movie in watched_list_1[user_1][library_1]:
movie_index = get_movie_index_in_dict(
movie, movies_watched_list_2_keys_dict
)
if movie_index is not None:
if check_remove_entry(
movie,
library_1,
movie_index,
movies_watched_list_2_keys_dict,
):
modified_watched_list_1[user_1][library_1].remove(movie)
filtered_movies = []
for movie in library_1.movies:
remove_flag = False
for movie2 in library_2.movies:
if check_remove_entry(movie, movie2):
logger(f"Removing movie: {movie.identifiers.title}", 3)
remove_flag = True
break
if not remove_flag:
filtered_movies.append(movie)
modified_watched_list_1[user_1].libraries[
library_1_key
].movies = filtered_movies
# TV Shows
elif isinstance(watched_list_1[user_1][library_1], dict):
for show_key_1 in watched_list_1[user_1][library_1].keys():
show_key_dict = dict(show_key_1)
filtered_series_list = []
for series1 in library_1.series:
matching_series = None
for series2 in library_2.series:
if check_same_identifiers(series1.identifiers, series2.identifiers):
matching_series = series2
break
# Filter the episode_watched_list_2_keys_dict dictionary to handle cases
# where episode location names are not unique such as S01E01.mkv
filtered_episode_watched_list_2_keys_dict = (
filter_episode_watched_list_2_keys_dict(
episode_watched_list_2_keys_dict, show_key_dict
if matching_series is None:
# No matching show in watched_list_2; keep the series as is.
filtered_series_list.append(series1)
else:
# We have a matching show; now clean up the episodes.
filtered_episodes = []
for ep1 in series1.episodes:
remove_flag = False
for ep2 in matching_series.episodes:
if check_remove_entry(ep1, ep2):
logger(
f"Removing episode '{ep1.identifiers.title}' from show '{series1.identifiers.title}'",
3,
)
remove_flag = True
break
if not remove_flag:
filtered_episodes.append(ep1)
# Only keep the series if there are remaining episodes.
if filtered_episodes:
modified_series1 = copy.deepcopy(series1)
modified_series1.episodes = filtered_episodes
filtered_series_list.append(modified_series1)
else:
logger(
f"Removing entire show '{series1.identifiers.title}' as no episodes remain after cleanup.",
3,
)
)
for episode in watched_list_1[user_1][library_1][show_key_1]:
episode_index = get_episode_index_in_dict(
episode, filtered_episode_watched_list_2_keys_dict
)
if episode_index is not None:
if check_remove_entry(
episode,
library_1,
episode_index,
episode_watched_list_2_keys_dict,
):
modified_watched_list_1[user_1][library_1][
show_key_1
].remove(episode)
modified_watched_list_1[user_1].libraries[
library_1_key
].series = filtered_series_list
# Remove empty shows
if len(modified_watched_list_1[user_1][library_1][show_key_1]) == 0:
if show_key_1 in modified_watched_list_1[user_1][library_1]:
logger(
f"Removing {show_key_dict['title']} because it is empty",
3,
)
del modified_watched_list_1[user_1][library_1][show_key_1]
for user_1 in watched_list_1:
for library_1 in watched_list_1[user_1]:
if library_1 in modified_watched_list_1[user_1]:
# If library is empty then remove it
if len(modified_watched_list_1[user_1][library_1]) == 0:
logger(f"Removing {library_1} from {user_1} because it is empty", 1)
del modified_watched_list_1[user_1][library_1]
if user_1 in modified_watched_list_1:
# If user is empty delete user
if len(modified_watched_list_1[user_1]) == 0:
logger(f"Removing {user_1} from watched list 1 because it is empty", 1)
del modified_watched_list_1[user_1]
# After processing, remove any library that is completely empty.
for user, user_data in modified_watched_list_1.items():
new_libraries = {}
for lib_key, library in user_data.libraries.items():
if library.movies or library.series:
new_libraries[lib_key] = library
else:
logger(f"Removing empty library '{lib_key}' for user '{user}'", 3)
user_data.libraries = new_libraries
return modified_watched_list_1
@@ -151,105 +208,3 @@ def get_other(watched_list, object_1, object_2):
else:
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
return None
def get_movie_index_in_dict(movie, movies_watched_list_2_keys_dict):
# Iterate through the keys and values of the movie dictionary
for movie_key, movie_value in movie.items():
# If the key is "locations", check if the "locations" key is present in the movies_watched_list_2_keys_dict dictionary
if movie_key == "locations":
if "locations" in movies_watched_list_2_keys_dict.keys():
# Iterate through the locations in the movie dictionary
for location in movie_value:
# If the location is in the movies_watched_list_2_keys_dict dictionary, return index of the key
return contains_nested(
location, movies_watched_list_2_keys_dict["locations"]
)
# If the key is not "locations", check if the movie_key is present in the movies_watched_list_2_keys_dict dictionary
else:
if movie_key in movies_watched_list_2_keys_dict.keys():
# If the movie_value is in the movies_watched_list_2_keys_dict dictionary, return True
if movie_value in movies_watched_list_2_keys_dict[movie_key]:
return movies_watched_list_2_keys_dict[movie_key].index(movie_value)
# If the loop completes without finding a match, return False
return None
def filter_episode_watched_list_2_keys_dict(
episode_watched_list_2_keys_dict, show_key_dict
):
# If the episode_watched_list_2_keys_dict dictionary is empty, missing show then return an empty dictionary
if (
len(episode_watched_list_2_keys_dict) == 0
or "show" not in episode_watched_list_2_keys_dict.keys()
):
return {}
# Filter the episode_watched_list_2_keys_dict dictionary to only include values for the correct show
filtered_episode_watched_list_2_keys_dict = {}
show_indecies = []
# Iterate through episode_watched_list_2_keys_dict["show"] and find the indecies that match show_key_dict
for show_index, show_value in enumerate(episode_watched_list_2_keys_dict["show"]):
# Iterate through the keys and values of the show_value dictionary and check if they match show_key_dict
for show_key, show_key_value in show_value.items():
if show_key == "locations":
# Iterate through the locations in the show_value dictionary
for location in show_key_value:
# If the location is in the episode_watched_list_2_keys_dict dictionary, return index of the key
if (
contains_nested(location, show_key_dict["locations"])
is not None
):
show_indecies.append(show_index)
break
else:
if show_key in show_key_dict.keys():
if show_key_value == show_key_dict[show_key]:
show_indecies.append(show_index)
break
# lists
indecies = list(set(show_indecies))
# If there are no indecies that match the show, return an empty dictionary
if len(indecies) == 0:
return {}
# Create a copy of the dictionary with indecies that match the show and none that don't
for key, value in episode_watched_list_2_keys_dict.items():
if key not in filtered_episode_watched_list_2_keys_dict:
filtered_episode_watched_list_2_keys_dict[key] = []
for index, _ in enumerate(value):
if index in indecies:
filtered_episode_watched_list_2_keys_dict[key].append(value[index])
else:
filtered_episode_watched_list_2_keys_dict[key].append(None)
return filtered_episode_watched_list_2_keys_dict
def get_episode_index_in_dict(episode, episode_watched_list_2_keys_dict):
# Iterate through the keys and values of the episode dictionary
for episode_key, episode_value in episode.items():
if episode_key in episode_watched_list_2_keys_dict.keys():
if episode_key == "locations":
# Iterate through the locations in the episode dictionary
for location in episode_value:
# If the location is in the episode_watched_list_2_keys_dict dictionary, return index of the key
return contains_nested(
location, episode_watched_list_2_keys_dict["locations"]
)
else:
# If the episode_value is in the episode_watched_list_2_keys_dict dictionary, return True
if episode_value in episode_watched_list_2_keys_dict[episode_key]:
return episode_watched_list_2_keys_dict[episode_key].index(
episode_value
)
# If the loop completes without finding a match, return False
return None