feat: Implement bidirectional watched status synchronization

Refactors the core synchronization logic to support true bidirectional syncing of watched and unwatched statuses between servers.

The previous implementation was a one-way, progress-based sync. It would only sync from a less-watched state to a more-watched state and did not handle manually marking items as "unwatched".

This change introduces a new timestamp-based synchronization mechanism:
- For each media item, the modification timestamp of its watched status is now fetched from both Plex (`updatedAt`) and Jellyfin/Emby (`DateLastSaved`).
- The item with the most recent timestamp is considered the "source of truth."
- The status from the source of truth (either "watched" or "unwatched") is then synced to the other server.

This allows for a natural, intuitive synchronization where the latest action taken by the user on any server is the one that is propagated.

Key changes include:
- A new `sync_watched_lists` function in `src/watched.py` that replaces the old `cleanup_watched` logic.
- `mark_watched` and `mark_unwatched` methods added to the Plex and Jellyfin/Emby server classes.
- The main application loop in `src/main.py` has been updated to use the new action-based sync system.
- New unit tests have been added in `test/test_sync.py` to validate the bidirectional logic.
pull/312/head
google-labs-jules[bot] 2025-09-14 17:59:04 +00:00
parent 6eefedfc40
commit 8d63defdc6
6 changed files with 840 additions and 2719 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,323 +1,157 @@
import os
import traceback
import json
import sys
from dotenv import dotenv_values
from time import sleep, perf_counter
from loguru import logger
from src.emby import Emby
from src.jellyfin import Jellyfin
from src.plex import Plex
from src.library import setup_libraries
from src.functions import (
parse_string_to_list,
str_to_bool,
get_env_value,
)
from src.users import setup_users
from src.watched import (
cleanup_watched,
merge_server_watched,
)
from src.black_white import setup_black_white_lists
from src.connection import generate_server_connections
def configure_logger(log_file: str = "log.log", debug_level: str = "INFO") -> None:
    """Reset loguru sinks, then attach a file sink and a console sink.

    Raises an Exception for any level outside INFO/DEBUG/TRACE; a bare
    stdout sink is attached first so the raised error is still visible.
    """
    logger.remove()
    valid_levels = ("INFO", "DEBUG", "TRACE")
    if debug_level not in valid_levels:
        logger.add(sys.stdout)
        raise Exception(
            f"Invalid DEBUG_LEVEL {debug_level}, please choose between INFO, DEBUG, TRACE"
        )
    # mode="w" rewrites the log file on every (re)configuration.
    logger.add(log_file, level=debug_level, mode="w")
    logger.add(sys.stdout, level=debug_level)
def should_sync_server(
    env,
    server_1: Plex | Jellyfin | Emby,
    server_2: Plex | Jellyfin | Emby,
) -> bool:
    """Return False when the env config disables syncing server_1 -> server_2.

    Each direction has its own SYNC_FROM_X_TO_Y toggle (default "True").
    """
    # (source type, destination type, env toggle, log label) — checked in the
    # same order as the original if/elif cascade so behavior is unchanged.
    rules = (
        (Plex, Jellyfin, "SYNC_FROM_PLEX_TO_JELLYFIN", "plex -> jellyfin"),
        (Plex, Emby, "SYNC_FROM_PLEX_TO_EMBY", "plex -> emby"),
        (Plex, Plex, "SYNC_FROM_PLEX_TO_PLEX", "plex -> plex"),
        (Jellyfin, Plex, "SYNC_FROM_JELLYFIN_TO_PLEX", "jellyfin -> plex"),
        (Jellyfin, Jellyfin, "SYNC_FROM_JELLYFIN_TO_JELLYFIN", "jellyfin -> jellyfin"),
        (Jellyfin, Emby, "SYNC_FROM_JELLYFIN_TO_EMBY", "jellyfin -> emby"),
        (Emby, Plex, "SYNC_FROM_EMBY_TO_PLEX", "emby -> plex"),
        (Emby, Jellyfin, "SYNC_FROM_EMBY_TO_JELLYFIN", "emby -> jellyfin"),
        (Emby, Emby, "SYNC_FROM_EMBY_TO_EMBY", "emby -> emby"),
    )
    for src_type, dst_type, toggle, direction in rules:
        if isinstance(server_1, src_type) and isinstance(server_2, dst_type):
            if not str_to_bool(get_env_value(env, toggle, "True")):
                logger.info(f"Sync from {direction} is disabled")
                return False
    return True
def main_loop(env) -> None:
    """Run one full sync pass.

    Reads mapping/filter configuration from *env*, connects to every
    configured server, then compares each unordered pair of servers and
    pushes filtered watched state in both enabled directions.
    """
    dryrun = str_to_bool(get_env_value(env, "DRYRUN", "False"))
    logger.info(f"Dryrun: {dryrun}")
    user_mapping_env = get_env_value(env, "USER_MAPPING", None)
    user_mapping = None
    if user_mapping_env:
        # Lowercased so user-name matching is case-insensitive downstream.
        user_mapping = json.loads(user_mapping_env.lower())
        logger.info(f"User Mapping: {user_mapping}")
    library_mapping_env = get_env_value(env, "LIBRARY_MAPPING", None)
    library_mapping = None
    if library_mapping_env:
        library_mapping = json.loads(library_mapping_env)
        logger.info(f"Library Mapping: {library_mapping}")
    # Create (black/white)lists
    logger.info("Creating (black/white)lists")
    blacklist_library = parse_string_to_list(
        get_env_value(env, "BLACKLIST_LIBRARY", None)
    )
    whitelist_library = parse_string_to_list(
        get_env_value(env, "WHITELIST_LIBRARY", None)
    )
    blacklist_library_type = parse_string_to_list(
        get_env_value(env, "BLACKLIST_LIBRARY_TYPE", None)
    )
    whitelist_library_type = parse_string_to_list(
        get_env_value(env, "WHITELIST_LIBRARY_TYPE", None)
    )
    blacklist_users = parse_string_to_list(get_env_value(env, "BLACKLIST_USERS", None))
    whitelist_users = parse_string_to_list(get_env_value(env, "WHITELIST_USERS", None))
    (
        blacklist_library,
        whitelist_library,
        blacklist_library_type,
        whitelist_library_type,
        blacklist_users,
        whitelist_users,
    ) = setup_black_white_lists(
        blacklist_library,
        whitelist_library,
        blacklist_library_type,
        whitelist_library_type,
        blacklist_users,
        whitelist_users,
        library_mapping,
        user_mapping,
    )
    # Create server connections
    logger.info("Creating server connections")
    servers = generate_server_connections(env)
    for server_1 in servers:
        # If server is the final server in the list, then we are done with the loop
        if server_1 == servers[-1]:
            break
        # Store a copy of server_1_watched so it can be reused across pairs
        # without regathering everyone's watch history every single time.
        server_1_watched = None
        # Start server_2 at the next server in the list
        for server_2 in servers[servers.index(server_1) + 1 :]:
            # Check if server 1 and server 2 are going to be synced in either direction, skip if not
            if not should_sync_server(
                env, server_1, server_2
            ) and not should_sync_server(env, server_2, server_1):
                continue
            logger.info(f"Server 1: {type(server_1)}: {server_1.info()}")
            logger.info(f"Server 2: {type(server_2)}: {server_2.info()}")
            # Create users list
            logger.info("Creating users list")
            server_1_users, server_2_users = setup_users(
                server_1, server_2, blacklist_users, whitelist_users, user_mapping
            )
            server_1_libraries, server_2_libraries = setup_libraries(
                server_1,
                server_2,
                blacklist_library,
                blacklist_library_type,
                whitelist_library,
                whitelist_library_type,
                library_mapping,
            )
            logger.info(f"Server 1 syncing libraries: {server_1_libraries}")
            logger.info(f"Server 2 syncing libraries: {server_2_libraries}")
            # NOTE(review): the extra `1` argument looks like a leftover — the
            # message has no `{}` placeholder so loguru ignores it. Confirm.
            logger.info("Creating watched lists", 1)
            # Pass the cached server_1_watched back in so already-gathered
            # libraries are skipped on later pairs.
            server_1_watched = server_1.get_watched(
                server_1_users, server_1_libraries, server_1_watched
            )
            logger.info("Finished creating watched list server 1")
            server_2_watched = server_2.get_watched(server_2_users, server_2_libraries)
            logger.info("Finished creating watched list server 2")
            logger.trace(f"Server 1 watched: {server_1_watched}")
            logger.trace(f"Server 2 watched: {server_2_watched}")
            # Filter each side down to entries the opposite server is missing.
            logger.info("Cleaning Server 1 Watched", 1)
            server_1_watched_filtered = cleanup_watched(
                server_1_watched, server_2_watched, user_mapping, library_mapping
            )
            logger.info("Cleaning Server 2 Watched", 1)
            server_2_watched_filtered = cleanup_watched(
                server_2_watched, server_1_watched, user_mapping, library_mapping
            )
            logger.debug(
                f"server 1 watched that needs to be synced to server 2:\n{server_1_watched_filtered}",
            )
            logger.debug(
                f"server 2 watched that needs to be synced to server 1:\n{server_2_watched_filtered}",
            )
            if should_sync_server(env, server_2, server_1):
                logger.info(f"Syncing {server_2.info()} -> {server_1.info()}")
                # Merge server_2_watched_filtered into server_1_watched so the
                # cached copy isn't stale for the next server pair.
                if not dryrun:
                    server_1_watched = merge_server_watched(
                        server_1_watched,
                        server_2_watched_filtered,
                        user_mapping,
                        library_mapping,
                    )
                server_1.update_watched(
                    server_2_watched_filtered,
                    user_mapping,
                    library_mapping,
                    dryrun,
                )
            if should_sync_server(env, server_1, server_2):
                logger.info(f"Syncing {server_1.info()} -> {server_2.info()}")
                server_2.update_watched(
                    server_1_watched_filtered,
                    user_mapping,
                    library_mapping,
                    dryrun,
                )
@logger.catch
def main() -> None:
    """Entry point: run main_loop() on a schedule.

    Reads scheduling config from the environment, reconfigures logging on
    each pass, tracks per-pass timing, and either stops after one run
    (RUN_ONLY_ONCE) or sleeps SLEEP_DURATION seconds between passes.
    """
    # Get environment variables
    env_file = get_env_value(None, "ENV_FILE", ".env")
    env = dotenv_values(env_file)
    run_only_once = str_to_bool(get_env_value(env, "RUN_ONLY_ONCE", "False"))
    sleep_duration = float(get_env_value(env, "SLEEP_DURATION", "3600"))
    log_file = get_env_value(env, "LOG_FILE", "log.log")
    debug_level = get_env_value(env, "DEBUG_LEVEL", "INFO")
    if debug_level:
        debug_level = debug_level.upper()
    times: list[float] = []
    while True:
        try:
            start = perf_counter()
            # Reconfigure the logger on each loop so the logs are rotated on each run
            configure_logger(log_file, debug_level)
            main_loop(env)
            end = perf_counter()
            times.append(end - start)
            if times:
                logger.info(f"Average time: {sum(times) / len(times)}")
            if run_only_once:
                break
            logger.info(f"Looping in {sleep_duration}")
            sleep(sleep_duration)
        except Exception as error:
            # A raised object is always a BaseException instance — it can never
            # be a list — so the old `isinstance(error, list)` branch was
            # unreachable dead code and has been removed.
            logger.error(error)
            logger.error(traceback.format_exc())
            if run_only_once:
                break
            logger.info(f"Retrying in {sleep_duration}")
            sleep(sleep_duration)
        except KeyboardInterrupt:
            # Ctrl-C: report stats and hard-exit immediately.
            if times:
                logger.info(f"Average time: {sum(times) / len(times)}")
            logger.info("Exiting")
            os._exit(0)
