Switch logging to loguru

Signed-off-by: Luis Garcia <git@luigi311.com>
pull/229/head
Luis Garcia 2025-02-21 16:03:29 -07:00
parent 38e65f5a17
commit 588c23ce41
12 changed files with 182 additions and 200 deletions

View File

@ -5,6 +5,7 @@ description = "Sync watched between media servers locally"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"loguru>=0.7.3",
"packaging==24.2",
"plexapi==4.16.1",
"pydantic==2.10.6",

View File

@ -1,4 +1,6 @@
from src.functions import logger, search_mapping
from loguru import logger
from src.functions import search_mapping
def setup_black_white_lists(
@ -58,13 +60,13 @@ def setup_x_lists(
temp_library.append(library_other)
out_library = out_library + temp_library
logger(f"{xlist_type}list Library: {xlist_library}", 1)
logger.info(f"{xlist_type}list Library: {xlist_library}")
out_library_type: list[str] = []
if xlist_library_type:
out_library_type = [x.lower().strip() for x in xlist_library_type]
logger(f"{xlist_type}list Library Type: {out_library_type}", 1)
logger.info(f"{xlist_type}list Library Type: {out_library_type}")
out_users: list[str] = []
if xlist_users:
@ -78,6 +80,6 @@ def setup_x_lists(
out_users = out_users + temp_users
logger(f"{xlist_type}list Users: {out_users}", 1)
logger.info(f"{xlist_type}list Users: {out_users}")
return out_library, out_library_type, out_users

View File

@ -1,8 +1,9 @@
import os
from typing import Literal
from dotenv import load_dotenv
from loguru import logger
from src.functions import logger, str_to_bool
from src.functions import str_to_bool
from src.plex import Plex
from src.jellyfin import Jellyfin
from src.emby import Emby
@ -39,7 +40,7 @@ def jellyfin_emby_server_connection(
else:
raise Exception("Unknown server type")
logger(f"{server_type} Server {i} info: {server.info()}", 3)
logger.debug(f"{server_type} Server {i} info: {server.info()}")
return servers
@ -73,7 +74,7 @@ def generate_server_connections() -> list[Plex | Jellyfin | Emby]:
ssl_bypass=ssl_bypass,
)
logger(f"Plex Server {i} info: {server.info()}", 3)
logger.debug(f"Plex Server {i} info: {server.info()}")
servers.append(server)
@ -99,7 +100,7 @@ def generate_server_connections() -> list[Plex | Jellyfin | Emby]:
ssl_bypass=ssl_bypass,
)
logger(f"Plex Server {i} info: {server.info()}", 3)
logger.debug(f"Plex Server {i} info: {server.info()}")
servers.append(server)
jellyfin_baseurl = os.getenv("JELLYFIN_BASEURL", None)

View File

@ -9,34 +9,6 @@ log_file = os.getenv("LOG_FILE", os.getenv("LOGFILE", "log.log"))
mark_file = os.getenv("MARK_FILE", os.getenv("MARKFILE", "mark.log"))
def logger(message: str, log_type: int = 0):
debug = str_to_bool(os.getenv("DEBUG", "False"))
debug_level = os.getenv("DEBUG_LEVEL", "info").lower()
output: str | None = str(message)
if log_type == 0:
pass
elif log_type == 1 and (debug and debug_level in ("info", "debug")):
output = f"[INFO]: {output}"
elif log_type == 2:
output = f"[ERROR]: {output}"
elif log_type == 3 and (debug and debug_level == "debug"):
output = f"[DEBUG]: {output}"
elif log_type == 4:
output = f"[WARNING]: {output}"
elif log_type == 5:
output = f"[MARK]: {output}"
elif log_type == 6:
output = f"[DRYRUN]: {output}"
else:
output = None
if output is not None:
print(output)
with open(f"{log_file}", "a", encoding="utf-8") as file:
file.write(output + "\n")
def log_marked(
server_type: str,
server_name: str,

View File

@ -1,15 +1,15 @@
# Functions for Jellyfin and Emby
import requests
import traceback
import os
from math import floor
from typing import Any, Literal
from dotenv import load_dotenv
import requests
from packaging.version import parse, Version
from loguru import logger
from src.functions import (
logger,
search_mapping,
log_marked,
str_to_bool,
@ -35,15 +35,14 @@ def extract_identifiers_from_item(server_type, item: dict) -> MediaIdentifiers:
id = None
if not title:
id = item.get("Id")
logger(f"{server_type}: Name not found in {id}", 1)
logger.info(f"{server_type}: Name not found in {id}")
guids = {}
if generate_guids:
guids = {k.lower(): v for k, v in item["ProviderIds"].items()}
if not guids:
logger(
logger.info(
f"{server_type}: {title if title else id} has no guids",
1,
)
locations = tuple()
@ -56,7 +55,7 @@ def extract_identifiers_from_item(server_type, item: dict) -> MediaIdentifiers:
)
if not locations:
logger(f"{server_type}: {title if title else id} has no locations", 1)
logger.info(f"{server_type}: {title if title else id} has no locations")
return MediaIdentifiers(
title=title,
@ -155,9 +154,8 @@ class JellyfinEmby:
return results
except Exception as e:
logger(
logger.error(
f"{self.server_type}: Query {query_type} {query}\nResults {results}\n{e}",
2,
)
raise Exception(e)
@ -180,7 +178,7 @@ class JellyfinEmby:
return None
except Exception as e:
logger(f"{self.server_type}: Get server name failed {e}", 2)
logger.error(f"{self.server_type}: Get server name failed {e}")
raise Exception(e)
def get_users(self) -> dict[str, str]:
@ -198,7 +196,7 @@ class JellyfinEmby:
return users
except Exception as e:
logger(f"{self.server_type}: Get users failed {e}", 2)
logger.error(f"{self.server_type}: Get users failed {e}")
raise Exception(e)
def get_libraries(self) -> dict[str, str]:
@ -215,9 +213,8 @@ class JellyfinEmby:
library_type = library.get("CollectionType")
if library_type not in ["movies", "tvshows"]:
logger(
logger.debug(
f"{self.server_type}: Skipping Library {library_title} found type {library_type}",
1,
)
continue
@ -225,7 +222,7 @@ class JellyfinEmby:
return libraries
except Exception as e:
logger(f"{self.server_type}: Get libraries failed {e}", 2)
logger.error(f"{self.server_type}: Get libraries failed {e}")
raise Exception(e)
def get_user_library_watched(
@ -233,9 +230,8 @@ class JellyfinEmby:
) -> LibraryData:
user_name = user_name.lower()
try:
logger(
logger.info(
f"{self.server_type}: Generating watched for {user_name} in library {library_title}",
0,
)
watched = LibraryData(title=library_title)
@ -339,19 +335,17 @@ class JellyfinEmby:
)
)
logger(
logger.info(
f"{self.server_type}: Finished getting watched for {user_name} in library {library_title}",
1,
)
return watched
except Exception as e:
logger(
logger.error(
f"{self.server_type}: Failed to get watched for {user_name} in library {library_title}, Error: {e}",
2,
)
logger(traceback.format_exc(), 2)
logger.error(traceback.format_exc())
return {}
def get_watched(
@ -419,7 +413,7 @@ class JellyfinEmby:
return users_watched
except Exception as e:
logger(f"{self.server_type}: Failed to get watched, Error: {e}", 2)
logger.error(f"{self.server_type}: Failed to get watched, Error: {e}")
raise Exception(e)
def update_user_watched(
@ -437,9 +431,8 @@ class JellyfinEmby:
if not library_data.series and not library_data.movies:
return
logger(
logger.info(
f"{self.server_type}: Updating watched for {user_name} in library {library_name}",
1,
)
# Update movies.
@ -463,14 +456,12 @@ class JellyfinEmby:
if stored_movie.status.completed:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as watched for {user_name} in {library_name}"
if not dryrun:
logger(msg, 5)
self.query(
f"/Users/{user_id}/PlayedItems/{jellyfin_video_id}",
"post",
)
else:
logger(msg, 6)
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
log_marked(
self.server_type,
self.server_name,
@ -482,7 +473,6 @@ class JellyfinEmby:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user_name} in {library_name}"
if not dryrun:
logger(msg, 5)
playback_position_payload = {
"PlaybackPositionTicks": stored_movie.status.time
* 10_000,
@ -492,9 +482,8 @@ class JellyfinEmby:
"post",
json=playback_position_payload,
)
else:
logger(msg, 6)
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
log_marked(
self.server_type,
self.server_name,
@ -504,9 +493,8 @@ class JellyfinEmby:
duration=floor(stored_movie.status.time / 60_000),
)
else:
logger(
logger.trace(
f"{self.server_type}: Skipping movie {jellyfin_video.get('Name')} as it is not in mark list for {user_name}",
3,
)
# Update TV Shows (series/episodes).
@ -528,9 +516,8 @@ class JellyfinEmby:
if check_same_identifiers(
jellyfin_show_identifiers, stored_series.identifiers
):
logger(
logger.info(
f"Found matching show for '{jellyfin_show.get('Name')}'",
1,
)
# Now update episodes.
# Get the list of Plex episodes for this show.
@ -559,14 +546,14 @@ class JellyfinEmby:
+ f" as watched for {user_name} in {library_name}"
)
if not dryrun:
logger(msg, 5)
self.query(
f"/Users/{user_id}/PlayedItems/{jellyfin_episode_id}",
"post",
)
else:
logger(msg, 6)
logger.success(
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
)
log_marked(
self.server_type,
self.server_name,
@ -582,7 +569,6 @@ class JellyfinEmby:
)
if not dryrun:
logger(msg, 5)
playback_position_payload = {
"PlaybackPositionTicks": stored_ep.status.time
* 10_000,
@ -592,9 +578,10 @@ class JellyfinEmby:
"post",
json=playback_position_payload,
)
else:
logger(msg, 6)
logger.success(
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
)
log_marked(
self.server_type,
self.server_name,
@ -607,22 +594,19 @@ class JellyfinEmby:
),
)
else:
logger(
logger.trace(
f"{self.server_type}: Skipping episode {jellyfin_episode.get('Name')} as it is not in mark list for {user_name}",
3,
)
else:
logger(
logger.trace(
f"{self.server_type}: Skipping show {jellyfin_show.get('Name')} as it is not in mark list for {user_name}",
3,
)
except Exception as e:
logger(
logger.error(
f"{self.server_type}: Error updating watched for {user_name} in library {library_name}, {e}",
2,
)
logger(traceback.format_exc(), 2)
logger.error(traceback.format_exc())
raise Exception(e)
def update_watched(
@ -637,9 +621,8 @@ class JellyfinEmby:
update_partial = self.is_partial_update_supported(server_version)
if not update_partial:
logger(
logger.info(
f"{self.server_type}: Server version {server_version} does not support updating playback position.",
2,
)
for user, user_data in watched_list.items():
@ -663,7 +646,7 @@ class JellyfinEmby:
break
if not user_id:
logger(f"{user} {user_other} not found in Jellyfin", 2)
logger.info(f"{user} {user_other} not found in Jellyfin")
continue
jellyfin_libraries = self.query(
@ -692,21 +675,18 @@ class JellyfinEmby:
if library_other.lower() in [
x["Name"].lower() for x in jellyfin_libraries
]:
logger(
logger.info(
f"{self.server_type}: Library {library_name} not found, but {library_other} found, using {library_other}",
1,
)
library_name = library_other
else:
logger(
logger.info(
f"{self.server_type}: Library {library_name} or {library_other} not found in library list",
1,
)
continue
else:
logger(
logger.info(
f"{self.server_type}: Library {library_name} not found in library list",
1,
)
continue
@ -728,5 +708,5 @@ class JellyfinEmby:
)
except Exception as e:
logger(f"{self.server_type}: Error updating watched, {e}", 2)
logger.error(f"{self.server_type}: Error updating watched, {e}")
raise Exception(e)

View File

@ -1,5 +1,6 @@
from loguru import logger
from src.functions import (
logger,
match_list,
search_mapping,
)
@ -151,7 +152,7 @@ def filter_libaries(
)
if skip_reason:
logger(f"Skipping library {library}: {skip_reason}", 1)
logger.info(f"Skipping library {library}: {skip_reason}")
continue
filtered_libaries.append(library)
@ -170,8 +171,6 @@ def setup_libraries(
) -> tuple[list[str], list[str]]:
server_1_libraries = server_1.get_libraries()
server_2_libraries = server_2.get_libraries()
logger(f"Server 1 libraries: {server_1_libraries}", 1)
logger(f"Server 2 libraries: {server_2_libraries}", 1)
# Filter out all blacklist, whitelist libaries
filtered_server_1_libraries = filter_libaries(

View File

@ -1,15 +1,16 @@
import os
import traceback
import json
import sys
from dotenv import load_dotenv
from time import sleep, perf_counter
from loguru import logger
from src.emby import Emby
from src.jellyfin import Jellyfin
from src.plex import Plex
from src.library import setup_libraries
from src.functions import (
logger,
parse_string_to_list,
str_to_bool,
)
@ -51,41 +52,41 @@ def should_sync_server(
if isinstance(server_1, Plex):
if isinstance(server_2, Jellyfin) and not sync_from_plex_to_jellyfin:
logger("Sync from plex -> jellyfin is disabled", 1)
logger.info("Sync from plex -> jellyfin is disabled")
return False
if isinstance(server_2, Emby) and not sync_from_plex_to_emby:
logger("Sync from plex -> emby is disabled", 1)
logger.info("Sync from plex -> emby is disabled")
return False
if isinstance(server_2, Plex) and not sync_from_plex_to_plex:
logger("Sync from plex -> plex is disabled", 1)
logger.info("Sync from plex -> plex is disabled")
return False
if isinstance(server_1, Jellyfin):
if isinstance(server_2, Plex) and not sync_from_jelly_to_plex:
logger("Sync from jellyfin -> plex is disabled", 1)
logger.info("Sync from jellyfin -> plex is disabled")
return False
if isinstance(server_2, Jellyfin) and not sync_from_jelly_to_jellyfin:
logger("Sync from jellyfin -> jellyfin is disabled", 1)
logger.info("Sync from jellyfin -> jellyfin is disabled")
return False
if isinstance(server_2, Emby) and not sync_from_jelly_to_emby:
logger("Sync from jellyfin -> emby is disabled", 1)
logger.info("Sync from jellyfin -> emby is disabled")
return False
if isinstance(server_1, Emby):
if isinstance(server_2, Plex) and not sync_from_emby_to_plex:
logger("Sync from emby -> plex is disabled", 1)
logger.info("Sync from emby -> plex is disabled")
return False
if isinstance(server_2, Jellyfin) and not sync_from_emby_to_jellyfin:
logger("Sync from emby -> jellyfin is disabled", 1)
logger.info("Sync from emby -> jellyfin is disabled")
return False
if isinstance(server_2, Emby) and not sync_from_emby_to_emby:
logger("Sync from emby -> emby is disabled", 1)
logger.info("Sync from emby -> emby is disabled")
return False
return True
@ -98,18 +99,18 @@ def main_loop():
os.remove(log_file)
dryrun = str_to_bool(os.getenv("DRYRUN", "False"))
logger(f"Dryrun: {dryrun}", 1)
logger.info(f"Dryrun: {dryrun}")
user_mapping = os.getenv("USER_MAPPING", "")
user_mapping = json.loads(user_mapping.lower())
logger(f"User Mapping: {user_mapping}", 1)
logger.info(f"User Mapping: {user_mapping}")
library_mapping = os.getenv("LIBRARY_MAPPING", "")
library_mapping = json.loads(library_mapping)
logger(f"Library Mapping: {library_mapping}", 1)
logger.info(f"Library Mapping: {library_mapping}")
# Create (black/white)lists
logger("Creating (black/white)lists", 1)
logger.info("Creating (black/white)lists")
blacklist_library = parse_string_to_list(os.getenv("BLACKLIST_LIBRARY", None))
whitelist_library = parse_string_to_list(os.getenv("WHITELIST_LIBRARY", None))
blacklist_library_type = parse_string_to_list(
@ -140,7 +141,7 @@ def main_loop():
)
# Create server connections
logger("Creating server connections", 1)
logger.info("Creating server connections")
servers = generate_server_connections()
for server_1 in servers:
@ -156,11 +157,11 @@ def main_loop():
):
continue
logger(f"Server 1: {type(server_1)}: {server_1.info()}", 0)
logger(f"Server 2: {type(server_2)}: {server_2.info()}", 0)
logger.info(f"Server 1: {type(server_1)}: {server_1.info()}")
logger.info(f"Server 2: {type(server_2)}: {server_2.info()}")
# Create users list
logger("Creating users list", 1)
logger.info("Creating users list")
server_1_users, server_2_users = setup_users(
server_1, server_2, blacklist_users, whitelist_users, user_mapping
)
@ -174,38 +175,38 @@ def main_loop():
whitelist_library_type,
library_mapping,
)
logger.info(f"Server 1 syncing libraries: {server_1_libraries}")
logger.info(f"Server 2 syncing libraries: {server_2_libraries}")
logger("Creating watched lists", 1)
logger.info("Creating watched lists", 1)
server_1_watched = server_1.get_watched(server_1_users, server_1_libraries)
logger("Finished creating watched list server 1", 1)
logger.info("Finished creating watched list server 1")
server_2_watched = server_2.get_watched(server_2_users, server_2_libraries)
logger("Finished creating watched list server 2", 1)
logger.info("Finished creating watched list server 2")
logger(f"Server 1 watched: {server_1_watched}", 3)
logger(f"Server 2 watched: {server_2_watched}", 3)
logger.debug(f"Server 1 watched: {server_1_watched}")
logger.debug(f"Server 2 watched: {server_2_watched}")
logger("Cleaning Server 1 Watched", 1)
logger.info("Cleaning Server 1 Watched", 1)
server_1_watched_filtered = cleanup_watched(
server_1_watched, server_2_watched, user_mapping, library_mapping
)
logger("Cleaning Server 2 Watched", 1)
logger.info("Cleaning Server 2 Watched", 1)
server_2_watched_filtered = cleanup_watched(
server_2_watched, server_1_watched, user_mapping, library_mapping
)
logger(
logger.debug(
f"server 1 watched that needs to be synced to server 2:\n{server_1_watched_filtered}",
1,
)
logger(
logger.debug(
f"server 2 watched that needs to be synced to server 1:\n{server_2_watched_filtered}",
1,
)
if should_sync_server(server_2, server_1):
logger(f"Syncing {server_2.info()} -> {server_1.info()}", 0)
logger.info(f"Syncing {server_2.info()} -> {server_1.info()}")
server_1.update_watched(
server_2_watched_filtered,
user_mapping,
@ -214,7 +215,7 @@ def main_loop():
)
if should_sync_server(server_1, server_2):
logger(f"Syncing {server_1.info()} -> {server_2.info()}", 0)
logger.info(f"Syncing {server_1.info()} -> {server_2.info()}")
server_2.update_watched(
server_1_watched_filtered,
user_mapping,
@ -223,7 +224,22 @@ def main_loop():
)
@logger.catch
def main():
# Remove default logger to configure our own
logger.remove()
# Choose log level based on environment
# If in debug mode with a "debug" level, use DEBUG; otherwise, default to INFO.
level = os.getenv("DEBUG_LEVEL", "INFO").upper()
if level not in ["INFO", "DEBUG", "TRACE"]:
raise Exception("Invalid DEBUG_LEVEL, please choose between INFO, DEBUG, TRACE")
# Add a sink for file logging (with optional rotation) and the console.
logger.add("log.log", level=level, rotation="10 MB")
logger.add(sys.stdout, level=level)
run_only_once = str_to_bool(os.getenv("RUN_ONLY_ONCE", "False"))
sleep_duration = float(os.getenv("SLEEP_DURATION", "3600"))
times: list[float] = []
@ -235,31 +251,31 @@ def main():
times.append(end - start)
if len(times) > 0:
logger(f"Average time: {sum(times) / len(times)}", 0)
logger.info(f"Average time: {sum(times) / len(times)}")
if run_only_once:
break
logger(f"Looping in {sleep_duration}")
logger.info(f"Looping in {sleep_duration}")
sleep(sleep_duration)
except Exception as error:
if isinstance(error, list):
for message in error:
logger(message, log_type=2)
logger.error(message)
else:
logger(error, log_type=2)
logger.error(error)
logger(traceback.format_exc(), 2)
logger.error(traceback.format_exc())
if run_only_once:
break
logger(f"Retrying in {sleep_duration}", log_type=0)
logger.info(f"Retrying in {sleep_duration}")
sleep(sleep_duration)
except KeyboardInterrupt:
if len(times) > 0:
logger(f"Average time: {sum(times) / len(times)}", 0)
logger("Exiting", log_type=0)
logger.info(f"Average time: {sum(times) / len(times)}")
logger.info("Exiting")
os._exit(0)

View File

@ -1,6 +1,7 @@
import os
import requests
from dotenv import load_dotenv
from loguru import logger
from urllib3.poolmanager import PoolManager
from math import floor
@ -12,7 +13,6 @@ from plexapi.server import PlexServer
from plexapi.myplex import MyPlexAccount
from src.functions import (
logger,
search_mapping,
log_marked,
str_to_bool,
@ -94,7 +94,9 @@ def update_user_watched(
if not library_data.series and not library_data.movies:
return
logger(f"Plex: Updating watched for {user.title} in library {library_name}", 1)
logger.info(
f"Plex: Updating watched for {user.title} in library {library_name}"
)
library_section = user_plex.library.section(library_name)
# Update movies.
@ -112,11 +114,8 @@ def update_user_watched(
if stored_movie.status.completed:
msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_movie.markWatched()
else:
logger(msg, 6)
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
log_marked(
"Plex",
user_plex.friendlyName,
@ -129,11 +128,9 @@ def update_user_watched(
else:
msg = f"Plex: {plex_movie.title} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_movie.updateTimeline(stored_movie.status.time)
else:
logger(msg, 6)
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
log_marked(
"Plex",
user_plex.friendlyName,
@ -157,7 +154,7 @@ def update_user_watched(
if check_same_identifiers(
plex_show_identifiers, stored_series.identifiers
):
logger(f"Found matching show for '{plex_show.title}'", 1)
logger.info(f"Found matching show for '{plex_show.title}'")
# Now update episodes.
# Get the list of Plex episodes for this show.
plex_episodes = plex_show.episodes()
@ -172,11 +169,11 @@ def update_user_watched(
if stored_ep.status.completed:
msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_episode.markWatched()
else:
logger(msg, 6)
logger.success(
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
)
log_marked(
"Plex",
user_plex.friendlyName,
@ -188,13 +185,13 @@ def update_user_watched(
else:
msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user.title} in {library_name}"
if not dryrun:
logger(msg, 5)
plex_episode.updateTimeline(
stored_ep.status.time
)
else:
logger(msg, 6)
logger.success(
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
)
log_marked(
"Plex",
user_plex.friendlyName,
@ -206,8 +203,9 @@ def update_user_watched(
)
break # Found a matching episode.
break # Found a matching show.
except Exception as e:
logger(
logger.error(
f"Plex: Failed to update watched for {user.title} in library {library_name}, Error: {e}",
2,
)
@ -255,11 +253,11 @@ class Plex:
return plex
except Exception as e:
if self.username or self.password:
if self.username:
msg = f"Failed to login via plex account {self.username}"
logger(f"Plex: Failed to login, {msg}, Error: {e}", 2)
logger.error(f"Plex: Failed to login, {msg}, Error: {e}")
else:
logger(f"Plex: Failed to login, Error: {e}", 2)
logger.error(f"Plex: Failed to login, Error: {e}")
raise Exception(e)
def info(self) -> str:
@ -274,7 +272,7 @@ class Plex:
return users
except Exception as e:
logger(f"Plex: Failed to get users, Error: {e}", 2)
logger.error(f"Plex: Failed to get users, Error: {e}")
raise Exception(e)
def get_libraries(self) -> dict[str, str]:
@ -288,9 +286,8 @@ class Plex:
library_type = library.type
if library_type not in ["movie", "show"]:
logger(
logger.debug(
f"Plex: Skipping Library {library_title} found type {library_type}",
1,
)
continue
@ -298,15 +295,14 @@ class Plex:
return output
except Exception as e:
logger(f"Plex: Failed to get libraries, Error: {e}", 2)
logger.error(f"Plex: Failed to get libraries, Error: {e}")
raise Exception(e)
def get_user_library_watched(self, user, user_plex, library) -> LibraryData:
user_name: str = user.username.lower() if user.username else user.title.lower()
try:
logger(
logger.info(
f"Plex: Generating watched for {user_name} in library {library.title}",
0,
)
watched = LibraryData(title=library.title)
@ -365,9 +361,8 @@ class Plex:
return watched
except Exception as e:
logger(
logger.error(
f"Plex: Failed to get watched for {user_name} in library {library.title}, Error: {e}",
2,
)
return LibraryData(title=library.title)
@ -386,9 +381,8 @@ class Plex:
token,
)
else:
logger(
logger.error(
f"Plex: Failed to get token for {user.title}, skipping",
2,
)
continue
@ -411,7 +405,7 @@ class Plex:
return users_watched
except Exception as e:
logger(f"Plex: Failed to get watched, Error: {e}", 2)
logger.error(f"Plex: Failed to get watched, Error: {e}")
raise Exception(e)
def update_watched(
@ -446,9 +440,8 @@ class Plex:
user_plex = self.plex
else:
if isinstance(user, str):
logger(
logger.warning(
f"Plex: {user} is not a plex object, attempting to get object for user",
4,
)
user = self.plex.myPlexAccount().user(user)
@ -460,9 +453,8 @@ class Plex:
session=self.session,
)
else:
logger(
logger.error(
f"Plex: Failed to get token for {user.title}, skipping",
2,
)
continue
@ -480,21 +472,18 @@ class Plex:
if library_other.lower() in [
x.title.lower() for x in library_list
]:
logger(
logger.info(
f"Plex: Library {library_name} not found, but {library_other} found, using {library_other}",
1,
)
library_name = library_other
else:
logger(
logger.info(
f"Plex: Library {library_name} or {library_other} not found in library list",
1,
)
continue
else:
logger(
logger.info(
f"Plex: Library {library_name} not found in library list",
1,
)
continue
@ -507,5 +496,5 @@ class Plex:
)
except Exception as e:
logger(f"Plex: Failed to update watched, Error: {e}", 2)
logger.error(f"Plex: Failed to update watched, Error: {e}")
raise Exception(e)

View File

@ -1,11 +1,10 @@
from plexapi.myplex import MyPlexAccount
from loguru import logger
from src.emby import Emby
from src.jellyfin import Jellyfin
from src.plex import Plex
from src.functions import (
logger,
search_mapping,
)
from src.functions import search_mapping
def generate_user_list(server: Plex | Jellyfin | Emby) -> list[str]:
@ -63,7 +62,7 @@ def filter_user_lists(
# whitelist_user is not empty and user lowercase is not in whitelist lowercase
if len(whitelist_users) > 0:
if user not in whitelist_users and users[user] not in whitelist_users:
logger(f"{user} or {users[user]} is not in whitelist", 1)
logger.info(f"{user} or {users[user]} is not in whitelist")
continue
if user not in blacklist_users and users[user] not in blacklist_users:
@ -113,26 +112,26 @@ def setup_users(
) -> tuple[list[MyPlexAccount] | dict[str, str], list[MyPlexAccount] | dict[str, str]]:
server_1_users = generate_user_list(server_1)
server_2_users = generate_user_list(server_2)
logger(f"Server 1 users: {server_1_users}", 1)
logger(f"Server 2 users: {server_2_users}", 1)
logger.debug(f"Server 1 users: {server_1_users}")
logger.debug(f"Server 2 users: {server_2_users}")
users = combine_user_lists(server_1_users, server_2_users, user_mapping)
logger(f"User list that exist on both servers {users}", 1)
logger.debug(f"User list that exist on both servers {users}")
users_filtered = filter_user_lists(users, blacklist_users, whitelist_users)
logger(f"Filtered user list {users_filtered}", 1)
logger.debug(f"Filtered user list {users_filtered}")
output_server_1_users = generate_server_users(server_1, users_filtered)
output_server_2_users = generate_server_users(server_2, users_filtered)
# Check if users is none or empty
if output_server_1_users is None or len(output_server_1_users) == 0:
logger(
logger.warning(
f"No users found for server 1 {type(server_1)}, users: {server_1_users}, overlapping users {users}, filtered users {users_filtered}, server 1 users {server_1.users}"
)
if output_server_2_users is None or len(output_server_2_users) == 0:
logger(
logger.warning(
f"No users found for server 2 {type(server_2)}, users: {server_2_users}, overlapping users {users} filtered users {users_filtered}, server 2 users {server_2.users}"
)
@ -144,7 +143,7 @@ def setup_users(
):
raise Exception("No users found for one or both servers")
logger(f"Server 1 users: {output_server_1_users}", 1)
logger(f"Server 2 users: {output_server_2_users}", 1)
logger.info(f"Server 1 users: {output_server_1_users}")
logger.info(f"Server 2 users: {output_server_2_users}")
return output_server_1_users, output_server_2_users

View File

@ -1,7 +1,8 @@
import copy
from pydantic import BaseModel
from loguru import logger
from src.functions import logger, search_mapping
from src.functions import search_mapping
class MediaIdentifiers(BaseModel):
@ -134,7 +135,7 @@ def cleanup_watched(
remove_flag = False
for movie2 in library_2.movies:
if check_remove_entry(movie, movie2):
logger(f"Removing movie: {movie.identifiers.title}", 3)
logger.trace(f"Removing movie: {movie.identifiers.title}")
remove_flag = True
break
@ -164,9 +165,8 @@ def cleanup_watched(
remove_flag = False
for ep2 in matching_series.episodes:
if check_remove_entry(ep1, ep2):
logger(
logger.trace(
f"Removing episode '{ep1.identifiers.title}' from show '{series1.identifiers.title}'",
3,
)
remove_flag = True
break
@ -179,9 +179,8 @@ def cleanup_watched(
modified_series1.episodes = filtered_episodes
filtered_series_list.append(modified_series1)
else:
logger(
logger.trace(
f"Removing entire show '{series1.identifiers.title}' as no episodes remain after cleanup.",
3,
)
modified_watched_list_1[user_1].libraries[
library_1_key
@ -194,7 +193,7 @@ def cleanup_watched(
if library.movies or library.series:
new_libraries[lib_key] = library
else:
logger(f"Removing empty library '{lib_key}' for user '{user}'", 3)
logger.trace(f"Removing empty library '{lib_key}' for user '{user}'")
user_data.libraries = new_libraries
return modified_watched_list_1
@ -206,5 +205,5 @@ def get_other(watched_list, object_1, object_2):
elif object_2 in watched_list:
return object_2
else:
logger(f"{object_1} and {object_2} not found in watched list 2", 1)
logger.info(f"{object_1} and {object_2} not found in watched list 2")
return None

View File

@ -1,6 +1,6 @@
# Check the mark.log file that is generated by the CI to make sure it contains the expected values
import os, argparse
import argparse
import os
def parse_args():

24
uv.lock
View File

@ -87,6 +87,7 @@ name = "jellyplex-watched"
version = "6.1.2"
source = { virtual = "." }
dependencies = [
{ name = "loguru" },
{ name = "packaging" },
{ name = "plexapi" },
{ name = "pydantic" },
@ -104,6 +105,7 @@ lint = [
[package.metadata]
requires-dist = [
{ name = "loguru", specifier = ">=0.7.3" },
{ name = "packaging", specifier = "==24.2" },
{ name = "plexapi", specifier = "==4.16.1" },
{ name = "pydantic", specifier = "==2.10.6" },
@ -115,6 +117,19 @@ requires-dist = [
dev = [{ name = "pytest", specifier = ">=8.3.4" }]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
[[package]]
name = "loguru"
version = "0.7.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "win32-setctime", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3a/05/a1dae3dffd1116099471c643b8924f5aa6524411dc6c63fdae648c4f1aca/loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6", size = 63559 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0c/29/0348de65b8cc732daa3e33e67806420b2ae89bdce2b04af740289c5c6c8c/loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c", size = 61595 },
]
[[package]]
name = "packaging"
version = "24.2"
@ -279,3 +294,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/aa/63/e53da845320b757bf
wheels = [
{ url = "https://files.pythonhosted.org/packages/c8/19/4ec628951a74043532ca2cf5d97b7b14863931476d117c471e8e2b1eb39f/urllib3-2.3.0-py3-none-any.whl", hash = "sha256:1cee9ad369867bfdbbb48b7dd50374c0967a0bb7710050facf0dd6911440e3df", size = 128369 },
]
[[package]]
name = "win32-setctime"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b3/8f/705086c9d734d3b663af0e9bb3d4de6578d08f46b1b101c2442fd9aecaa2/win32_setctime-1.2.0.tar.gz", hash = "sha256:ae1fdf948f5640aae05c511ade119313fb6a30d7eabe25fef9764dca5873c4c0", size = 4867 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e1/07/c6fe3ad3e685340704d314d765b7912993bcb8dc198f0e7a89382d37974b/win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390", size = 4083 },
]