import os
import traceback
import json
import sys
from dotenv import dotenv_values
from time import sleep, perf_counter
from loguru import logger
from src.emby import Emby
from src.jellyfin import Jellyfin
from src.plex import Plex
from src.library import setup_libraries
from src.functions import (
parse_string_to_list,
str_to_bool,
get_env_value,
)
from src.users import setup_users
from src.watched import sync_watched_lists
from src.black_white import setup_black_white_lists
from src.connection import generate_server_connections
def configure_logger(log_file: str = "log.log", debug_level: str = "INFO") -> None:
    """Drop existing loguru sinks and log to *log_file* plus stdout at *debug_level*."""
    logger.remove()
    if debug_level in ("INFO", "DEBUG", "TRACE"):
        # File sink is rewritten on every configuration; console mirrors it.
        logger.add(log_file, level=debug_level, mode="w")
        logger.add(sys.stdout, level=debug_level)
        return
    # Invalid level: restore a bare stdout sink so the raised error is visible.
    logger.add(sys.stdout)
    raise Exception(f"Invalid DEBUG_LEVEL {debug_level}, please choose between INFO, DEBUG, TRACE")
def should_sync_server(env, server_1: Plex | Jellyfin | Emby, server_2: Plex | Jellyfin | Emby) -> bool:
    """Return False when the SYNC_FROM_X_TO_Y toggle for this exact server-type pair is off."""
    toggles = {
        (Plex, Jellyfin): "SYNC_FROM_PLEX_TO_JELLYFIN",
        (Plex, Emby): "SYNC_FROM_PLEX_TO_EMBY",
        (Plex, Plex): "SYNC_FROM_PLEX_TO_PLEX",
        (Jellyfin, Plex): "SYNC_FROM_JELLYFIN_TO_PLEX",
        (Jellyfin, Jellyfin): "SYNC_FROM_JELLYFIN_TO_JELLYFIN",
        (Jellyfin, Emby): "SYNC_FROM_JELLYFIN_TO_EMBY",
        (Emby, Plex): "SYNC_FROM_EMBY_TO_PLEX",
        (Emby, Jellyfin): "SYNC_FROM_EMBY_TO_JELLYFIN",
        (Emby, Emby): "SYNC_FROM_EMBY_TO_EMBY",
    }
    toggle = toggles.get((type(server_1), type(server_2)))
    if toggle is None:
        # Unknown pairing: default to allowing the sync.
        return True
    if str_to_bool(get_env_value(env, toggle, "True")):
        return True
    logger.info(f"Sync from {server_1.server_type} -> {server_2.server_type} is disabled")
    return False
def main_loop(env) -> None:
    """Run one bidirectional sync pass.

    Gathers watched state from every configured server pair, asks
    sync_watched_lists() for a list of actions, then applies each action
    ("mark_watched"/"mark_unwatched") on its target server.
    """
    dryrun = str_to_bool(get_env_value(env, "DRYRUN", "False"))
    logger.info(f"Dryrun: {dryrun}")
    # Mapping is lowercased so user-name matching is case-insensitive downstream.
    user_mapping = json.loads(get_env_value(env, "USER_MAPPING", "{}").lower())
    logger.info(f"User Mapping: {user_mapping}")
    library_mapping = json.loads(get_env_value(env, "LIBRARY_MAPPING", "{}"))
    logger.info(f"Library Mapping: {library_mapping}")
    (
        blacklist_library,
        whitelist_library,
        blacklist_library_type,
        whitelist_library_type,
        blacklist_users,
        whitelist_users,
    ) = setup_black_white_lists(
        parse_string_to_list(get_env_value(env, "BLACKLIST_LIBRARY", None)),
        parse_string_to_list(get_env_value(env, "WHITELIST_LIBRARY", None)),
        parse_string_to_list(get_env_value(env, "BLACKLIST_LIBRARY_TYPE", None)),
        parse_string_to_list(get_env_value(env, "WHITELIST_LIBRARY_TYPE", None)),
        parse_string_to_list(get_env_value(env, "BLACKLIST_USERS", None)),
        parse_string_to_list(get_env_value(env, "WHITELIST_USERS", None)),
        library_mapping,
        user_mapping,
    )
    servers = generate_server_connections(env)
    # Compare every unordered pair of servers exactly once.
    for i, server_1 in enumerate(servers):
        for j in range(i + 1, len(servers)):
            server_2 = servers[j]
            # Skip the pair when syncing is disabled in both directions.
            if not should_sync_server(env, server_1, server_2) and not should_sync_server(env, server_2, server_1):
                continue
            logger.info(f"Comparing Server 1: {server_1.info()} with Server 2: {server_2.info()}")
            server_1_users, server_2_users = setup_users(server_1, server_2, blacklist_users, whitelist_users, user_mapping)
            server_1_libraries, server_2_libraries = setup_libraries(server_1, server_2, blacklist_library, blacklist_library_type, whitelist_library, whitelist_library_type, library_mapping)
            logger.info("Gathering watched content from servers...")
            server_1_watched = server_1.get_watched(server_1_users, server_1_libraries)
            server_2_watched = server_2.get_watched(server_2_users, server_2_libraries)
            logger.info("Comparing watched content and generating sync actions...")
            actions = sync_watched_lists(server_1_watched, server_2_watched, user_mapping, library_mapping)
            if not actions:
                logger.info("No sync actions needed.")
                continue
            logger.info(f"Found {len(actions)} actions to perform.")
            # Each action names the server to change, the user/item and,
            # for mark_watched, the viewed date to record.
            for action_type, server, user_id, item_id, viewed_date in actions:
                if dryrun:
                    logger.info(f"[DRYRUN] Would perform {action_type} for item {item_id} for user {user_id} on {server.server_type}")
                    continue
                # Apply actions individually so one failure doesn't abort the pass.
                try:
                    if action_type == "mark_watched":
                        server.mark_watched(user_id, item_id, viewed_date)
                        logger.success(f"Marked item {item_id} as watched for user {user_id} on {server.server_type}")
                    elif action_type == "mark_unwatched":
                        server.mark_unwatched(user_id, item_id)
                        logger.success(f"Marked item {item_id} as unwatched for user {user_id} on {server.server_type}")
                except Exception as e:
                    logger.error(f"Failed to perform action {action_type} for item {item_id} on {server.server_type}: {e}")
@logger.catch
def main() -> None:
    """Entry point: run main_loop() forever (or once), logging timing stats."""
    env_file = get_env_value(None, "ENV_FILE", ".env")
    env = dotenv_values(env_file)
    run_only_once = str_to_bool(get_env_value(env, "RUN_ONLY_ONCE", "False"))
    sleep_duration = float(get_env_value(env, "SLEEP_DURATION", "3600"))
    log_file = get_env_value(env, "LOG_FILE", "log.log")
    debug_level = get_env_value(env, "DEBUG_LEVEL", "INFO").upper()
    times = []
    while True:
        try:
            start = perf_counter()
            # Reconfigure on every pass so the log file is rewritten per run.
            configure_logger(log_file, debug_level)
            main_loop(env)
            times.append(perf_counter() - start)
            if times:
                logger.info(f"Average execution time: {sum(times) / len(times):.2f}s")
            if run_only_once:
                break
            logger.info(f"Sleeping for {sleep_duration} seconds.")
            sleep(sleep_duration)
        except KeyboardInterrupt:
            # Ctrl-C: report stats and hard-exit immediately.
            if times:
                logger.info(f"Average execution time: {sum(times) / len(times):.2f}s")
            logger.info("Exiting.")
            os._exit(0)
        except Exception as err:
            logger.error(f"An unexpected error occurred: {err}")
            logger.error(traceback.format_exc())
            if run_only_once:
                break
            logger.info(f"Retrying in {sleep_duration} seconds.")
            sleep(sleep_duration)

View File

@@ -1,622 +1,347 @@
from datetime import datetime, timezone
import requests
from loguru import logger
from urllib3.poolmanager import PoolManager
from math import floor
from requests.adapters import HTTPAdapter as RequestsHTTPAdapter
from plexapi.video import Show, Episode, Movie
from plexapi.server import PlexServer
from plexapi.myplex import MyPlexAccount, MyPlexUser
from plexapi.library import MovieSection, ShowSection
from src.functions import (
filename_from_any_path,
search_mapping,
log_marked,
str_to_bool,
get_env_value,
)
from src.watched import (
LibraryData,
MediaIdentifiers,
MediaItem,
WatchedStatus,
Series,
UserData,
check_same_identifiers,
)
# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
class HostNameIgnoringAdapter(RequestsHTTPAdapter):
    """Requests transport adapter whose pool manager is built with
    assert_hostname=False, disabling urllib3's TLS hostname assertion
    (see the linked plexapi issue for the rationale)."""

    def init_poolmanager(
        self, connections: int, maxsize: int | None, block=..., **pool_kwargs
    ) -> None:
        # Identical to the base implementation except for assert_hostname=False.
        self.poolmanager = PoolManager(
            num_pools=connections,
            maxsize=maxsize,
            block=block,
            assert_hostname=False,
            **pool_kwargs,
        )
def extract_guids_from_item(
    item: "Movie | Show | Episode", generate_guids: bool
) -> dict[str, str]:
    """Collect provider GUIDs from a plexapi item.

    Returns a mapping of provider -> id (e.g. {"imdb": "tt0111161"}) parsed
    from GUID entries shaped like "imdb://tt0111161".  Returns an empty dict
    when GENERATE_GUIDS is disabled.

    Fix: the previous dict(guid.id.split("://") ...) construction raised
    ValueError whenever a GUID did not contain exactly one "://"; malformed
    entries are now skipped instead.  Annotations are quoted so the module
    can be imported without plexapi resolved at definition time.
    """
    # If GENERATE_GUIDS is set to False, then return an empty dict
    if not generate_guids:
        return {}
    guids: dict[str, str] = {}
    for guid in item.guids:
        raw = guid.id
        if not raw or not raw.strip():
            continue
        provider, sep, value = raw.partition("://")
        if not sep:
            # Malformed GUID without a scheme separator; skip rather than crash.
            continue
        guids[provider] = value
    return guids
def extract_identifiers_from_item(
item: Movie | Show | Episode,
generate_guids: bool,
generate_locations: bool,
) -> MediaIdentifiers:
guids = extract_guids_from_item(item, generate_guids)
locations = (
tuple([filename_from_any_path(loc) for loc in item.locations])
if generate_locations
else tuple()
)
if generate_guids:
if not guids:
logger.debug(
f"Plex: {item.title} has no guids{f', locations: {" ".join(item.locations)}' if generate_locations else ''}",
)
if generate_locations:
if not locations:
logger.debug(
f"Plex: {item.title} has no locations{f', guids: {guids}' if generate_guids else ''}",
)
return MediaIdentifiers(
title=item.title,
locations=locations,
imdb_id=guids.get("imdb"),
tvdb_id=guids.get("tvdb"),
tmdb_id=guids.get("tmdb"),
)
def get_mediaitem(
    item: Movie | Episode,
    completed: bool,
    generate_guids: bool = True,
    generate_locations: bool = True,
) -> MediaItem:
    """Wrap a plexapi movie/episode into the project's MediaItem structure."""
    last_viewed_at = item.lastViewedAt
    # Fall back to "now" when Plex has never recorded a view for this item.
    viewed_date = (
        last_viewed_at.replace(tzinfo=timezone.utc)
        if last_viewed_at
        else datetime.today()
    )
    identifiers = extract_identifiers_from_item(
        item, generate_guids, generate_locations
    )
    status = WatchedStatus(
        completed=completed, time=item.viewOffset, viewed_date=viewed_date
    )
    return MediaItem(identifiers=identifiers, status=status)
# class plex accept base url and token and username and password but default with none
class Plex:
    """Wrapper around a plexapi PlexServer connection.

    Handles login (token or account credentials), user enumeration, and the
    gathering/updating of watched state used by the sync engine.
    """

    def __init__(
        self,
        env,
        base_url: str | None = None,
        token: str | None = None,
        user_name: str | None = None,
        password: str | None = None,
        server_name: str | None = None,
        ssl_bypass: bool = False,
        session: requests.Session | None = None,
    ) -> None:
        self.env = env
        self.server_type: str = "Plex"
        self.ssl_bypass: bool = ssl_bypass
        if ssl_bypass:
            # Session for ssl bypass
            session = requests.Session()
            # By pass ssl hostname check https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
            session.mount("https://", HostNameIgnoringAdapter())
        self.session = session
        self.plex: PlexServer = self.login(
            base_url, token, user_name, password, server_name
        )
        self.base_url: str = self.plex._baseurl
        self.admin_user: MyPlexAccount = self.plex.myPlexAccount()
        self.users: list[MyPlexUser | MyPlexAccount] = self.get_users()
        # Feature toggles for how media items are identified across servers.
        self.generate_guids: bool = str_to_bool(
            get_env_value(self.env, "GENERATE_GUIDS", "True")
        )
        self.generate_locations: bool = str_to_bool(
            get_env_value(self.env, "GENERATE_LOCATIONS", "True")
        )

    def login(
        self,
        base_url: str | None,
        token: str | None,
        user_name: str | None,
        password: str | None,
        server_name: str | None,
    ) -> PlexServer:
        """Connect to Plex, preferring base_url+token over account credentials.

        Raises Exception when neither credential set is complete or the
        connection fails.
        """
        try:
            if base_url and token:
                plex: PlexServer = PlexServer(base_url, token, session=self.session)
            elif user_name and password and server_name:
                # Login via plex account
                account = MyPlexAccount(user_name, password)
                plex = account.resource(server_name).connect()
            else:
                raise Exception("No complete plex credentials provided")
            return plex
        except Exception as e:
            if user_name:
                msg = f"Failed to login via plex account {user_name}"
                logger.error(f"Plex: Failed to login, {msg}, Error: {e}")
            else:
                logger.error(f"Plex: Failed to login, Error: {e}")
            raise Exception(e)

    def info(self) -> str:
        """Return a short human-readable server description for logging."""
        return f"Plex {self.plex.friendlyName}: {self.plex.version}"

    def get_users(self) -> list[MyPlexUser | MyPlexAccount]:
        """Return all shared users plus the admin account itself."""
        try:
            users: list[MyPlexUser | MyPlexAccount] = self.plex.myPlexAccount().users()
            # append self to users
            users.append(self.plex.myPlexAccount())
            return users
        except Exception as e:
            logger.error(f"Plex: Failed to get users, Error: {e}")
            raise Exception(e)

    def get_libraries(self) -> dict[str, str]:
        """Return {library title: library type} for movie/show libraries only."""
        try:
            output = {}
            libraries = self.plex.library.sections()
            logger.debug(
                f"Plex: All Libraries {[library.title for library in libraries]}"
            )
            for library in libraries:
                library_title = library.title
                library_type = library.type
                # Only movie and show libraries are sync-able.
                if library_type not in ["movie", "show"]:
                    logger.debug(
                        f"Plex: Skipping Library {library_title} found type {library_type}",
                    )
                    continue
                output[library_title] = library_type
            return output
        except Exception as e:
            logger.error(f"Plex: Failed to get libraries, Error: {e}")
            raise Exception(e)

    def get_user_library_watched(
        self, user_name: str, user_plex: PlexServer, library: MovieSection | ShowSection
    ) -> LibraryData:
        """Gather watched/in-progress items for one user in one library.

        Returns an empty LibraryData on any failure instead of raising.
        """
        try:
            logger.info(
                f"Plex: Generating watched for {user_name} in library {library.title}",
            )
            watched = LibraryData(title=library.title)
            library_videos = user_plex.library.section(library.title)
            if library.type == "movie":
                # Fully watched plus in-progress movies; 60000 ms (1 min) is
                # the minimum progress for an item to count as in-progress.
                for video in library_videos.search(
                    unwatched=False
                ) + library_videos.search(inProgress=True):
                    if video.isWatched or video.viewOffset >= 60000:
                        watched.movies.append(
                            get_mediaitem(
                                video,
                                video.isWatched,
                                self.generate_guids,
                                self.generate_locations,
                            )
                        )
            elif library.type == "show":
                # Keep track of processed shows to reduce duplicate shows
                processed_shows = []
                for show in library_videos.search(
                    unwatched=False
                ) + library_videos.search(inProgress=True):
                    if show.key in processed_shows:
                        continue
                    processed_shows.append(show.key)
                    show_guids = extract_guids_from_item(show, self.generate_guids)
                    episode_mediaitem = []
                    # Fetch watched or partially watched episodes
                    for episode in show.watched() + show.episodes(
                        viewOffset__gte=60_000
                    ):
                        episode_mediaitem.append(
                            get_mediaitem(
                                episode,
                                episode.isWatched,
                                self.generate_guids,
                                self.generate_locations,
                            )
                        )
                    if episode_mediaitem:
                        watched.series.append(
                            Series(
                                identifiers=MediaIdentifiers(
                                    title=show.title,
                                    locations=(
                                        tuple(
                                            [
                                                filename_from_any_path(location)
                                                for location in show.locations
                                            ]
                                        )
                                        if self.generate_locations
                                        else tuple()
                                    ),
                                    imdb_id=show_guids.get("imdb"),
                                    tvdb_id=show_guids.get("tvdb"),
                                    tmdb_id=show_guids.get("tmdb"),
                                ),
                                episodes=episode_mediaitem,
                            )
                        )
            return watched
        except Exception as e:
            logger.error(
                f"Plex: Failed to get watched for {user_name} in library {library.title}, Error: {e}",
            )
            return LibraryData(title=library.title)

    def get_watched(
        self,
        users: list[MyPlexUser | MyPlexAccount],
        sync_libraries: list[str],
        users_watched: dict[str, UserData] | None = None,
    ) -> dict[str, UserData]:
        """Gather watched state for every user across the requested libraries.

        *users_watched* may carry results from a previous pair so already
        gathered user/library combinations are skipped.  Returns {} on failure.
        """
        try:
            if not users_watched:
                users_watched: dict[str, UserData] = {}
            for user in users:
                if self.admin_user == user:
                    user_plex = self.plex
                else:
                    # Non-admin users need their own server-scoped token.
                    token = user.get_token(self.plex.machineIdentifier)
                    if token:
                        user_plex = self.login(self.base_url, token, None, None, None)
                    else:
                        logger.error(
                            f"Plex: Failed to get token for {user.title}, skipping",
                        )
                        continue
                user_name: str = (
                    user.username.lower() if user.username else user.title.lower()
                )
                libraries = user_plex.library.sections()
                for library in libraries:
                    if library.title not in sync_libraries:
                        continue
                    if user_name not in users_watched:
                        users_watched[user_name] = UserData()
                    if library.title in users_watched[user_name].libraries:
                        logger.info(
                            f"Plex: {user_name} {library.title} watched history has already been gathered, skipping"
                        )
                        continue
                    library_data = self.get_user_library_watched(
                        user_name, user_plex, library
                    )
                    users_watched[user_name].libraries[library.title] = library_data
            return users_watched
        except Exception as e:
            logger.error(f"Plex: Failed to get users watched, Error: {e}")
            return {}

    def update_user_watched(
        self,
        user: MyPlexAccount,
        user_plex: PlexServer,
        library_data: LibraryData,
        library_name: str,
        dryrun: bool,
    ) -> None:
        """Apply stored watched state to one user's Plex library.

        Marks matched unwatched Plex items watched (or updates their progress
        timeline); in dryrun mode it only logs what would be done.
        """
        # If there are no movies or shows to update, exit early.
        if not library_data.series and not library_data.movies:
            return
        logger.info(
            f"Plex: Updating watched for {user.title} in library {library_name}"
        )
        library_section = user_plex.library.section(library_name)
        if not library_section:
            logger.error(
                f"Plex: Library {library_name} not found for {user.title}, skipping",
            )
            return
        # Update movies.
        if library_data.movies:
            # Search for Plex movies that are currently marked as unwatched.
            for plex_movie in library_section.search(unwatched=True):
                plex_identifiers = extract_identifiers_from_item(
                    plex_movie, self.generate_guids, self.generate_locations
                )
                # Check each stored movie for a match.
                for stored_movie in library_data.movies:
                    if check_same_identifiers(
                        plex_identifiers, stored_movie.identifiers
                    ):
                        # If the stored movie is marked as watched (or has enough progress),
                        # update the Plex movie accordingly.
                        if stored_movie.status.completed:
                            msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}"
                            if not dryrun:
                                try:
                                    plex_movie.markWatched()
                                except Exception as e:
                                    logger.error(
                                        f"Plex: Failed to mark {plex_movie.title} as watched, Error: {e}"
                                    )
                                    continue
                            logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
                            log_marked(
                                "Plex",
                                user_plex.friendlyName,
                                user.title,
                                library_name,
                                plex_movie.title,
                                None,
                                None,
                                mark_file=get_env_value(
                                    self.env, "MARK_FILE", "mark.log"
                                ),
                            )
                        else:
                            # Partially watched: push the stored progress (ms).
                            msg = f"Plex: {plex_movie.title} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user.title} in {library_name}"
                            if not dryrun:
                                try:
                                    plex_movie.updateTimeline(stored_movie.status.time)
                                except Exception as e:
                                    logger.error(
                                        f"Plex: Failed to update {plex_movie.title} timeline, Error: {e}"
                                    )
                                    continue
                            logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
                            log_marked(
                                "Plex",
                                user_plex.friendlyName,
                                user.title,
                                library_name,
                                plex_movie.title,
                                duration=stored_movie.status.time,
                                mark_file=get_env_value(
                                    self.env, "MARK_FILE", "mark.log"
                                ),
                            )
                        # Once matched, no need to check further.
                        break
        # Update TV Shows (series/episodes).
        if library_data.series:
            # For each Plex show in the library section:
            plex_shows = library_section.search(unwatched=True)
            for plex_show in plex_shows:
                # Extract identifiers from the Plex show.
                plex_show_identifiers = extract_identifiers_from_item(
                    plex_show, self.generate_guids, self.generate_locations
                )
                # Try to find a matching series in your stored library.
                for stored_series in library_data.series:
                    if check_same_identifiers(
                        plex_show_identifiers, stored_series.identifiers
                    ):
                        logger.trace(f"Found matching show for '{plex_show.title}'")
                        # Now update episodes.
                        # Get the list of Plex episodes for this show.
                        plex_episodes = plex_show.episodes()
                        for plex_episode in plex_episodes:
                            plex_episode_identifiers = extract_identifiers_from_item(
                                plex_episode,
                                self.generate_guids,
                                self.generate_locations,
                            )
                            for stored_ep in stored_series.episodes:
                                if check_same_identifiers(
                                    plex_episode_identifiers, stored_ep.identifiers
                                ):
                                    if stored_ep.status.completed:
                                        msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library_name}"
                                        if not dryrun:
                                            try:
                                                plex_episode.markWatched()
                                            except Exception as e:
                                                logger.error(
                                                    f"Plex: Failed to mark {plex_show.title} {plex_episode.title} as watched, Error: {e}"
                                                )
                                                continue
                                        logger.success(
                                            f"{'[DRYRUN] ' if dryrun else ''}{msg}"
                                        )
                                        log_marked(
                                            "Plex",
                                            user_plex.friendlyName,
                                            user.title,
                                            library_name,
                                            plex_show.title,
                                            plex_episode.title,
                                            mark_file=get_env_value(
                                                self.env, "MARK_FILE", "mark.log"
                                            ),
                                        )
                                    else:
                                        msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user.title} in {library_name}"
                                        if not dryrun:
                                            try:
                                                plex_episode.updateTimeline(
                                                    stored_ep.status.time
                                                )
                                            except Exception as e:
                                                logger.error(
                                                    f"Plex: Failed to update {plex_show.title} {plex_episode.title} timeline, Error: {e}"
                                                )
                                                continue
                                        logger.success(
                                            f"{'[DRYRUN] ' if dryrun else ''}{msg}"
                                        )
                                        log_marked(
                                            "Plex",
                                            user_plex.friendlyName,
                                            user.title,
                                            library_name,
                                            plex_show.title,
                                            plex_episode.title,
                                            stored_ep.status.time,
                                            mark_file=get_env_value(
                                                self.env, "MARK_FILE", "mark.log"
                                            ),
                                        )
                                    break  # Found a matching episode.
                        break  # Found a matching show.

    def update_watched(
        self,
        watched_list: dict[str, UserData],
        user_mapping: dict[str, str] | None = None,
        library_mapping: dict[str, str] | None = None,
        dryrun: bool = False,
    ) -> None:
        """Apply a whole watched-list snapshot to this server.

        Resolves each user name (through *user_mapping* if given) to a Plex
        user, connects as that user, resolves library names (through
        *library_mapping*), then delegates to update_user_watched().
        """
        for user, user_data in watched_list.items():
            user_other = None
            # Look up the mapped user name, if a mapping is configured.
            if user_mapping:
                user_other = search_mapping(user_mapping, user)
            # Replace the string user name with the matching Plex user object.
            for index, value in enumerate(self.users):
                username_title = (
                    value.username.lower() if value.username else value.title.lower()
                )
                if user.lower() == username_title:
                    user = self.users[index]
                    break
                elif user_other and user_other.lower() == username_title:
                    user = self.users[index]
                    break
            if self.admin_user == user:
                user_plex = self.plex
            else:
                if isinstance(user, str):
                    logger.debug(
                        f"Plex: {user} is not a plex object, attempting to get object for user",
                    )
                    user = self.plex.myPlexAccount().user(user)
                if not isinstance(user, MyPlexUser):
                    logger.error(f"Plex: {user} failed to get PlexUser")
                    continue
                token = user.get_token(self.plex.machineIdentifier)
                if token:
                    user_plex = PlexServer(
                        self.base_url,
                        token,
                        session=self.session,
                    )
                else:
                    logger.error(
                        f"Plex: Failed to get token for {user.title}, skipping",
                    )
                    continue
            if not user_plex:
                logger.error(f"Plex: {user} Failed to get PlexServer")
                continue
            for library_name in user_data.libraries:
                library_data = user_data.libraries[library_name]
                library_other = None
                if library_mapping:
                    library_other = search_mapping(library_mapping, library_name)
                # if library in plex library list
                library_list = user_plex.library.sections()
                if library_name.lower() not in [x.title.lower() for x in library_list]:
                    # Fall back to the mapped library name when available.
                    if library_other:
                        if library_other.lower() in [
                            x.title.lower() for x in library_list
                        ]:
                            logger.info(
                                f"Plex: Library {library_name} not found, but {library_other} found, using {library_other}",
                            )
                            library_name = library_other
                        else:
                            logger.info(
                                f"Plex: Library {library_name} or {library_other} not found in library list",
                            )
                            continue
                    else:
                        logger.info(
                            f"Plex: Library {library_name} not found in library list",
                        )
                        continue
                try:
                    self.update_user_watched(
                        user,
                        user_plex,
                        library_data,
                        library_name,
                        dryrun,
                    )
                except Exception as e:
                    logger.error(
                        f"Plex: Failed to update watched for {user.title} in {library_name}, Error: {e}",
                    )
                    continue
from datetime import datetime, timezone
import requests
from loguru import logger
from typing import Any
from urllib3.poolmanager import PoolManager
from math import floor
from requests.adapters import HTTPAdapter as RequestsHTTPAdapter
from plexapi.video import Show, Episode, Movie
from plexapi.server import PlexServer
from plexapi.myplex import MyPlexAccount, MyPlexUser
from plexapi.library import MovieSection, ShowSection
from src.functions import (
filename_from_any_path,
search_mapping,
log_marked,
str_to_bool,
get_env_value,
)
from src.watched import (
LibraryData,
MediaIdentifiers,
MediaItem,
WatchedStatus,
Series,
UserData,
check_same_identifiers,
)
# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
class HostNameIgnoringAdapter(RequestsHTTPAdapter):
    """Transport adapter that builds its urllib3 pool with
    assert_hostname=False, skipping the TLS hostname assertion
    (rationale in the linked plexapi issue)."""

    def init_poolmanager(
        self, connections: int, maxsize: int | None, block=..., **pool_kwargs
    ) -> None:
        # Same as the stock adapter apart from assert_hostname=False.
        self.poolmanager = PoolManager(
            num_pools=connections,
            maxsize=maxsize,
            block=block,
            assert_hostname=False,
            **pool_kwargs,
        )
def extract_guids_from_item(
    item: "Movie | Show | Episode", generate_guids: bool
) -> dict[str, str]:
    """Parse an item's provider GUIDs into a ``{provider: id}`` mapping.

    Each plexapi guid looks like ``imdb://tt0436992``; the scheme becomes the
    key and the remainder the value. Returns an empty dict when GUID
    generation is disabled (GENERATE_GUIDS=False).
    """
    if not generate_guids:
        return {}

    parsed: dict[str, str] = {}
    for guid in item.guids:
        raw = guid.id
        if not raw or not raw.strip():
            continue  # skip empty/blank guid entries
        provider, value = raw.split("://")
        parsed[provider] = value
    return parsed
def extract_identifiers_from_item(
    server: Any,
    user_id: str,
    item: Movie | Show | Episode,
    generate_guids: bool,
    generate_locations: bool,
) -> MediaIdentifiers:
    """Build the MediaIdentifiers record used to match this item across servers.

    Args:
        server: Owning server wrapper; stored so sync actions can be routed back.
        user_id: Normalized user name the item was fetched for.
        item: plexapi movie/show/episode object.
        generate_guids: Whether to parse provider GUIDs from the item.
        generate_locations: Whether to record media file basenames.
    """
    guids = extract_guids_from_item(item, generate_guids)
    locations = (
        tuple(filename_from_any_path(loc) for loc in item.locations)
        if generate_locations
        else tuple()
    )

    # Items with neither guids nor locations cannot be matched reliably, so
    # log what is missing. The joined-locations string is built outside the
    # f-string: nesting an f-string that reuses the enclosing quote character
    # (f"...{" ".join(...)}...") is only legal on Python 3.12+ (PEP 701).
    if generate_guids and not guids:
        joined_locations = " ".join(item.locations)
        logger.debug(
            f"Plex: {item.title} has no guids"
            f"{f', locations: {joined_locations}' if generate_locations else ''}",
        )
    if generate_locations and not locations:
        logger.debug(
            f"Plex: {item.title} has no locations"
            f"{f', guids: {guids}' if generate_guids else ''}",
        )

    return MediaIdentifiers(
        title=item.title,
        locations=locations,
        imdb_id=guids.get("imdb"),
        tvdb_id=guids.get("tvdb"),
        tmdb_id=guids.get("tmdb"),
        id=item.ratingKey,
        server=server,
        user_id=user_id,
    )
def get_mediaitem(
server: Any,
user_id: str,
item: Movie | Episode,
completed: bool,
generate_guids: bool = True,
generate_locations: bool = True,
) -> MediaItem:
last_viewed_at = item.lastViewedAt
viewed_date = datetime.today()
if last_viewed_at:
viewed_date = last_viewed_at.replace(tzinfo=timezone.utc)
# updatedAt is a datetime object
last_updated_at = item.updatedAt.replace(tzinfo=timezone.utc)
return MediaItem(
identifiers=extract_identifiers_from_item(
server, user_id, item, generate_guids, generate_locations
),
status=WatchedStatus(
completed=completed,
time=item.viewOffset,
viewed_date=viewed_date,
last_updated_at=last_updated_at,
),
)
# Wrapper around a Plex server. Accepts either base_url+token or a MyPlex
# account (username/password/server name); all arguments default to None.
class Plex:
    def __init__(
        self,
        env,
        base_url: str | None = None,
        token: str | None = None,
        user_name: str | None = None,
        password: str | None = None,
        server_name: str | None = None,
        ssl_bypass: bool = False,
        session: requests.Session | None = None,
    ) -> None:
        """Connect to Plex and cache admin account, users and feature flags.

        Args:
            env: Loaded environment mapping (dotenv values).
            base_url: Direct server URL; used together with token.
            token: Plex auth token for base_url.
            user_name: MyPlex account name (fallback login path).
            password: MyPlex account password.
            server_name: Resource name to connect to via the MyPlex account.
            ssl_bypass: When True, skip TLS hostname verification.
            session: Optional pre-built requests session (replaced when
                ssl_bypass is set).
        """
        self.env = env
        self.server_type: str = "Plex"
        self.ssl_bypass: bool = ssl_bypass
        if ssl_bypass:
            # Session for ssl bypass
            session = requests.Session()
            # By pass ssl hostname check https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
            session.mount("https://", HostNameIgnoringAdapter())
        self.session = session
        self.plex: PlexServer = self.login(
            base_url, token, user_name, password, server_name
        )
        self.base_url: str = self.plex._baseurl
        self.admin_user: MyPlexAccount = self.plex.myPlexAccount()
        self.users: list[MyPlexUser | MyPlexAccount] = self.get_users()
        # Feature flags: matching via GUIDs / file locations can each be
        # disabled through the environment.
        self.generate_guids: bool = str_to_bool(
            get_env_value(self.env, "GENERATE_GUIDS", "True")
        )
        self.generate_locations: bool = str_to_bool(
            get_env_value(self.env, "GENERATE_LOCATIONS", "True")
        )

    def login(
        self,
        base_url: str | None,
        token: str | None,
        user_name: str | None,
        password: str | None,
        server_name: str | None,
    ) -> PlexServer:
        """Return a connected PlexServer.

        Prefers direct base_url+token; falls back to a MyPlex account login.
        Raises Exception when neither credential set is complete or the
        connection fails.
        """
        try:
            if base_url and token:
                plex: PlexServer = PlexServer(base_url, token, session=self.session)
            elif user_name and password and server_name:
                # Login via plex account
                account = MyPlexAccount(user_name, password)
                plex = account.resource(server_name).connect()
            else:
                raise Exception("No complete plex credentials provided")
            return plex
        except Exception as e:
            if user_name:
                msg = f"Failed to login via plex account {user_name}"
                logger.error(f"Plex: Failed to login, {msg}, Error: {e}")
            else:
                logger.error(f"Plex: Failed to login, Error: {e}")
            raise Exception(e)

    def info(self) -> str:
        """One-line human-readable server description (name and version)."""
        return f"Plex {self.plex.friendlyName}: {self.plex.version}"

    def get_users(self) -> list[MyPlexUser | MyPlexAccount]:
        """Return all shared users plus the admin account itself."""
        try:
            users: list[MyPlexUser | MyPlexAccount] = self.plex.myPlexAccount().users()
            users.append(self.plex.myPlexAccount())
            return users
        except Exception as e:
            logger.error(f"Plex: Failed to get users, Error: {e}")
            raise Exception(e)

    def get_libraries(self) -> dict[str, str]:
        """Map library title -> type, keeping only movie and show libraries."""
        try:
            output = {}
            libraries = self.plex.library.sections()
            logger.debug(
                f"Plex: All Libraries {[library.title for library in libraries]}"
            )
            for library in libraries:
                if library.type in ["movie", "show"]:
                    output[library.title] = library.type
                else:
                    # Music/photo/etc. libraries are not synced.
                    logger.debug(
                        f"Plex: Skipping Library {library.title} found type {library.type}",
                    )
            return output
        except Exception as e:
            logger.error(f"Plex: Failed to get libraries, Error: {e}")
            raise Exception(e)

    def get_user_library_watched(
        self, user_id: str, user_plex: PlexServer, library: MovieSection | ShowSection
    ) -> LibraryData:
        """Collect watched and in-progress items for one user in one library.

        Movies/episodes count as "in progress" from a viewOffset of 60000
        (presumably milliseconds, i.e. ~one minute — per plexapi viewOffset).
        On any failure an empty LibraryData is returned so one bad library
        does not abort the whole run.
        """
        try:
            logger.info(
                f"Plex: Generating watched for {user_id} in library {library.title}",
            )
            watched = LibraryData(title=library.title)
            library_videos = user_plex.library.section(library.title)
            if library.type == "movie":
                # Fully watched plus partially watched movies.
                for video in library_videos.search(unwatched=False) + library_videos.search(inProgress=True):
                    if video.isWatched or video.viewOffset >= 60000:
                        watched.movies.append(
                            get_mediaitem(
                                self, user_id, video, video.isWatched, self.generate_guids, self.generate_locations
                            )
                        )
            elif library.type == "show":
                # A show can appear in both searches; de-dupe by show.key.
                processed_shows = []
                for show in library_videos.search(unwatched=False) + library_videos.search(inProgress=True):
                    if show.key in processed_shows:
                        continue
                    processed_shows.append(show.key)
                    # NOTE(review): show_guids is computed but never used.
                    show_guids = extract_guids_from_item(show, self.generate_guids)
                    episode_mediaitem = []
                    for episode in show.watched() + show.episodes(viewOffset__gte=60_000):
                        episode_mediaitem.append(
                            get_mediaitem(
                                self, user_id, episode, episode.isWatched, self.generate_guids, self.generate_locations
                            )
                        )
                    # Only record shows that have at least one relevant episode.
                    if episode_mediaitem:
                        watched.series.append(
                            Series(
                                identifiers=extract_identifiers_from_item(self, user_id, show, self.generate_guids, self.generate_locations),
                                episodes=episode_mediaitem,
                            )
                        )
            return watched
        except Exception as e:
            logger.error(
                f"Plex: Failed to get watched for {user_id} in library {library.title}, Error: {e}",
            )
            return LibraryData(title=library.title)

    def get_watched(
        self,
        users: list[MyPlexUser | MyPlexAccount],
        sync_libraries: list[str],
        users_watched: dict[str, UserData] | None = None,
    ) -> dict[str, UserData]:
        """Gather watched state for every user across the requested libraries.

        Args:
            users: Plex users to process.
            sync_libraries: Library titles to include.
            users_watched: Optional partially-filled result to extend
                (libraries already present for a user are skipped).

        Returns:
            Mapping of lowercase user name -> UserData. On error an empty
            dict is returned (NOTE(review): partial results are discarded).
        """
        try:
            if not users_watched:
                users_watched = {}
            for user in users:
                # Admin uses the existing connection; other users get their
                # own server-scoped token.
                user_plex = self.plex if self.admin_user == user else self.login(self.base_url, user.get_token(self.plex.machineIdentifier), None, None, None)
                if not user_plex:
                    logger.error(f"Plex: Failed to get token for {user.title}, skipping")
                    continue
                user_name = user.username.lower() if user.username else user.title.lower()
                if user_name not in users_watched:
                    users_watched[user_name] = UserData()
                for library in user_plex.library.sections():
                    if library.title not in sync_libraries:
                        continue
                    if library.title in users_watched[user_name].libraries:
                        logger.info(f"Plex: {user_name} {library.title} watched history has already been gathered, skipping")
                        continue
                    library_data = self.get_user_library_watched(user_name, user_plex, library)
                    users_watched[user_name].libraries[library.title] = library_data
            return users_watched
        except Exception as e:
            logger.error(f"Plex: Failed to get users watched, Error: {e}")
            return {}

    def get_plex_user_from_id(self, user_id: str) -> MyPlexUser | MyPlexAccount | None:
        """Find a cached user by case-insensitive username (or title)."""
        for u in self.users:
            username = u.username.lower() if u.username else u.title.lower()
            if username == user_id.lower():
                return u
        return None

    def mark_watched(self, user_id: str, item_id: str):
        """Mark a single item (by ratingKey) watched for the given user."""
        user = self.get_plex_user_from_id(user_id)
        if not user:
            logger.error(f"Plex: User {user_id} not found.")
            return
        # Connect as the target user so the status lands on their profile.
        user_plex = self.plex if self.admin_user == user else self.login(self.base_url, user.get_token(self.plex.machineIdentifier), None, None, None)
        item = user_plex.fetchItem(int(item_id))
        if item:
            item.markWatched()

    def mark_unwatched(self, user_id: str, item_id: str):
        """Mark a single item (by ratingKey) unwatched for the given user."""
        user = self.get_plex_user_from_id(user_id)
        if not user:
            logger.error(f"Plex: User {user_id} not found.")
            return
        # Connect as the target user so the status lands on their profile.
        user_plex = self.plex if self.admin_user == user else self.login(self.base_url, user.get_token(self.plex.machineIdentifier), None, None, None)
        item = user_plex.fetchItem(int(item_id))
        if item:
            item.markUnwatched()

    def update_watched(
        self,
        watched_list: dict[str, UserData],
        user_mapping: dict[str, str] | None = None,
        library_mapping: dict[str, str] | None = None,
        dryrun: bool = False,
    ) -> None:
        """Deprecated no-op kept for interface compatibility."""
        # This function is now deprecated and will be removed.
        # The new sync logic in watched.py will be used instead.
        pass

View File

@ -1,28 +1,27 @@
import copy
from datetime import datetime
from pydantic import BaseModel, Field
from loguru import logger
from typing import Any
from typing import Any, Literal
from src.functions import search_mapping
class MediaIdentifiers(BaseModel):
    """Everything needed to match one media item across servers."""

    # Human-readable title, used mainly for logging.
    title: str | None = None

    # File information, will be folder for series and media file for episode/movie
    locations: tuple[str, ...] = tuple()

    # Guids
    imdb_id: str | None = None
    tvdb_id: str | None = None
    tmdb_id: str | None = None

    # Server-native item id (e.g. the Plex ratingKey), used to apply actions.
    id: str | None = None
    # Server wrapper the item was read from, so actions can be routed back.
    server: Any | None = None
    # Normalized user name the item belongs to on that server.
    user_id: str | None = None
class WatchedStatus(BaseModel):
    """Watched state of one media item at one point in time."""

    # True when fully watched; False when unwatched or merely in progress.
    completed: bool
    # Playback position as reported by the server (Plex viewOffset —
    # presumably milliseconds; 0 for fully watched items).
    time: int
    # When the item was last viewed.
    viewed_date: datetime
    # When the watched status last changed on its server; the newer
    # timestamp wins during bidirectional sync.
    last_updated_at: datetime
class MediaItem(BaseModel):
@ -45,279 +44,83 @@ class UserData(BaseModel):
libraries: dict[str, LibraryData] = Field(default_factory=dict)
def merge_mediaitem_data(ep1: "MediaItem", ep2: "MediaItem") -> "MediaItem":
    """Pick the "more watched" of two matching items.

    A completed item beats an in-progress one; when both share the same
    completion state, the greater playback time wins (ties keep *ep1*).
    """
    done1, done2 = ep1.status.completed, ep2.status.completed
    if done1 != done2:
        return ep2 if done2 else ep1
    if ep1.status.time >= ep2.status.time:
        return ep1
    return ep2
def merge_series_data(series1: "Series", series2: "Series") -> "Series":
    """Combine the episode lists of two matching series.

    Episodes found on both sides (per check_same_identifiers) are merged
    with merge_mediaitem_data; episodes unique to *series2* are appended.
    """
    combined = copy.deepcopy(series1)
    for candidate in series2.episodes:
        match_idx = next(
            (
                i
                for i, existing in enumerate(combined.episodes)
                if check_same_identifiers(candidate.identifiers, existing.identifiers)
            ),
            None,
        )
        if match_idx is None:
            combined.episodes.append(copy.deepcopy(candidate))
        else:
            combined.episodes[match_idx] = merge_mediaitem_data(
                combined.episodes[match_idx], candidate
            )
    return combined
def merge_library_data(lib1: "LibraryData", lib2: "LibraryData") -> "LibraryData":
    """Fold *lib2*'s content into a deep copy of *lib1*.

    Movies and series that match (per check_same_identifiers) are merged;
    everything else from *lib2* is appended unchanged.
    """
    result = copy.deepcopy(lib1)

    # Movies: merge matches in place, append the rest.
    for incoming in lib2.movies:
        for idx, existing in enumerate(result.movies):
            if check_same_identifiers(incoming.identifiers, existing.identifiers):
                result.movies[idx] = merge_mediaitem_data(existing, incoming)
                break
        else:
            result.movies.append(copy.deepcopy(incoming))

    # Series: merge episode lists of matching shows, append new shows.
    for incoming_series in lib2.series:
        for idx, existing_series in enumerate(result.series):
            if check_same_identifiers(
                existing_series.identifiers, incoming_series.identifiers
            ):
                result.series[idx] = merge_series_data(
                    existing_series, incoming_series
                )
                break
        else:
            result.series.append(copy.deepcopy(incoming_series))

    return result
def merge_user_data(user1: "UserData", user2: "UserData") -> "UserData":
    """Union of two users' libraries; libraries present in both are merged.

    Libraries only in *user2* are deep-copied in; shared ones go through
    merge_library_data.
    """
    libraries = copy.deepcopy(user1.libraries)
    for name, incoming in user2.libraries.items():
        if name in libraries:
            libraries[name] = merge_library_data(libraries[name], incoming)
        else:
            libraries[name] = copy.deepcopy(incoming)
    return UserData(libraries=libraries)
def merge_server_watched(
    watched_list_1: dict[str, UserData],
    watched_list_2: dict[str, UserData],
    user_mapping: dict[str, str] | None = None,
    library_mapping: dict[str, str] | None = None,
) -> dict[str, UserData]:
    """
    Merge two dictionaries of UserData while taking into account possible
    differences in user and library keys via the provided mappings.

    Keys of watched_list_1 win: list-2 entries are folded into them after
    translating user/library names through the mappings.
    """
    merged_watched = copy.deepcopy(watched_list_1)
    for user_2, user_data in watched_list_2.items():
        # Determine matching user key.
        user_key = user_mapping.get(user_2, user_2) if user_mapping else user_2
        if user_key not in merged_watched:
            # NOTE(review): new users are inserted under their list-2 name
            # (user_2), not the mapped user_key — confirm this is intended.
            merged_watched[user_2] = copy.deepcopy(user_data)
            continue
        for lib_key, lib_data in user_data.libraries.items():
            mapped_lib_key = (
                library_mapping.get(lib_key, lib_key) if library_mapping else lib_key
            )
            if mapped_lib_key not in merged_watched[user_key].libraries:
                # NOTE(review): absence is tested with mapped_lib_key but the
                # insert uses the unmapped lib_key — verify the asymmetry.
                merged_watched[user_key].libraries[lib_key] = copy.deepcopy(lib_data)
            else:
                merged_watched[user_key].libraries[mapped_lib_key] = merge_library_data(
                    merged_watched[user_key].libraries[mapped_lib_key],
                    lib_data,
                )
    return merged_watched
def check_same_identifiers(item1: MediaIdentifiers, item2: MediaIdentifiers) -> bool:
# Check for duplicate based on file locations:
if item1.locations and item2.locations:
if set(item1.locations) & set(item2.locations):
return True
# Check for duplicate based on GUIDs:
if (
(item1.imdb_id and item2.imdb_id and item1.imdb_id == item2.imdb_id)
or (item1.tvdb_id and item2.tvdb_id and item1.tvdb_id == item2.tvdb_id)
or (item1.tmdb_id and item2.tmdb_id and item1.tmdb_id == item2.tmdb_id)
):
return True
return False
def check_remove_entry(item1: "MediaItem", item2: "MediaItem") -> bool:
    """Decide whether *item1* (from list 1) is redundant next to *item2*.

    Non-matching items are never removed. For matches:
      * differing completion: drop item1 only when item2 is the completed one;
      * both incomplete: keep whichever has more progress (ties drop item1);
      * both complete: item1 is a duplicate and is dropped.
    """
    if not check_same_identifiers(item1.identifiers, item2.identifiers):
        return False

    done1 = item1.status.completed
    done2 = item2.status.completed

    # Completion states differ: item1 goes only if item2 is the finished one.
    if done1 != done2:
        return done2

    # Both incomplete: remove item1 unless it has strictly more progress.
    if not done1:
        return item1.status.time <= item2.status.time

    # Both complete: treat item1 as the duplicate.
    return True
def sync_watched_lists(
    server1_data: dict[str, UserData],
    server2_data: dict[str, UserData],
    user_mapping: dict[str, str] | None = None,
    library_mapping: dict[str, str] | None = None,
) -> list[tuple[Literal["mark_watched", "mark_unwatched"], Any, str, str, str]]:
    """Diff two servers' watched state and return the sync actions to apply.

    For each media item present on both servers (matched via
    check_same_identifiers), the side whose watched status changed most
    recently (``last_updated_at``) is treated as the source of truth, and an
    action is produced to bring the other side in line.

    Args:
        server1_data: Per-user watched state of the first server.
        server2_data: Per-user watched state of the second server.
        user_mapping: Optional mapping between user names on the two servers.
        library_mapping: Optional mapping between library names.

    Returns:
        Tuples of (action, destination server, user id, item id, ISO-8601
        viewed date), where action is "mark_watched" or "mark_unwatched".
    """
    actions: list[tuple[Literal["mark_watched", "mark_unwatched"], Any, str, str, str]] = []

    for user1_name, user1_data in server1_data.items():
        user2_name = (
            search_mapping(user_mapping, user1_name) if user_mapping else user1_name
        )
        if user2_name not in server2_data:
            continue
        user2_data = server2_data[user2_name]

        for lib1_name, lib1_data in user1_data.libraries.items():
            lib2_name = (
                search_mapping(library_mapping, lib1_name)
                if library_mapping
                else lib1_name
            )
            if lib2_name not in user2_data.libraries:
                continue
            lib2_data = user2_data.libraries[lib2_name]

            # Movies: compare each matched pair exactly once.
            for movie1 in lib1_data.movies:
                for movie2 in lib2_data.movies:
                    if check_same_identifiers(movie1.identifiers, movie2.identifiers):
                        action = compare_and_get_action(movie1, movie2)
                        if action:
                            actions.append(action)
                        break

            # Series: match the shows first, then their episodes pairwise.
            for series1 in lib1_data.series:
                for series2 in lib2_data.series:
                    if check_same_identifiers(
                        series1.identifiers, series2.identifiers
                    ):
                        for episode1 in series1.episodes:
                            for episode2 in series2.episodes:
                                if check_same_identifiers(
                                    episode1.identifiers, episode2.identifiers
                                ):
                                    action = compare_and_get_action(
                                        episode1, episode2
                                    )
                                    if action:
                                        actions.append(action)
                                    break
                        break

    return actions


def get_other(
    watched_list: dict[str, Any], object_1: str, object_2: str | None
) -> str | None:
    """Return whichever of the two candidate keys exists in *watched_list*.

    Prefers *object_1*; returns None (after logging) when neither is present.
    """
    if object_1 in watched_list:
        return object_1
    if object_2 and object_2 in watched_list:
        return object_2
    logger.info(
        f"{object_1}{' and ' + object_2 if object_2 else ''} not found in watched list 2"
    )
    return None


def compare_and_get_action(
    item1: MediaItem, item2: MediaItem
) -> tuple[Literal["mark_watched", "mark_unwatched"], Any, str, str, str] | None:
    """Compare one matched pair of items and produce a sync action, if any.

    The item with the newer ``last_updated_at`` is the source of truth.
    Returns None when both sides already agree on completion, or when the
    timestamps tie (no safe winner).
    """
    if item1.status.completed == item2.status.completed:
        return None
    if item1.status.last_updated_at > item2.status.last_updated_at:
        source_item, dest_item = item1, item2
    elif item2.status.last_updated_at > item1.status.last_updated_at:
        source_item, dest_item = item2, item1
    else:
        return None
    action_type = "mark_watched" if source_item.status.completed else "mark_unwatched"
    logger.info(f"Scheduling action: {action_type} for item {dest_item.identifiers.title} on server {dest_item.identifiers.server.server_type}")
    return (
        action_type,
        dest_item.identifiers.server,
        dest_item.identifiers.user_id,
        dest_item.identifiers.id,
        source_item.status.viewed_date.isoformat().replace("+00:00", "Z"),
    )

133
test/test_sync.py Normal file
View File

@ -0,0 +1,133 @@
from datetime import datetime, timedelta
import sys
import os
from unittest.mock import Mock
# Add parent directory to sys.path
current = os.path.dirname(os.path.realpath(__file__))
parent = os.path.dirname(current)
sys.path.append(parent)
from src.watched import (
LibraryData,
MediaIdentifiers,
MediaItem,
Series,
UserData,
WatchedStatus,
sync_watched_lists,
)
# --- Mock Data Setup ---

# Two reference timestamps: last_updated_at picks the sync winner, so
# "new" always beats "old".
now = datetime.now()
time_new = now
time_old = now - timedelta(days=1)

# Mock server objects
# Only server_type is read by the code under test (for log messages);
# everything else is identity-compared in the assertions.
mock_server1 = Mock()
mock_server1.server_type = "Plex"
mock_server2 = Mock()
mock_server2.server_type = "Jellyfin"

# --- Test Case 1: Sync "watched" from Server 1 to Server 2 ---
# Same movie (shared imdb_id), watched more recently on server 1.
movie_s1_watched = MediaItem(
    identifiers=MediaIdentifiers(title="Movie A", id="1", server=mock_server1, user_id="user1", imdb_id="tt1"),
    status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new),
)
movie_s2_unwatched = MediaItem(
    identifiers=MediaIdentifiers(title="Movie A", id="a", server=mock_server2, user_id="user1", imdb_id="tt1"),
    status=WatchedStatus(completed=False, time=0, viewed_date=time_old, last_updated_at=time_old),
)

# --- Test Case 2: Sync "unwatched" from Server 2 to Server 1 ---
# Marked unwatched on server 2 after it was watched on server 1.
movie_s1_unwatched_old = MediaItem(
    identifiers=MediaIdentifiers(title="Movie B", id="2", server=mock_server1, user_id="user1", imdb_id="tt2"),
    status=WatchedStatus(completed=True, time=0, viewed_date=time_old, last_updated_at=time_old),
)
movie_s2_unwatched_new = MediaItem(
    identifiers=MediaIdentifiers(title="Movie B", id="b", server=mock_server2, user_id="user1", imdb_id="tt2"),
    status=WatchedStatus(completed=False, time=0, viewed_date=time_new, last_updated_at=time_new),
)

# --- Test Case 3: No sync needed (already in sync) ---
# Both sides completed: no action regardless of timestamps.
movie_s1_synced = MediaItem(
    identifiers=MediaIdentifiers(title="Movie C", id="3", server=mock_server1, user_id="user1", imdb_id="tt3"),
    status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new),
)
movie_s2_synced = MediaItem(
    identifiers=MediaIdentifiers(title="Movie C", id="c", server=mock_server2, user_id="user1", imdb_id="tt3"),
    status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new),
)

# --- Test Case 4: No sync needed (timestamps equal) ---
# Conflicting statuses but identical timestamps: no safe winner.
movie_s1_equal_ts = MediaItem(
    identifiers=MediaIdentifiers(title="Movie D", id="4", server=mock_server1, user_id="user1", imdb_id="tt4"),
    status=WatchedStatus(completed=True, time=0, viewed_date=time_new, last_updated_at=time_new),
)
movie_s2_equal_ts = MediaItem(
    identifiers=MediaIdentifiers(title="Movie D", id="d", server=mock_server2, user_id="user1", imdb_id="tt4"),
    status=WatchedStatus(completed=False, time=0, viewed_date=time_new, last_updated_at=time_new),
)
def build_test_data(movies1, movies2):
    """Wrap two movie lists in the per-server {user: UserData} structure."""
    def single_server(movies):
        library = LibraryData(title="Movies", movies=movies, series=[])
        return {"user1": UserData(libraries={"Movies": library})}

    return single_server(movies1), single_server(movies2)
def test_sync_watched_from_s1_to_s2():
    """A newer "watched" on server 1 should be pushed to server 2."""
    server1_data, server2_data = build_test_data([movie_s1_watched], [movie_s2_unwatched])
    actions = sync_watched_lists(server1_data, server2_data)
    assert len(actions) == 1
    action_type, target_server, target_user, target_item, _ = actions[0]
    assert action_type == "mark_watched"
    assert target_server is mock_server2
    assert target_user == "user1"
    assert target_item == "a"
def test_sync_unwatched_from_s2_to_s1():
    """A newer "unwatched" on server 2 should be propagated to server 1."""
    server1_data, server2_data = build_test_data([movie_s1_unwatched_old], [movie_s2_unwatched_new])
    actions = sync_watched_lists(server1_data, server2_data)
    assert len(actions) == 1
    action_type, target_server, target_user, target_item, _ = actions[0]
    assert action_type == "mark_unwatched"
    assert target_server is mock_server1
    assert target_user == "user1"
    assert target_item == "2"
def test_no_sync_when_already_synced():
    """Identical completed states on both sides produce no actions."""
    server1_data, server2_data = build_test_data([movie_s1_synced], [movie_s2_synced])
    assert sync_watched_lists(server1_data, server2_data) == []
def test_no_sync_when_timestamps_equal():
    """Conflicting statuses with identical timestamps have no clear winner."""
    server1_data, server2_data = build_test_data([movie_s1_equal_ts], [movie_s2_equal_ts])
    assert sync_watched_lists(server1_data, server2_data) == []
def test_sync_with_user_mapping():
    """User names that differ between servers are bridged by user_mapping."""
    server1_data = {"plex_user": UserData(libraries={"Movies": LibraryData(title="Movies", movies=[movie_s1_watched], series=[])})}
    server2_data = {"jellyfin_user": UserData(libraries={"Movies": LibraryData(title="Movies", movies=[movie_s2_unwatched], series=[])})}
    actions = sync_watched_lists(
        server1_data, server2_data, user_mapping={"plex_user": "jellyfin_user"}
    )
    assert len(actions) == 1
    action_type, target_server, *_ = actions[0]
    assert action_type == "mark_watched"
    assert target_server is mock_server2
def test_sync_with_library_mapping():
    """Library names that differ between servers are bridged by library_mapping."""
    server1_data = {"user1": UserData(libraries={"Plex Movies": LibraryData(title="Plex Movies", movies=[movie_s1_watched], series=[])})}
    server2_data = {"user1": UserData(libraries={"Jellyfin Movies": LibraryData(title="Jellyfin Movies", movies=[movie_s2_unwatched], series=[])})}
    actions = sync_watched_lists(
        server1_data, server2_data, library_mapping={"Plex Movies": "Jellyfin Movies"}
    )
    assert len(actions) == 1
    action_type, target_server, *_ = actions[0]
    assert action_type == "mark_watched"
    assert target_server is mock_server2

View File

@ -1,724 +0,0 @@
from datetime import datetime
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
from src.watched import (
LibraryData,
MediaIdentifiers,
MediaItem,
Series,
UserData,
WatchedStatus,
cleanup_watched,
)
viewed_date = datetime.today()
tv_shows_watched_list_1: list[Series] = [
Series(
identifiers=MediaIdentifiers(
title="Doctor Who (2005)",
locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",),
imdb_id="tt0436992",
tmdb_id="57243",
tvdb_id="78804",
),
episodes=[
MediaItem(
identifiers=MediaIdentifiers(
title="The Unquiet Dead",
locations=("S01E03.mkv",),
imdb_id="tt0563001",
tmdb_id="968589",
tvdb_id="295296",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="Aliens of London (1)",
locations=("S01E04.mkv",),
imdb_id="tt0562985",
tmdb_id="968590",
tvdb_id="295297",
),
status=WatchedStatus(
completed=False, time=240000, viewed_date=viewed_date
),
),
MediaItem(
identifiers=MediaIdentifiers(
title="World War Three (2)",
locations=("S01E05.mkv",),
imdb_id="tt0563003",
tmdb_id="968592",
tvdb_id="295298",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
],
),
Series(
identifiers=MediaIdentifiers(
title="Monarch: Legacy of Monsters",
locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",),
imdb_id="tt17220216",
tmdb_id="202411",
tvdb_id="422598",
),
episodes=[
MediaItem(
identifiers=MediaIdentifiers(
title="Secrets and Lies",
locations=("S01E03.mkv",),
imdb_id="tt21255044",
tmdb_id="4661246",
tvdb_id="10009418",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="Parallels and Interiors",
locations=("S01E04.mkv",),
imdb_id="tt21255050",
tmdb_id="4712059",
tvdb_id="10009419",
),
status=WatchedStatus(
completed=False, time=240000, viewed_date=viewed_date
),
),
MediaItem(
identifiers=MediaIdentifiers(
title="The Way Out",
locations=("S01E05.mkv",),
imdb_id="tt23787572",
tmdb_id="4712061",
tvdb_id="10009420",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
],
),
Series(
identifiers=MediaIdentifiers(
title="My Adventures with Superman",
locations=("My Adventures with Superman {tvdb-403172} {imdb-tt14681924}",),
imdb_id="tt14681924",
tmdb_id="125928",
tvdb_id="403172",
),
episodes=[
MediaItem(
identifiers=MediaIdentifiers(
title="Adventures of a Normal Man (1)",
locations=("S01E01.mkv",),
imdb_id="tt15699926",
tmdb_id="3070048",
tvdb_id="8438181",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="Adventures of a Normal Man (2)",
locations=("S01E02.mkv",),
imdb_id="tt20413322",
tmdb_id="4568681",
tvdb_id="9829910",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="My Interview with Superman",
locations=("S01E03.mkv",),
imdb_id="tt20413328",
tmdb_id="4497012",
tvdb_id="9870382",
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
],
),
]
# ─────────────────────────────────────────────────────────────
# TV Shows Watched list 2
tv_shows_watched_list_2: list[Series] = [
Series(
identifiers=MediaIdentifiers(
title="Doctor Who",
locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",),
imdb_id="tt0436992",
tmdb_id="57243",
tvdb_id="78804",
),
episodes=[
MediaItem(
identifiers=MediaIdentifiers(
title="Rose",
locations=("S01E01.mkv",),
imdb_id="tt0562992",
tvdb_id="295294",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="The End of the World",
locations=("S01E02.mkv",),
imdb_id="tt0562997",
tvdb_id="295295",
tmdb_id=None,
),
status=WatchedStatus(
completed=False, time=300670, viewed_date=viewed_date
),
),
MediaItem(
identifiers=MediaIdentifiers(
title="World War Three (2)",
locations=("S01E05.mkv",),
imdb_id="tt0563003",
tvdb_id="295298",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
],
),
Series(
identifiers=MediaIdentifiers(
title="Monarch: Legacy of Monsters",
locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",),
imdb_id="tt17220216",
tmdb_id="202411",
tvdb_id="422598",
),
episodes=[
MediaItem(
identifiers=MediaIdentifiers(
title="Aftermath",
locations=("S01E01.mkv",),
imdb_id="tt20412166",
tvdb_id="9959300",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="Departure",
locations=("S01E02.mkv",),
imdb_id="tt22866594",
tvdb_id="10009417",
tmdb_id=None,
),
status=WatchedStatus(
completed=False, time=300741, viewed_date=viewed_date
),
),
MediaItem(
identifiers=MediaIdentifiers(
title="The Way Out",
locations=("S01E05.mkv",),
imdb_id="tt23787572",
tvdb_id="10009420",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
],
),
Series(
identifiers=MediaIdentifiers(
title="My Adventures with Superman",
locations=("My Adventures with Superman {tvdb-403172} {imdb-tt14681924}",),
imdb_id="tt14681924",
tmdb_id="125928",
tvdb_id="403172",
),
episodes=[
MediaItem(
identifiers=MediaIdentifiers(
title="Adventures of a Normal Man (1)",
locations=("S01E01.mkv",),
imdb_id="tt15699926",
tvdb_id="8438181",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="Adventures of a Normal Man (2)",
locations=("S01E02.mkv",),
imdb_id="tt20413322",
tvdb_id="9829910",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
MediaItem(
identifiers=MediaIdentifiers(
title="My Interview with Superman",
locations=("S01E03.mkv",),
imdb_id="tt20413328",
tvdb_id="9870382",
tmdb_id=None,
),
status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
),
],
),
]
# ─────────────────────────────────────────────────────────────
# Expected TV Shows Watched list 1 (after cleanup)
# Expected value of cleanup_watched(list 1, list 2) for the "TV Shows" library:
# the watch states that appear on server 1 but not on server 2 — presumably the
# delta still to be synced to server 2 (confirm against cleanup_watched semantics).
expected_tv_show_watched_list_1: list[Series] = [
    Series(
        identifiers=MediaIdentifiers(
            title="Doctor Who (2005)",
            locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",),
            imdb_id="tt0436992",
            tmdb_id="57243",
            tvdb_id="78804",
        ),
        episodes=[
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="The Unquiet Dead",
                    locations=("S01E03.mkv",),
                    imdb_id="tt0563001",
                    tmdb_id="968589",
                    tvdb_id="295296",
                ),
                status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
            ),
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Aliens of London (1)",
                    locations=("S01E04.mkv",),
                    imdb_id="tt0562985",
                    tmdb_id="968590",
                    tvdb_id="295297",
                ),
                # In-progress episode: time is a playback offset (presumably
                # milliseconds — TODO confirm against the server classes).
                status=WatchedStatus(
                    completed=False, time=240000, viewed_date=viewed_date
                ),
            ),
        ],
    ),
    Series(
        identifiers=MediaIdentifiers(
            title="Monarch: Legacy of Monsters",
            locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",),
            imdb_id="tt17220216",
            tmdb_id="202411",
            tvdb_id="422598",
        ),
        episodes=[
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Secrets and Lies",
                    locations=("S01E03.mkv",),
                    imdb_id="tt21255044",
                    tmdb_id="4661246",
                    tvdb_id="10009418",
                ),
                status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
            ),
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Parallels and Interiors",
                    locations=("S01E04.mkv",),
                    imdb_id="tt21255050",
                    tmdb_id="4712059",
                    tvdb_id="10009419",
                ),
                status=WatchedStatus(
                    completed=False, time=240000, viewed_date=viewed_date
                ),
            ),
        ],
    ),
]
# ─────────────────────────────────────────────────────────────
# Expected TV Shows Watched list 2 (after cleanup)
# Expected value of cleanup_watched(list 2, list 1) for the "TV Shows" library:
# the watch states unique to server 2. Note the series title differs from
# server 1 ("Doctor Who" vs "Doctor Who (2005)") while the location/IDs match,
# so matching is evidently done on identifiers rather than display title.
expected_tv_show_watched_list_2: list[Series] = [
    Series(
        identifiers=MediaIdentifiers(
            title="Doctor Who",
            locations=("Doctor Who (2005) {tvdb-78804} {imdb-tt0436992}",),
            imdb_id="tt0436992",
            tmdb_id="57243",
            tvdb_id="78804",
        ),
        episodes=[
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Rose",
                    locations=("S01E01.mkv",),
                    imdb_id="tt0562992",
                    tvdb_id="295294",
                    tmdb_id=None,
                ),
                status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
            ),
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="The End of the World",
                    locations=("S01E02.mkv",),
                    imdb_id="tt0562997",
                    tvdb_id="295295",
                    tmdb_id=None,
                ),
                status=WatchedStatus(
                    completed=False, time=300670, viewed_date=viewed_date
                ),
            ),
        ],
    ),
    Series(
        identifiers=MediaIdentifiers(
            title="Monarch: Legacy of Monsters",
            locations=("Monarch - Legacy of Monsters {tvdb-422598} {imdb-tt17220216}",),
            imdb_id="tt17220216",
            tmdb_id="202411",
            tvdb_id="422598",
        ),
        episodes=[
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Aftermath",
                    locations=("S01E01.mkv",),
                    imdb_id="tt20412166",
                    tvdb_id="9959300",
                    tmdb_id=None,
                ),
                status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
            ),
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Departure",
                    locations=("S01E02.mkv",),
                    imdb_id="tt22866594",
                    tvdb_id="10009417",
                    tmdb_id=None,
                ),
                status=WatchedStatus(
                    completed=False, time=300741, viewed_date=viewed_date
                ),
            ),
        ],
    ),
]
# ─────────────────────────────────────────────────────────────
# Movies Watched list 1
# Input fixture: movie watch states reported by server 1. "The Family Plan" is
# fully watched on both servers (see movies_watched_list_2), so it is absent
# from both expected lists below.
movies_watched_list_1: list[MediaItem] = [
    MediaItem(
        identifiers=MediaIdentifiers(
            title="Big Buck Bunny",
            locations=("Big Buck Bunny.mkv",),
            imdb_id="tt1254207",
            tmdb_id="10378",
            tvdb_id="12352",
        ),
        status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
    ),
    MediaItem(
        identifiers=MediaIdentifiers(
            title="The Family Plan",
            locations=("The Family Plan (2023).mkv",),
            imdb_id="tt16431870",
            tmdb_id="1029575",
            tvdb_id="351194",
        ),
        status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
    ),
    MediaItem(
        identifiers=MediaIdentifiers(
            title="Killers of the Flower Moon",
            locations=("Killers of the Flower Moon (2023).mkv",),
            imdb_id="tt5537002",
            tmdb_id="466420",
            tvdb_id="135852",
        ),
        # In-progress movie; time is a playback offset (presumably ms).
        status=WatchedStatus(completed=False, time=240000, viewed_date=viewed_date),
    ),
]
# ─────────────────────────────────────────────────────────────
# Movies Watched list 2
# Input fixture: movie watch states reported by server 2. tvdb_id is None
# throughout, mimicking a server that exposes fewer provider IDs — matching
# must still succeed via the remaining identifiers.
movies_watched_list_2: list[MediaItem] = [
    MediaItem(
        identifiers=MediaIdentifiers(
            title="The Family Plan",
            locations=("The Family Plan (2023).mkv",),
            imdb_id="tt16431870",
            tmdb_id="1029575",
            tvdb_id=None,
        ),
        status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
    ),
    MediaItem(
        identifiers=MediaIdentifiers(
            title="Five Nights at Freddy's",
            locations=("Five Nights at Freddy's (2023).mkv",),
            imdb_id="tt4589218",
            tmdb_id="507089",
            tvdb_id=None,
        ),
        status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
    ),
    MediaItem(
        identifiers=MediaIdentifiers(
            title="The Hunger Games: The Ballad of Songbirds & Snakes",
            locations=("The Hunger Games The Ballad of Songbirds & Snakes (2023).mkv",),
            imdb_id="tt10545296",
            tmdb_id="695721",
            tvdb_id=None,
        ),
        status=WatchedStatus(completed=False, time=301215, viewed_date=viewed_date),
    ),
]
# ─────────────────────────────────────────────────────────────
# Expected Movies Watched list 1
# Expected value of cleanup_watched(list 1, list 2) for the "Movies" library:
# movies unique to server 1 ("The Family Plan", watched on both, is removed).
expected_movie_watched_list_1: list[MediaItem] = [
    MediaItem(
        identifiers=MediaIdentifiers(
            title="Big Buck Bunny",
            locations=("Big Buck Bunny.mkv",),
            imdb_id="tt1254207",
            tmdb_id="10378",
            tvdb_id="12352",
        ),
        status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
    ),
    MediaItem(
        identifiers=MediaIdentifiers(
            title="Killers of the Flower Moon",
            locations=("Killers of the Flower Moon (2023).mkv",),
            imdb_id="tt5537002",
            tmdb_id="466420",
            tvdb_id="135852",
        ),
        status=WatchedStatus(completed=False, time=240000, viewed_date=viewed_date),
    ),
]
# ─────────────────────────────────────────────────────────────
# Expected Movies Watched list 2
# Expected value of cleanup_watched(list 2, list 1) for the "Movies" library:
# movies unique to server 2 ("The Family Plan", watched on both, is removed).
expected_movie_watched_list_2: list[MediaItem] = [
    MediaItem(
        identifiers=MediaIdentifiers(
            title="Five Nights at Freddy's",
            locations=("Five Nights at Freddy's (2023).mkv",),
            imdb_id="tt4589218",
            tmdb_id="507089",
            tvdb_id=None,
        ),
        status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
    ),
    MediaItem(
        identifiers=MediaIdentifiers(
            title="The Hunger Games: The Ballad of Songbirds & Snakes",
            locations=("The Hunger Games The Ballad of Songbirds & Snakes (2023).mkv",),
            imdb_id="tt10545296",
            tmdb_id="695721",
            tvdb_id=None,
        ),
        status=WatchedStatus(completed=False, time=301215, viewed_date=viewed_date),
    ),
]
# ─────────────────────────────────────────────────────────────
# TV Shows 2 Watched list 1 (for testing deletion up to the root)
# Here we use a single Series entry for "Criminal Minds"
# Fixture used for the "Other Shows" library on BOTH servers: a single series
# with one fully-watched episode. Because the data is identical on both sides,
# the expected results omit "Other Shows" entirely — verifying that cleanup
# prunes an emptied library all the way up to the root.
tv_shows_2_watched_list_1: list[Series] = [
    Series(
        identifiers=MediaIdentifiers(
            title="Criminal Minds",
            locations=("Criminal Minds",),
            imdb_id="tt0452046",
            tmdb_id="4057",
            tvdb_id="75710",
        ),
        episodes=[
            MediaItem(
                identifiers=MediaIdentifiers(
                    title="Extreme Aggressor",
                    locations=(
                        "Criminal Minds S01E01 Extreme Aggressor WEBDL-720p.mkv",
                    ),
                    imdb_id="tt0550489",
                    tmdb_id="282843",
                    tvdb_id="176357",
                ),
                status=WatchedStatus(completed=True, time=0, viewed_date=viewed_date),
            )
        ],
    )
]
def test_simple_cleanup_watched():
    """cleanup_watched filters each side down to the entries the other side lacks.

    Builds symmetric single-user snapshots from the module-level fixtures and
    checks both directions; the fully-duplicated "Other Shows" library must
    vanish from both expected results.
    """

    def _snapshot(tv_series, movies, other_series=None):
        # Assemble one user's library map in the shape cleanup_watched consumes.
        libraries = {
            "TV Shows": LibraryData(title="TV Shows", movies=[], series=tv_series),
            "Movies": LibraryData(title="Movies", movies=movies, series=[]),
        }
        if other_series is not None:
            libraries["Other Shows"] = LibraryData(
                title="Other Shows", movies=[], series=other_series
            )
        return {"user1": UserData(libraries=libraries)}

    user_watched_list_1 = _snapshot(
        tv_shows_watched_list_1, movies_watched_list_1, tv_shows_2_watched_list_1
    )
    user_watched_list_2 = _snapshot(
        tv_shows_watched_list_2, movies_watched_list_2, tv_shows_2_watched_list_1
    )
    # Expected outputs carry no "Other Shows" entry: identical libraries are
    # pruned entirely.
    expected_watched_list_1 = _snapshot(
        expected_tv_show_watched_list_1, expected_movie_watched_list_1
    )
    expected_watched_list_2 = _snapshot(
        expected_tv_show_watched_list_2, expected_movie_watched_list_2
    )

    # Run both directions before asserting, preserving the original call order.
    result_1 = cleanup_watched(user_watched_list_1, user_watched_list_2)
    result_2 = cleanup_watched(user_watched_list_2, user_watched_list_1)

    assert result_1 == expected_watched_list_1
    assert result_2 == expected_watched_list_2
# NOTE: disabled — this variant exercises user/library mapping ("user1"→"user2",
# "TV Shows"→"Shows"); re-enable once mapping support is validated in the
# current sync implementation.
# def test_mapping_cleanup_watched():
# user_watched_list_1 = {
# "user1": {
# "TV Shows": tv_shows_watched_list_1,
# "Movies": movies_watched_list_1,
# "Other Shows": tv_shows_2_watched_list_1,
# },
# }
# user_watched_list_2 = {
# "user2": {
# "Shows": tv_shows_watched_list_2,
# "Movies": movies_watched_list_2,
# "Other Shows": tv_shows_2_watched_list_1,
# }
# }
#
# expected_watched_list_1 = {
# "user1": {
# "TV Shows": expected_tv_show_watched_list_1,
# "Movies": expected_movie_watched_list_1,
# }
# }
#
# expected_watched_list_2 = {
# "user2": {
# "Shows": expected_tv_show_watched_list_2,
# "Movies": expected_movie_watched_list_2,
# }
# }
#
# user_mapping = {"user1": "user2"}
# library_mapping = {"TV Shows": "Shows"}
#
# return_watched_list_1 = cleanup_watched(
# user_watched_list_1,
# user_watched_list_2,
# user_mapping=user_mapping,
# library_mapping=library_mapping,
# )
# return_watched_list_2 = cleanup_watched(
# user_watched_list_2,
# user_watched_list_1,
# user_mapping=user_mapping,
# library_mapping=library_mapping,
# )
#
# assert return_watched_list_1 == expected_watched_list_1
# assert return_watched_list_2 == expected_watched_list_2