Merge pull request #210 from luigi311/more_cleanup

More cleanup
This commit is contained in:
Luigi311
2025-02-18 18:09:57 -07:00
committed by GitHub
13 changed files with 510 additions and 530 deletions

View File

@@ -2,14 +2,14 @@ from src.functions import logger, search_mapping
def setup_black_white_lists( def setup_black_white_lists(
blacklist_library: str, blacklist_library: list[str] | None,
whitelist_library: str, whitelist_library: list[str] | None,
blacklist_library_type: str, blacklist_library_type: list[str] | None,
whitelist_library_type: str, whitelist_library_type: list[str] | None,
blacklist_users: str, blacklist_users: list[str] | None,
whitelist_users: str, whitelist_users: list[str] | None,
library_mapping=None, library_mapping: dict[str, str] | None = None,
user_mapping=None, user_mapping: dict[str, str] | None = None,
): ):
blacklist_library, blacklist_library_type, blacklist_users = setup_x_lists( blacklist_library, blacklist_library_type, blacklist_users = setup_x_lists(
blacklist_library, blacklist_library,
@@ -40,53 +40,44 @@ def setup_black_white_lists(
def setup_x_lists( def setup_x_lists(
xlist_library, xlist_library: list[str] | None,
xlist_library_type, xlist_library_type: list[str] | None,
xlist_users, xlist_users: list[str] | None,
xlist_type, xlist_type: str | None,
library_mapping=None, library_mapping: dict[str, str] | None = None,
user_mapping=None, user_mapping: dict[str, str] | None = None,
): ) -> tuple[list[str], list[str], list[str]]:
out_library: list[str] = []
if xlist_library: if xlist_library:
if len(xlist_library) > 0: out_library = [x.strip() for x in xlist_library]
xlist_library = xlist_library.split(",")
xlist_library = [x.strip() for x in xlist_library]
if library_mapping: if library_mapping:
temp_library = [] temp_library: list[str] = []
for library in xlist_library: for library in xlist_library:
library_other = search_mapping(library_mapping, library) library_other = search_mapping(library_mapping, library)
if library_other: if library_other:
temp_library.append(library_other) temp_library.append(library_other)
xlist_library = xlist_library + temp_library out_library = out_library + temp_library
else:
xlist_library = []
logger(f"{xlist_type}list Library: {xlist_library}", 1) logger(f"{xlist_type}list Library: {xlist_library}", 1)
out_library_type: list[str] = []
if xlist_library_type: if xlist_library_type:
if len(xlist_library_type) > 0: out_library_type = [x.lower().strip() for x in xlist_library_type]
xlist_library_type = xlist_library_type.split(",")
xlist_library_type = [x.lower().strip() for x in xlist_library_type]
else:
xlist_library_type = []
logger(f"{xlist_type}list Library Type: {xlist_library_type}", 1)
logger(f"{xlist_type}list Library Type: {out_library_type}", 1)
out_users: list[str] = []
if xlist_users: if xlist_users:
if len(xlist_users) > 0: out_users = [x.lower().strip() for x in xlist_users]
xlist_users = xlist_users.split(",")
xlist_users = [x.lower().strip() for x in xlist_users]
if user_mapping: if user_mapping:
temp_users = [] temp_users: list[str] = []
for user in xlist_users: for user in out_users:
user_other = search_mapping(user_mapping, user) user_other = search_mapping(user_mapping, user)
if user_other: if user_other:
temp_users.append(user_other) temp_users.append(user_other)
xlist_users = xlist_users + temp_users out_users = out_users + temp_users
else:
xlist_users = []
else:
xlist_users = []
logger(f"{xlist_type}list Users: {xlist_users}", 1)
return xlist_library, xlist_library_type, xlist_users logger(f"{xlist_type}list Users: {out_users}", 1)
return out_library, out_library_type, out_users

View File

@@ -1,4 +1,5 @@
import os import os
from typing import Literal
from dotenv import load_dotenv from dotenv import load_dotenv
from src.functions import logger, str_to_bool from src.functions import logger, str_to_bool
@@ -9,39 +10,32 @@ from src.emby import Emby
load_dotenv(override=True) load_dotenv(override=True)
def jellyfin_emby_server_connection(server_baseurl, server_token, server_type): def jellyfin_emby_server_connection(
servers = [] server_baseurl: str, server_token: str, server_type: Literal["jellyfin", "emby"]
) -> list[Jellyfin | Emby]:
servers: list[Jellyfin | Emby] = []
server: Jellyfin | Emby
server_baseurl = server_baseurl.split(",") server_baseurls = server_baseurl.split(",")
server_token = server_token.split(",") server_tokens = server_token.split(",")
if len(server_baseurl) != len(server_token): if len(server_baseurls) != len(server_tokens):
raise Exception( raise Exception(
f"{server_type.upper()}_BASEURL and {server_type.upper()}_TOKEN must have the same number of entries" f"{server_type.upper()}_BASEURL and {server_type.upper()}_TOKEN must have the same number of entries"
) )
for i, baseurl in enumerate(server_baseurl): for i, baseurl in enumerate(server_baseurls):
baseurl = baseurl.strip() baseurl = baseurl.strip()
if baseurl[-1] == "/": if baseurl[-1] == "/":
baseurl = baseurl[:-1] baseurl = baseurl[:-1]
if server_type == "jellyfin": if server_type == "jellyfin":
server = Jellyfin(baseurl=baseurl, token=server_token[i].strip()) server = Jellyfin(baseurl=baseurl, token=server_tokens[i].strip())
servers.append( servers.append(server)
(
"jellyfin",
server,
)
)
elif server_type == "emby": elif server_type == "emby":
server = Emby(baseurl=baseurl, token=server_token[i].strip()) server = Emby(baseurl=baseurl, token=server_tokens[i].strip())
servers.append( servers.append(server)
(
"emby",
server,
)
)
else: else:
raise Exception("Unknown server type") raise Exception("Unknown server type")
@@ -50,19 +44,19 @@ def jellyfin_emby_server_connection(server_baseurl, server_token, server_type):
return servers return servers
def generate_server_connections(): def generate_server_connections() -> list[Plex | Jellyfin | Emby]:
servers = [] servers: list[Plex | Jellyfin | Emby] = []
plex_baseurl = os.getenv("PLEX_BASEURL", None) plex_baseurl_str: str | None = os.getenv("PLEX_BASEURL", None)
plex_token = os.getenv("PLEX_TOKEN", None) plex_token_str: str | None = os.getenv("PLEX_TOKEN", None)
plex_username = os.getenv("PLEX_USERNAME", None) plex_username_str: str | None = os.getenv("PLEX_USERNAME", None)
plex_password = os.getenv("PLEX_PASSWORD", None) plex_password_str: str | None = os.getenv("PLEX_PASSWORD", None)
plex_servername = os.getenv("PLEX_SERVERNAME", None) plex_servername_str: str | None = os.getenv("PLEX_SERVERNAME", None)
ssl_bypass = str_to_bool(os.getenv("SSL_BYPASS", "False")) ssl_bypass = str_to_bool(os.getenv("SSL_BYPASS", "False"))
if plex_baseurl and plex_token: if plex_baseurl_str and plex_token_str:
plex_baseurl = plex_baseurl.split(",") plex_baseurl = plex_baseurl_str.split(",")
plex_token = plex_token.split(",") plex_token = plex_token_str.split(",")
if len(plex_baseurl) != len(plex_token): if len(plex_baseurl) != len(plex_token):
raise Exception( raise Exception(
@@ -81,17 +75,12 @@ def generate_server_connections():
logger(f"Plex Server {i} info: {server.info()}", 3) logger(f"Plex Server {i} info: {server.info()}", 3)
servers.append( servers.append(server)
(
"plex",
server,
)
)
if plex_username and plex_password and plex_servername: if plex_username_str and plex_password_str and plex_servername_str:
plex_username = plex_username.split(",") plex_username = plex_username_str.split(",")
plex_password = plex_password.split(",") plex_password = plex_password_str.split(",")
plex_servername = plex_servername.split(",") plex_servername = plex_servername_str.split(",")
if len(plex_username) != len(plex_password) or len(plex_username) != len( if len(plex_username) != len(plex_password) or len(plex_username) != len(
plex_servername plex_servername
@@ -111,16 +100,10 @@ def generate_server_connections():
) )
logger(f"Plex Server {i} info: {server.info()}", 3) logger(f"Plex Server {i} info: {server.info()}", 3)
servers.append( servers.append(server)
(
"plex",
server,
)
)
jellyfin_baseurl = os.getenv("JELLYFIN_BASEURL", None) jellyfin_baseurl = os.getenv("JELLYFIN_BASEURL", None)
jellyfin_token = os.getenv("JELLYFIN_TOKEN", None) jellyfin_token = os.getenv("JELLYFIN_TOKEN", None)
if jellyfin_baseurl and jellyfin_token: if jellyfin_baseurl and jellyfin_token:
servers.extend( servers.extend(
jellyfin_emby_server_connection( jellyfin_emby_server_connection(
@@ -130,7 +113,6 @@ def generate_server_connections():
emby_baseurl = os.getenv("EMBY_BASEURL", None) emby_baseurl = os.getenv("EMBY_BASEURL", None)
emby_token = os.getenv("EMBY_TOKEN", None) emby_token = os.getenv("EMBY_TOKEN", None)
if emby_baseurl and emby_token: if emby_baseurl and emby_token:
servers.extend( servers.extend(
jellyfin_emby_server_connection(emby_baseurl, emby_token, "emby") jellyfin_emby_server_connection(emby_baseurl, emby_token, "emby")

View File

@@ -1,5 +1,5 @@
from src.jellyfin_emby import JellyfinEmby from src.jellyfin_emby import JellyfinEmby
from packaging import version from packaging.version import parse, Version
class Emby(JellyfinEmby): class Emby(JellyfinEmby):
@@ -21,5 +21,5 @@ class Emby(JellyfinEmby):
server_type="Emby", baseurl=baseurl, token=token, headers=headers server_type="Emby", baseurl=baseurl, token=token, headers=headers
) )
def is_partial_update_supported(self, server_version): def is_partial_update_supported(self, server_version: Version) -> bool:
return server_version > version.parse("4.4") return server_version > parse("4.4")

View File

@@ -1,5 +1,6 @@
import os import os
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv(override=True) load_dotenv(override=True)
@@ -8,11 +9,11 @@ log_file = os.getenv("LOG_FILE", os.getenv("LOGFILE", "log.log"))
mark_file = os.getenv("MARK_FILE", os.getenv("MARKFILE", "mark.log")) mark_file = os.getenv("MARK_FILE", os.getenv("MARKFILE", "mark.log"))
def logger(message: str, log_type=0): def logger(message: str, log_type: int = 0):
debug = str_to_bool(os.getenv("DEBUG", "False")) debug = str_to_bool(os.getenv("DEBUG", "False"))
debug_level = os.getenv("DEBUG_LEVEL", "info").lower() debug_level = os.getenv("DEBUG_LEVEL", "info").lower()
output = str(message) output: str | None = str(message)
if log_type == 0: if log_type == 0:
pass pass
elif log_type == 1 and (debug and debug_level in ("info", "debug")): elif log_type == 1 and (debug and debug_level in ("info", "debug")):
@@ -42,12 +43,9 @@ def log_marked(
username: str, username: str,
library: str, library: str,
movie_show: str, movie_show: str,
episode: str = None, episode: str | None = None,
duration=None, duration: float | None = None,
): ):
if mark_file is None:
return
output = f"{server_type}/{server_name}/{username}/{library}/{movie_show}" output = f"{server_type}/{server_name}/{username}/{library}/{movie_show}"
if episode: if episode:
@@ -62,14 +60,14 @@ def log_marked(
# Reimplementation of distutils.util.strtobool due to it being deprecated # Reimplementation of distutils.util.strtobool due to it being deprecated
# Source: https://github.com/PostHog/posthog/blob/01e184c29d2c10c43166f1d40a334abbc3f99d8a/posthog/utils.py#L668 # Source: https://github.com/PostHog/posthog/blob/01e184c29d2c10c43166f1d40a334abbc3f99d8a/posthog/utils.py#L668
def str_to_bool(value: any) -> bool: def str_to_bool(value: str) -> bool:
if not value: if not value:
return False return False
return str(value).lower() in ("y", "yes", "t", "true", "on", "1") return str(value).lower() in ("y", "yes", "t", "true", "on", "1")
# Search for nested element in list # Search for nested element in list
def contains_nested(element, lst): def contains_nested(element: str, lst: list[tuple[str] | None] | tuple[str] | None):
if lst is None: if lst is None:
return None return None
@@ -84,7 +82,7 @@ def contains_nested(element, lst):
# Get mapped value # Get mapped value
def search_mapping(dictionary: dict, key_value: str): def search_mapping(dictionary: dict[str, str], key_value: str) -> str | None:
if key_value in dictionary.keys(): if key_value in dictionary.keys():
return dictionary[key_value] return dictionary[key_value]
elif key_value.lower() in dictionary.keys(): elif key_value.lower() in dictionary.keys():
@@ -100,8 +98,10 @@ def search_mapping(dictionary: dict, key_value: str):
# Return list of objects that exist in both lists including mappings # Return list of objects that exist in both lists including mappings
def match_list(list1, list2, list_mapping=None): def match_list(
output = [] list1: list[str], list2: list[str], list_mapping: dict[str, str] | None = None
) -> list[str]:
output: list[str] = []
for element in list1: for element in list1:
if element in list2: if element in list2:
output.append(element) output.append(element)
@@ -114,35 +114,49 @@ def match_list(list1, list2, list_mapping=None):
def future_thread_executor( def future_thread_executor(
args: list, threads: int = None, override_threads: bool = False args: list[tuple[Callable[..., Any], ...]],
): threads: int | None = None,
futures_list = [] override_threads: bool = False,
results = [] ) -> list[Any]:
results: list[Any] = []
workers = min(int(os.getenv("MAX_THREADS", 32)), os.cpu_count() * 2) # Determine the number of workers, defaulting to 1 if os.cpu_count() returns None
if threads: max_threads_env: int = int(os.getenv("MAX_THREADS", 32))
cpu_threads: int = os.cpu_count() or 1 # Default to 1 if os.cpu_count() is None
workers: int = min(max_threads_env, cpu_threads * 2)
# Adjust workers based on threads parameter and override_threads flag
if threads is not None:
workers = min(threads, workers) workers = min(threads, workers)
if override_threads: if override_threads:
workers = threads workers = threads if threads is not None else workers
# If only one worker, run in main thread to avoid overhead # If only one worker, run in main thread to avoid overhead
if workers == 1: if workers == 1:
results = []
for arg in args: for arg in args:
results.append(arg[0](*arg[1:])) results.append(arg[0](*arg[1:]))
return results return results
with ThreadPoolExecutor(max_workers=workers) as executor: with ThreadPoolExecutor(max_workers=workers) as executor:
futures_list: list[Future[Any]] = []
for arg in args: for arg in args:
# * arg unpacks the list into actual arguments # * arg unpacks the list into actual arguments
futures_list.append(executor.submit(*arg)) futures_list.append(executor.submit(*arg))
for future in futures_list: for out in futures_list:
try: try:
result = future.result() result = out.result()
results.append(result) results.append(result)
except Exception as e: except Exception as e:
raise Exception(e) raise Exception(e)
return results return results
def parse_string_to_list(string: str | None) -> list[str]:
output: list[str] = []
if string and len(string) > 0:
output = string.split(",")
return output

View File

@@ -1,5 +1,5 @@
from src.jellyfin_emby import JellyfinEmby from src.jellyfin_emby import JellyfinEmby
from packaging import version from packaging.version import parse, Version
class Jellyfin(JellyfinEmby): class Jellyfin(JellyfinEmby):
@@ -21,5 +21,5 @@ class Jellyfin(JellyfinEmby):
server_type="Jellyfin", baseurl=baseurl, token=token, headers=headers server_type="Jellyfin", baseurl=baseurl, token=token, headers=headers
) )
def is_partial_update_supported(self, server_version): def is_partial_update_supported(self, server_version: Version) -> bool:
return server_version >= version.parse("10.9.0") return server_version >= parse("10.9.0")

View File

@@ -2,9 +2,10 @@
import traceback, os import traceback, os
from math import floor from math import floor
from typing import Any, Literal
from dotenv import load_dotenv from dotenv import load_dotenv
import requests import requests
from packaging import version from packaging.version import parse, Version
from src.functions import ( from src.functions import (
logger, logger,
@@ -21,39 +22,6 @@ generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True"))
generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True")) generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True"))
def get_guids(server_type, item):
if item.get("Name"):
guids = {"title": item.get("Name")}
else:
logger(f"{server_type}: Name not found in {item.get('Id')}", 1)
guids = {"title": None}
if "ProviderIds" in item:
guids.update({k.lower(): v for k, v in item["ProviderIds"].items()})
else:
logger(f"{server_type}: ProviderIds not found in {item.get('Name')}", 1)
if "MediaSources" in item:
guids["locations"] = tuple(
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
)
else:
logger(f"{server_type}: MediaSources not found in {item.get('Name')}", 1)
guids["locations"] = tuple()
if "UserData" in item:
guids["status"] = {
"completed": item["UserData"]["Played"],
# Convert ticks to milliseconds to match Plex
"time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
}
else:
logger(f"{server_type}: UserData not found in {item.get('Name')}", 1)
guids["status"] = {}
return guids
def get_video_status(server_video, videos_ids, videos): def get_video_status(server_video, videos_ids, videos):
video_status = None video_status = None
@@ -103,7 +71,13 @@ def get_video_status(server_video, videos_ids, videos):
class JellyfinEmby: class JellyfinEmby:
def __init__(self, server_type, baseurl, token, headers): def __init__(
self,
server_type: Literal["Jellyfin", "Emby"],
baseurl: str,
token: str,
headers: dict[str, str],
):
if server_type not in ["Jellyfin", "Emby"]: if server_type not in ["Jellyfin", "Emby"]:
raise Exception(f"Server type {server_type} not supported") raise Exception(f"Server type {server_type} not supported")
self.server_type = server_type self.server_type = server_type
@@ -122,9 +96,17 @@ class JellyfinEmby:
self.users = self.get_users() self.users = self.get_users()
self.server_name = self.info(name_only=True) self.server_name = self.info(name_only=True)
def query(self, query, query_type, identifiers=None, json=None): def query(
self,
query: str,
query_type: Literal["get", "post"],
identifiers: dict[str, str] | None = None,
json: dict[str, float] | None = None,
) -> dict[str, Any] | list[dict[str, Any]] | None:
try: try:
results = None results: (
dict[str, list[Any] | dict[str, str]] | list[dict[str, Any]] | None
) = None
if query_type == "get": if query_type == "get":
response = self.session.get( response = self.session.get(
@@ -160,7 +142,7 @@ class JellyfinEmby:
raise Exception("Query result is not of type list or dict") raise Exception("Query result is not of type list or dict")
# append identifiers to results # append identifiers to results
if identifiers: if identifiers and results:
results["Identifiers"] = identifiers results["Identifiers"] = identifiers
return results return results
@@ -172,16 +154,21 @@ class JellyfinEmby:
) )
raise Exception(e) raise Exception(e)
def info(self, name_only: bool = False) -> str: def info(
self, name_only: bool = False, version_only: bool = False
) -> str | Version | None:
try: try:
query_string = "/System/Info/Public" query_string = "/System/Info/Public"
response = self.query(query_string, "get") response: dict[str, Any] = self.query(query_string, "get")
if response: if response:
if name_only: if name_only:
return f"{response['ServerName']}" return response["ServerName"]
return f"{self.server_type} {response['ServerName']}: {response['Version']}" elif version_only:
return parse(response["Version"])
return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}"
else: else:
return None return None
@@ -189,29 +176,17 @@ class JellyfinEmby:
logger(f"{self.server_type}: Get server name failed {e}", 2) logger(f"{self.server_type}: Get server name failed {e}", 2)
raise Exception(e) raise Exception(e)
def get_server_version(self): def get_users(self) -> dict[str, str]:
try: try:
response = self.query("/System/Info/Public", "get") users: dict[str, str] = {}
if response:
return version.parse(response["Version"])
else:
return None
except Exception as e:
logger(f"{self.server_type}: Get server version failed: {e}", 2)
raise Exception(e)
def get_users(self):
try:
users = {}
query_string = "/Users" query_string = "/Users"
response = self.query(query_string, "get") response: list[dict[str, str | bool]] = self.query(query_string, "get")
# If response is not empty # If response is not empty
if response: if response:
for user in response: for user in response:
if isinstance(user["Name"], str) and isinstance(user["Id"], str):
users[user["Name"]] = user["Id"] users[user["Name"]] = user["Id"]
return users return users
@@ -219,7 +194,45 @@ class JellyfinEmby:
logger(f"{self.server_type}: Get users failed {e}", 2) logger(f"{self.server_type}: Get users failed {e}", 2)
raise Exception(e) raise Exception(e)
def get_libraries(self): def get_guids(self, item: dict):
guids: dict[str, str | tuple[str] | dict[str, bool | int]] = {}
if item.get("Name"):
guids["title"] = item.get("Name")
else:
logger(f"{self.server_type}: Name not found in {item.get('Id')}", 1)
guids["title"] = None
if "ProviderIds" in item:
guids.update({k.lower(): v for k, v in item["ProviderIds"].items()})
else:
logger(
f"{self.server_type}: ProviderIds not found in {item.get('Name')}", 1
)
if "MediaSources" in item:
guids["locations"] = tuple(
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
)
else:
logger(
f"{self.server_type}: MediaSources not found in {item.get('Name')}", 1
)
guids["locations"] = tuple()
if "UserData" in item:
guids["status"] = {
"completed": item["UserData"]["Played"],
# Convert ticks to milliseconds to match Plex
"time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
}
else:
logger(f"{self.server_type}: UserData not found in {item.get('Name')}", 1)
guids["status"] = {}
return guids
def get_libraries(self) -> dict[str, str]:
try: try:
libraries = {} libraries = {}
@@ -227,7 +240,7 @@ class JellyfinEmby:
users = self.get_users() users = self.get_users()
for _, user_id in users.items(): for _, user_id in users.items():
user_libraries = self.query(f"/Users/{user_id}/Views", "get") user_libraries: dict = self.query(f"/Users/{user_id}/Views", "get")
for library in user_libraries["Items"]: for library in user_libraries["Items"]:
library_id = library["Id"] library_id = library["Id"]
library_title = library["Name"] library_title = library["Name"]
@@ -308,7 +321,7 @@ class JellyfinEmby:
) )
# Get the movie's GUIDs # Get the movie's GUIDs
movie_guids = get_guids(self.server_type, movie) movie_guids = self.get_guids(movie)
# Append the movie dictionary to the list for the given user and library # Append the movie dictionary to the list for the given user and library
user_watched[library_title].append(movie_guids) user_watched[library_title].append(movie_guids)
@@ -379,7 +392,7 @@ class JellyfinEmby:
episode["UserData"]["Played"] == True episode["UserData"]["Played"] == True
or episode["UserData"]["PlaybackPositionTicks"] > 600000000 or episode["UserData"]["PlaybackPositionTicks"] > 600000000
): ):
episode_guids = get_guids(self.server_type, episode) episode_guids = self.get_guids(episode)
mark_episodes_list.append(episode_guids) mark_episodes_list.append(episode_guids)
if mark_episodes_list: if mark_episodes_list:
@@ -411,7 +424,9 @@ class JellyfinEmby:
logger(traceback.format_exc(), 2) logger(traceback.format_exc(), 2)
return {} return {}
def get_watched(self, users, sync_libraries): def get_watched(
self, users: dict[str, str], sync_libraries: list[str]
):
try: try:
users_watched = {} users_watched = {}
watched = [] watched = []
@@ -427,7 +442,7 @@ class JellyfinEmby:
if library_title not in sync_libraries: if library_title not in sync_libraries:
continue continue
identifiers = { identifiers: dict[str, str] = {
"library_id": library_id, "library_id": library_id,
"library_title": library_title, "library_title": library_title,
} }
@@ -444,8 +459,8 @@ class JellyfinEmby:
if len(library["Items"]) == 0: if len(library["Items"]) == 0:
continue continue
library_id = library["Identifiers"]["library_id"] library_id: str = library["Identifiers"]["library_id"]
library_title = library["Identifiers"]["library_title"] library_title: str = library["Identifiers"]["library_title"]
# Get all library types excluding "Folder" # Get all library types excluding "Folder"
types = set( types = set(
@@ -725,7 +740,7 @@ class JellyfinEmby:
self, watched_list, user_mapping=None, library_mapping=None, dryrun=False self, watched_list, user_mapping=None, library_mapping=None, dryrun=False
): ):
try: try:
server_version = self.get_server_version() server_version = self.info(version_only=True)
update_partial = self.is_partial_update_supported(server_version) update_partial = self.is_partial_update_supported(server_version)
if not update_partial: if not update_partial:

View File

@@ -4,16 +4,15 @@ from src.functions import (
search_mapping, search_mapping,
) )
def check_skip_logic( def check_skip_logic(
library_title, library_title: str,
library_type, library_type: str,
blacklist_library, blacklist_library: list[str],
whitelist_library, whitelist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_mapping=None, library_mapping: dict[str, str] | None = None,
): ) -> str | None:
skip_reason = None skip_reason = None
library_other = None library_other = None
if library_mapping: if library_mapping:
@@ -48,11 +47,11 @@ def check_skip_logic(
def check_blacklist_logic( def check_blacklist_logic(
library_title, library_title: str,
library_type, library_type: str,
blacklist_library, blacklist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
library_other=None, library_other: str | None = None,
): ):
skip_reason = None skip_reason = None
if isinstance(library_type, (list, tuple, set)): if isinstance(library_type, (list, tuple, set)):
@@ -84,11 +83,11 @@ def check_blacklist_logic(
def check_whitelist_logic( def check_whitelist_logic(
library_title, library_title: str,
library_type, library_type: str,
whitelist_library, whitelist_library: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_other=None, library_other: str | None = None,
): ):
skip_reason = None skip_reason = None
if len(whitelist_library_type) > 0: if len(whitelist_library_type) > 0:
@@ -131,14 +130,14 @@ def check_whitelist_logic(
def filter_libaries( def filter_libaries(
server_libraries, server_libraries: dict[str, str],
blacklist_library, blacklist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
whitelist_library, whitelist_library: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_mapping=None, library_mapping: dict[str, str] | None = None,
): ) -> list[str]:
filtered_libaries = [] filtered_libaries: list[str] = []
for library in server_libraries: for library in server_libraries:
skip_reason = check_skip_logic( skip_reason = check_skip_logic(
library, library,
@@ -162,12 +161,12 @@ def filter_libaries(
def setup_libraries( def setup_libraries(
server_1, server_1,
server_2, server_2,
blacklist_library, blacklist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
whitelist_library, whitelist_library: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_mapping=None, library_mapping: dict[str, str] | None = None,
): ) -> tuple[list[str], list[str]]:
server_1_libraries = server_1.get_libraries() server_1_libraries = server_1.get_libraries()
server_2_libraries = server_2.get_libraries() server_2_libraries = server_2.get_libraries()
logger(f"Server 1 libraries: {server_1_libraries}", 1) logger(f"Server 1 libraries: {server_1_libraries}", 1)
@@ -201,14 +200,16 @@ def setup_libraries(
return output_server_1_libaries, output_server_2_libaries return output_server_1_libaries, output_server_2_libaries
def show_title_dict(user_list: dict): def show_title_dict(user_list) -> dict[str, list[tuple[str] | None]]:
try: try:
show_output_dict = {} if not isinstance(user_list, dict):
return {}
show_output_dict: dict[str, list[tuple[str] | None]] = {}
show_output_dict["locations"] = [] show_output_dict["locations"] = []
show_counter = 0 # Initialize a counter for the current show position show_counter = 0 # Initialize a counter for the current show position
show_output_keys = user_list.keys() show_output_keys = [dict(x) for x in list(user_list.keys())]
show_output_keys = [dict(x) for x in list(show_output_keys)]
for show_key in show_output_keys: for show_key in show_output_keys:
for provider_key, provider_value in show_key.items(): for provider_key, provider_value in show_key.items():
# Skip title # Skip title
@@ -233,9 +234,19 @@ def show_title_dict(user_list: dict):
return {} return {}
def episode_title_dict(user_list: dict): def episode_title_dict(
user_list,
) -> dict[
str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]
]:
try: try:
episode_output_dict = {} if not isinstance(user_list, dict):
return {}
episode_output_dict: dict[
str,
list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None],
] = {}
episode_output_dict["completed"] = [] episode_output_dict["completed"] = []
episode_output_dict["time"] = [] episode_output_dict["time"] = []
episode_output_dict["locations"] = [] episode_output_dict["locations"] = []
@@ -293,12 +304,18 @@ def episode_title_dict(user_list: dict):
return {} return {}
def movies_title_dict(user_list: dict): def movies_title_dict(
user_list,
) -> dict[str, list[str | bool | int | tuple[str] | None]]:
try: try:
movies_output_dict = {} if not isinstance(user_list, list):
movies_output_dict["completed"] = [] return {}
movies_output_dict["time"] = []
movies_output_dict["locations"] = [] movies_output_dict: dict[str, list[str | bool | int | tuple[str] | None]] = {
"completed": [],
"time": [],
"locations": [],
}
movie_counter = 0 # Initialize a counter for the current movie position movie_counter = 0 # Initialize a counter for the current movie position
for movie in user_list: for movie in user_list:
@@ -325,7 +342,11 @@ def movies_title_dict(user_list: dict):
return {} return {}
def generate_library_guids_dict(user_list: dict): def generate_library_guids_dict(user_list) -> tuple[
dict[str, list[tuple[str] | None]],
dict[str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]],
dict[str, list[str | bool | int | tuple[str] | None]],
]:
# Handle the case where user_list is empty or does not contain the expected keys and values # Handle the case where user_list is empty or does not contain the expected keys and values
if not user_list: if not user_list:
return {}, {}, {} return {}, {}, {}

View File

@@ -2,9 +2,13 @@ import os, traceback, json
from dotenv import load_dotenv from dotenv import load_dotenv
from time import sleep, perf_counter from time import sleep, perf_counter
from src.emby import Emby
from src.jellyfin import Jellyfin
from src.plex import Plex
from src.library import setup_libraries from src.library import setup_libraries
from src.functions import ( from src.functions import (
logger, logger,
parse_string_to_list,
str_to_bool, str_to_bool,
) )
from src.users import setup_users from src.users import setup_users
@@ -17,7 +21,10 @@ from src.connection import generate_server_connections
load_dotenv(override=True) load_dotenv(override=True)
def should_sync_server(server_1_type, server_2_type): def should_sync_server(
server_1: Plex | Jellyfin | Emby,
server_2: Plex | Jellyfin | Emby,
) -> bool:
sync_from_plex_to_jellyfin = str_to_bool( sync_from_plex_to_jellyfin = str_to_bool(
os.getenv("SYNC_FROM_PLEX_TO_JELLYFIN", "True") os.getenv("SYNC_FROM_PLEX_TO_JELLYFIN", "True")
) )
@@ -40,42 +47,42 @@ def should_sync_server(server_1_type, server_2_type):
) )
sync_from_emby_to_emby = str_to_bool(os.getenv("SYNC_FROM_EMBY_TO_EMBY", "True")) sync_from_emby_to_emby = str_to_bool(os.getenv("SYNC_FROM_EMBY_TO_EMBY", "True"))
if server_1_type == "plex": if isinstance(server_1, Plex):
if server_2_type == "jellyfin" and not sync_from_plex_to_jellyfin: if isinstance(server_2, Jellyfin) and not sync_from_plex_to_jellyfin:
logger("Sync from plex -> jellyfin is disabled", 1) logger("Sync from plex -> jellyfin is disabled", 1)
return False return False
if server_2_type == "emby" and not sync_from_plex_to_emby: if isinstance(server_2, Emby) and not sync_from_plex_to_emby:
logger("Sync from plex -> emby is disabled", 1) logger("Sync from plex -> emby is disabled", 1)
return False return False
if server_2_type == "plex" and not sync_from_plex_to_plex: if isinstance(server_2, Plex) and not sync_from_plex_to_plex:
logger("Sync from plex -> plex is disabled", 1) logger("Sync from plex -> plex is disabled", 1)
return False return False
if server_1_type == "jellyfin": if isinstance(server_1, Jellyfin):
if server_2_type == "plex" and not sync_from_jelly_to_plex: if isinstance(server_2, Plex) and not sync_from_jelly_to_plex:
logger("Sync from jellyfin -> plex is disabled", 1) logger("Sync from jellyfin -> plex is disabled", 1)
return False return False
if server_2_type == "jellyfin" and not sync_from_jelly_to_jellyfin: if isinstance(server_2, Jellyfin) and not sync_from_jelly_to_jellyfin:
logger("Sync from jellyfin -> jellyfin is disabled", 1) logger("Sync from jellyfin -> jellyfin is disabled", 1)
return False return False
if server_2_type == "emby" and not sync_from_jelly_to_emby: if isinstance(server_2, Emby) and not sync_from_jelly_to_emby:
logger("Sync from jellyfin -> emby is disabled", 1) logger("Sync from jellyfin -> emby is disabled", 1)
return False return False
if server_1_type == "emby": if isinstance(server_1, Emby):
if server_2_type == "plex" and not sync_from_emby_to_plex: if isinstance(server_2, Plex) and not sync_from_emby_to_plex:
logger("Sync from emby -> plex is disabled", 1) logger("Sync from emby -> plex is disabled", 1)
return False return False
if server_2_type == "jellyfin" and not sync_from_emby_to_jellyfin: if isinstance(server_2, Jellyfin) and not sync_from_emby_to_jellyfin:
logger("Sync from emby -> jellyfin is disabled", 1) logger("Sync from emby -> jellyfin is disabled", 1)
return False return False
if server_2_type == "emby" and not sync_from_emby_to_emby: if isinstance(server_2, Emby) and not sync_from_emby_to_emby:
logger("Sync from emby -> emby is disabled", 1) logger("Sync from emby -> emby is disabled", 1)
return False return False
@@ -91,24 +98,26 @@ def main_loop():
dryrun = str_to_bool(os.getenv("DRYRUN", "False")) dryrun = str_to_bool(os.getenv("DRYRUN", "False"))
logger(f"Dryrun: {dryrun}", 1) logger(f"Dryrun: {dryrun}", 1)
user_mapping = os.getenv("USER_MAPPING") user_mapping = os.getenv("USER_MAPPING", "")
if user_mapping:
user_mapping = json.loads(user_mapping.lower()) user_mapping = json.loads(user_mapping.lower())
logger(f"User Mapping: {user_mapping}", 1) logger(f"User Mapping: {user_mapping}", 1)
library_mapping = os.getenv("LIBRARY_MAPPING") library_mapping = os.getenv("LIBRARY_MAPPING", "")
if library_mapping:
library_mapping = json.loads(library_mapping) library_mapping = json.loads(library_mapping)
logger(f"Library Mapping: {library_mapping}", 1) logger(f"Library Mapping: {library_mapping}", 1)
# Create (black/white)lists # Create (black/white)lists
logger("Creating (black/white)lists", 1) logger("Creating (black/white)lists", 1)
blacklist_library = os.getenv("BLACKLIST_LIBRARY", None) blacklist_library = parse_string_to_list(os.getenv("BLACKLIST_LIBRARY", None))
whitelist_library = os.getenv("WHITELIST_LIBRARY", None) whitelist_library = parse_string_to_list(os.getenv("WHITELIST_LIBRARY", None))
blacklist_library_type = os.getenv("BLACKLIST_LIBRARY_TYPE", None) blacklist_library_type = parse_string_to_list(
whitelist_library_type = os.getenv("WHITELIST_LIBRARY_TYPE", None) os.getenv("BLACKLIST_LIBRARY_TYPE", None)
blacklist_users = os.getenv("BLACKLIST_USERS", None) )
whitelist_users = os.getenv("WHITELIST_USERS", None) whitelist_library_type = parse_string_to_list(
os.getenv("WHITELIST_LIBRARY_TYPE", None)
)
blacklist_users = parse_string_to_list(os.getenv("BLACKLIST_USERS", None))
whitelist_users = parse_string_to_list(os.getenv("WHITELIST_USERS", None))
( (
blacklist_library, blacklist_library,
@@ -140,13 +149,13 @@ def main_loop():
# Start server_2 at the next server in the list # Start server_2 at the next server in the list
for server_2 in servers[servers.index(server_1) + 1 :]: for server_2 in servers[servers.index(server_1) + 1 :]:
# Check if server 1 and server 2 are going to be synced in either direction, skip if not # Check if server 1 and server 2 are going to be synced in either direction, skip if not
if not should_sync_server( if not should_sync_server(server_1, server_2) and not should_sync_server(
server_1[0], server_2[0] server_2, server_1
) and not should_sync_server(server_2[0], server_1[0]): ):
continue continue
logger(f"Server 1: {server_1[0].capitalize()}: {server_1[1].info()}", 0) logger(f"Server 1: {type(server_1)}: {server_1.info()}", 0)
logger(f"Server 2: {server_2[0].capitalize()}: {server_2[1].info()}", 0) logger(f"Server 2: {type(server_2)}: {server_2.info()}", 0)
# Create users list # Create users list
logger("Creating users list", 1) logger("Creating users list", 1)
@@ -155,8 +164,8 @@ def main_loop():
) )
server_1_libraries, server_2_libraries = setup_libraries( server_1_libraries, server_2_libraries = setup_libraries(
server_1[1], server_1,
server_2[1], server_2,
blacklist_library, blacklist_library,
blacklist_library_type, blacklist_library_type,
whitelist_library, whitelist_library,
@@ -165,14 +174,10 @@ def main_loop():
) )
logger("Creating watched lists", 1) logger("Creating watched lists", 1)
server_1_watched = server_1[1].get_watched( server_1_watched = server_1.get_watched(server_1_users, server_1_libraries)
server_1_users, server_1_libraries
)
logger("Finished creating watched list server 1", 1) logger("Finished creating watched list server 1", 1)
server_2_watched = server_2[1].get_watched( server_2_watched = server_2.get_watched(server_2_users, server_2_libraries)
server_2_users, server_2_libraries
)
logger("Finished creating watched list server 2", 1) logger("Finished creating watched list server 2", 1)
logger(f"Server 1 watched: {server_1_watched}", 3) logger(f"Server 1 watched: {server_1_watched}", 3)
@@ -197,18 +202,18 @@ def main_loop():
1, 1,
) )
if should_sync_server(server_2[0], server_1[0]): if should_sync_server(server_2, server_1):
logger(f"Syncing {server_2[1].info()} -> {server_1[1].info()}", 0) logger(f"Syncing {server_2.info()} -> {server_1.info()}", 0)
server_1[1].update_watched( server_1.update_watched(
server_2_watched_filtered, server_2_watched_filtered,
user_mapping, user_mapping,
library_mapping, library_mapping,
dryrun, dryrun,
) )
if should_sync_server(server_1[0], server_2[0]): if should_sync_server(server_1, server_2):
logger(f"Syncing {server_1[1].info()} -> {server_2[1].info()}", 0) logger(f"Syncing {server_1.info()} -> {server_2.info()}", 0)
server_2[1].update_watched( server_2.update_watched(
server_1_watched_filtered, server_1_watched_filtered,
user_mapping, user_mapping,
library_mapping, library_mapping,
@@ -219,7 +224,7 @@ def main_loop():
def main(): def main():
run_only_once = str_to_bool(os.getenv("RUN_ONLY_ONCE", "False")) run_only_once = str_to_bool(os.getenv("RUN_ONLY_ONCE", "False"))
sleep_duration = float(os.getenv("SLEEP_DURATION", "3600")) sleep_duration = float(os.getenv("SLEEP_DURATION", "3600"))
times = [] times: list[float] = []
while True: while True:
try: try:
start = perf_counter() start = perf_counter()

View File

@@ -205,7 +205,7 @@ def get_user_library_watched(user, user_plex, library):
def find_video(plex_search, video_ids, videos=None): def find_video(plex_search, video_ids, videos=None):
try: try:
if not generate_guids and not generate_locations: if not generate_guids and not generate_locations:
return False, [] return None
if generate_locations: if generate_locations:
for location in plex_search.locations: for location in plex_search.locations:
@@ -226,7 +226,7 @@ def find_video(plex_search, video_ids, videos=None):
for episode in episodes: for episode in episodes:
episode_videos.append(episode) episode_videos.append(episode)
return True, episode_videos return episode_videos
if generate_guids: if generate_guids:
for guid in plex_search.guids: for guid in plex_search.guids:
@@ -244,11 +244,11 @@ def find_video(plex_search, video_ids, videos=None):
for episode in episodes: for episode in episodes:
episode_videos.append(episode) episode_videos.append(episode)
return True, episode_videos return episode_videos
return False, [] return None
except Exception: except Exception:
return False, [] return None
def get_video_status(plex_search, video_ids, videos): def get_video_status(plex_search, video_ids, videos):
@@ -286,31 +286,44 @@ def get_video_status(plex_search, video_ids, videos):
return None return None
def update_user_watched(user, user_plex, library, videos, dryrun): def update_user_watched(user, user_plex, library, watched_videos, dryrun):
try: try:
logger(f"Plex: Updating watched for {user.title} in library {library}", 1) logger(f"Plex: Updating watched for {user.title} in library {library}", 1)
( (
videos_shows_ids, watched_shows_ids,
videos_episodes_ids, watched_episodes_ids,
videos_movies_ids, watched_movies_ids,
) = generate_library_guids_dict(videos) ) = generate_library_guids_dict(watched_videos)
if (
not watched_movies_ids
and not watched_shows_ids
and not watched_episodes_ids
):
logger( logger(
f"Plex: mark list\nShows: {videos_shows_ids}\nEpisodes: {videos_episodes_ids}\nMovies: {videos_movies_ids}", f"Jellyfin: No videos to mark as watched for {user.title} in library {library}",
1,
)
return
logger(
f"Plex: mark list\nShows: {watched_shows_ids}\nEpisodes: {watched_episodes_ids}\nMovies: {watched_movies_ids}",
1, 1,
) )
library_videos = user_plex.library.section(library) library_videos = user_plex.library.section(library)
if videos_movies_ids: if watched_movies_ids:
for movies_search in library_videos.search(unwatched=True): for plex_movie in library_videos.search(unwatched=True):
video_status = get_video_status( watched_movie_status = get_video_status(
movies_search, videos_movies_ids, videos plex_movie, watched_movies_ids, watched_videos
) )
if video_status: if watched_movie_status:
if video_status["completed"]: if watched_movie_status["completed"]:
msg = f"Plex: {movies_search.title} as watched for {user.title} in {library}" msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library}"
if not dryrun: if not dryrun:
logger(msg, 5) logger(msg, 5)
movies_search.markWatched() plex_movie.markWatched()
else: else:
logger(msg, 6) logger(msg, 6)
@@ -319,15 +332,15 @@ def update_user_watched(user, user_plex, library, videos, dryrun):
user_plex.friendlyName, user_plex.friendlyName,
user.title, user.title,
library, library,
movies_search.title, plex_movie.title,
None, None,
None, None,
) )
elif video_status["time"] > 60_000: elif watched_movie_status["time"] > 60_000:
msg = f"Plex: {movies_search.title} as partially watched for {floor(video_status['time'] / 60_000)} minutes for {user.title} in {library}" msg = f"Plex: {plex_movie.title} as partially watched for {floor(watched_movie_status['time'] / 60_000)} minutes for {user.title} in {library}"
if not dryrun: if not dryrun:
logger(msg, 5) logger(msg, 5)
movies_search.updateTimeline(video_status["time"]) plex_movie.updateTimeline(watched_movie_status["time"])
else: else:
logger(msg, 6) logger(msg, 6)
@@ -336,31 +349,33 @@ def update_user_watched(user, user_plex, library, videos, dryrun):
user_plex.friendlyName, user_plex.friendlyName,
user.title, user.title,
library, library,
movies_search.title, plex_movie.title,
duration=video_status["time"], duration=watched_movie_status["time"],
) )
else: else:
logger( logger(
f"Plex: Skipping movie {movies_search.title} as it is not in mark list for {user.title}", f"Plex: Skipping movie {plex_movie.title} as it is not in mark list for {user.title}",
1, 1,
) )
if videos_shows_ids and videos_episodes_ids: if watched_shows_ids and watched_episodes_ids:
for show_search in library_videos.search(unwatched=True): for plex_show in library_videos.search(unwatched=True):
show_found, episode_videos = find_video( watched_show_episodes_status = find_video(
show_search, videos_shows_ids, videos plex_show, watched_shows_ids, watched_videos
) )
if show_found: if watched_show_episodes_status:
for episode_search in show_search.episodes(): for plex_episode in plex_show.episodes():
video_status = get_video_status( watched_episode_status = get_video_status(
episode_search, videos_episodes_ids, episode_videos plex_episode,
watched_episodes_ids,
watched_show_episodes_status,
) )
if video_status: if watched_episode_status:
if video_status["completed"]: if watched_episode_status["completed"]:
msg = f"Plex: {show_search.title} {episode_search.title} as watched for {user.title} in {library}" msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library}"
if not dryrun: if not dryrun:
logger(msg, 5) logger(msg, 5)
episode_search.markWatched() plex_episode.markWatched()
else: else:
logger(msg, 6) logger(msg, 6)
@@ -369,14 +384,16 @@ def update_user_watched(user, user_plex, library, videos, dryrun):
user_plex.friendlyName, user_plex.friendlyName,
user.title, user.title,
library, library,
show_search.title, plex_show.title,
episode_search.title, plex_episode.title,
) )
else: else:
msg = f"Plex: {show_search.title} {episode_search.title} as partially watched for {floor(video_status['time'] / 60_000)} minutes for {user.title} in {library}" msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(watched_episode_status['time'] / 60_000)} minutes for {user.title} in {library}"
if not dryrun: if not dryrun:
logger(msg, 5) logger(msg, 5)
episode_search.updateTimeline(video_status["time"]) plex_episode.updateTimeline(
watched_episode_status["time"]
)
else: else:
logger(msg, 6) logger(msg, 6)
@@ -385,27 +402,21 @@ def update_user_watched(user, user_plex, library, videos, dryrun):
user_plex.friendlyName, user_plex.friendlyName,
user.title, user.title,
library, library,
show_search.title, plex_show.title,
episode_search.title, plex_episode.title,
video_status["time"], watched_episode_status["time"],
) )
else: else:
logger( logger(
f"Plex: Skipping episode {episode_search.title} as it is not in mark list for {user.title}", f"Plex: Skipping episode {plex_episode.title} as it is not in mark list for {user.title}",
3, 3,
) )
else: else:
logger( logger(
f"Plex: Skipping show {show_search.title} as it is not in mark list for {user.title}", f"Plex: Skipping show {plex_show.title} as it is not in mark list for {user.title}",
3, 3,
) )
if not videos_movies_ids and not videos_shows_ids and not videos_episodes_ids:
logger(
f"Jellyfin: No videos to mark as watched for {user.title} in library {library}",
1,
)
except Exception as e: except Exception as e:
logger( logger(
f"Plex: Failed to update watched for {user.title} in library {library}, Error: {e}", f"Plex: Failed to update watched for {user.title} in library {library}, Error: {e}",
@@ -477,7 +488,7 @@ class Plex:
logger(f"Plex: Failed to get users, Error: {e}", 2) logger(f"Plex: Failed to get users, Error: {e}", 2)
raise Exception(e) raise Exception(e)
def get_libraries(self): def get_libraries(self) -> dict[str, str]:
try: try:
output = {} output = {}
@@ -545,9 +556,6 @@ class Plex:
user_other = None user_other = None
# If type of user is dict # If type of user is dict
if user_mapping: if user_mapping:
if user in user_mapping.keys():
user_other = user_mapping[user]
elif user in user_mapping.values():
user_other = search_mapping(user_mapping, user) user_other = search_mapping(user_mapping, user)
for index, value in enumerate(self.users): for index, value in enumerate(self.users):
@@ -588,12 +596,9 @@ class Plex:
) )
continue continue
for library, videos in libraries.items(): for library, watched_videos in libraries.items():
library_other = None library_other = None
if library_mapping: if library_mapping:
if library in library_mapping.keys():
library_other = library_mapping[library]
elif library in library_mapping.values():
library_other = search_mapping(library_mapping, library) library_other = search_mapping(library_mapping, library)
# if library in plex library list # if library in plex library list
@@ -627,7 +632,7 @@ class Plex:
user, user,
user_plex, user_plex,
library, library,
videos, watched_videos,
dryrun, dryrun,
] ]
) )

View File

@@ -1,30 +1,36 @@
from plexapi.myplex import MyPlexAccount
from src.emby import Emby
from src.jellyfin import Jellyfin
from src.plex import Plex
from src.functions import ( from src.functions import (
logger, logger,
search_mapping, search_mapping,
) )
def generate_user_list(server): def generate_user_list(server: Plex | Jellyfin | Emby) -> list[str]:
# generate list of users from server 1 and server 2 # generate list of users from server 1 and server 2
server_type = server[0]
server_connection = server[1]
server_users = [] server_users: list[str] = []
if server_type == "plex": if isinstance(server, Plex):
for user in server_connection.users: for user in server.users:
server_users.append( server_users.append(
user.username.lower() if user.username else user.title.lower() user.username.lower() if user.username else user.title.lower()
) )
elif server_type in ["jellyfin", "emby"]: elif isinstance(server, (Jellyfin, Emby)):
server_users = [key.lower() for key in server_connection.users.keys()] server_users = [key.lower() for key in server.users.keys()]
return server_users return server_users
def combine_user_lists(server_1_users, server_2_users, user_mapping): def combine_user_lists(
server_1_users: list[str],
server_2_users: list[str],
user_mapping: dict[str, str] | None,
) -> dict[str, str]:
# combined list of overlapping users from plex and jellyfin # combined list of overlapping users from plex and jellyfin
users = {} users: dict[str, str] = {}
for server_1_user in server_1_users: for server_1_user in server_1_users:
if user_mapping: if user_mapping:
@@ -49,8 +55,10 @@ def combine_user_lists(server_1_users, server_2_users, user_mapping):
return users return users
def filter_user_lists(users, blacklist_users, whitelist_users): def filter_user_lists(
users_filtered = {} users: dict[str, str], blacklist_users: list[str], whitelist_users: list[str]
) -> dict[str, str]:
users_filtered: dict[str, str] = {}
for user in users: for user in users:
# whitelist_user is not empty and user lowercase is not in whitelist lowercase # whitelist_user is not empty and user lowercase is not in whitelist lowercase
if len(whitelist_users) > 0: if len(whitelist_users) > 0:
@@ -64,12 +72,13 @@ def filter_user_lists(users, blacklist_users, whitelist_users):
return users_filtered return users_filtered
def generate_server_users(server, users): def generate_server_users(
server_users = None server: Plex | Jellyfin | Emby,
users: dict[str, str],
if server[0] == "plex": ) -> list[MyPlexAccount] | dict[str, str] | None:
server_users = [] if isinstance(server, Plex):
for plex_user in server[1].users: plex_server_users: list[MyPlexAccount] = []
for plex_user in server.users:
username_title = ( username_title = (
plex_user.username if plex_user.username else plex_user.title plex_user.username if plex_user.username else plex_user.title
) )
@@ -78,22 +87,30 @@ def generate_server_users(server, users):
username_title.lower() in users.keys() username_title.lower() in users.keys()
or username_title.lower() in users.values() or username_title.lower() in users.values()
): ):
server_users.append(plex_user) plex_server_users.append(plex_user)
elif server[0] in ["jellyfin", "emby"]:
server_users = {} return plex_server_users
for jellyfin_user, jellyfin_id in server[1].users.items(): elif isinstance(server, (Jellyfin, Emby)):
jelly_emby_server_users: dict[str, str] = {}
for jellyfin_user, jellyfin_id in server.users.items():
if ( if (
jellyfin_user.lower() in users.keys() jellyfin_user.lower() in users.keys()
or jellyfin_user.lower() in users.values() or jellyfin_user.lower() in users.values()
): ):
server_users[jellyfin_user] = jellyfin_id jelly_emby_server_users[jellyfin_user] = jellyfin_id
return server_users return jelly_emby_server_users
return None
def setup_users( def setup_users(
server_1, server_2, blacklist_users, whitelist_users, user_mapping=None server_1: Plex | Jellyfin | Emby,
): server_2: Plex | Jellyfin | Emby,
blacklist_users: list[str],
whitelist_users: list[str],
user_mapping: dict[str, str] | None = None,
) -> tuple[list[MyPlexAccount] | dict[str, str], list[MyPlexAccount] | dict[str, str]]:
server_1_users = generate_user_list(server_1) server_1_users = generate_user_list(server_1)
server_2_users = generate_user_list(server_2) server_2_users = generate_user_list(server_2)
logger(f"Server 1 users: {server_1_users}", 1) logger(f"Server 1 users: {server_1_users}", 1)
@@ -111,12 +128,12 @@ def setup_users(
# Check if users is none or empty # Check if users is none or empty
if output_server_1_users is None or len(output_server_1_users) == 0: if output_server_1_users is None or len(output_server_1_users) == 0:
logger( logger(
f"No users found for server 1 {server_1[0]}, users: {server_1_users}, overlapping users {users}, filtered users {users_filtered}, server 1 users {server_1[1].users}" f"No users found for server 1 {type(server_1)}, users: {server_1_users}, overlapping users {users}, filtered users {users_filtered}, server 1 users {server_1.users}"
) )
if output_server_2_users is None or len(output_server_2_users) == 0: if output_server_2_users is None or len(output_server_2_users) == 0:
logger( logger(
f"No users found for server 2 {server_2[0]}, users: {server_2_users}, overlapping users {users} filtered users {users_filtered}, server 2 users {server_2[1].users}" f"No users found for server 2 {type(server_2)}, users: {server_2_users}, overlapping users {users} filtered users {users_filtered}, server 2 users {server_2.users}"
) )
if ( if (

View File

@@ -18,12 +18,12 @@ from src.black_white import setup_black_white_lists
def test_setup_black_white_lists(): def test_setup_black_white_lists():
# Simple # Simple
blacklist_library = "library1, library2" blacklist_library = ["library1", "library2"]
whitelist_library = "library1, library2" whitelist_library = ["library1", "library2"]
blacklist_library_type = "library_type1, library_type2" blacklist_library_type = ["library_type1", "library_type2"]
whitelist_library_type = "library_type1, library_type2" whitelist_library_type = ["library_type1", "library_type2"]
blacklist_users = "user1, user2" blacklist_users = ["user1", "user2"]
whitelist_users = "user1, user2" whitelist_users = ["user1", "user2"]
( (
results_blacklist_library, results_blacklist_library,
@@ -48,6 +48,15 @@ def test_setup_black_white_lists():
assert return_blacklist_users == ["user1", "user2"] assert return_blacklist_users == ["user1", "user2"]
assert return_whitelist_users == ["user1", "user2"] assert return_whitelist_users == ["user1", "user2"]
def test_library_mapping_black_white_list():
blacklist_library = ["library1", "library2"]
whitelist_library = ["library1", "library2"]
blacklist_library_type = ["library_type1", "library_type2"]
whitelist_library_type = ["library_type1", "library_type2"]
blacklist_users = ["user1", "user2"]
whitelist_users = ["user1", "user2"]
# Library Mapping and user mapping # Library Mapping and user mapping
library_mapping = {"library1": "library3"} library_mapping = {"library1": "library3"}
user_mapping = {"user1": "user3"} user_mapping = {"user1": "user3"}

View File

@@ -1,78 +0,0 @@
import sys
import os
# getting the name of the directory
# where the this file is present.
current = os.path.dirname(os.path.realpath(__file__))
# Getting the parent directory name
# where the current directory is present.
parent = os.path.dirname(current)
# adding the parent directory to
# the sys.path.
sys.path.append(parent)
from src.black_white import setup_black_white_lists
def test_setup_black_white_lists():
# Simple
blacklist_library = "library1, library2"
whitelist_library = "library1, library2"
blacklist_library_type = "library_type1, library_type2"
whitelist_library_type = "library_type1, library_type2"
blacklist_users = "user1, user2"
whitelist_users = "user1, user2"
(
results_blacklist_library,
return_whitelist_library,
return_blacklist_library_type,
return_whitelist_library_type,
return_blacklist_users,
return_whitelist_users,
) = setup_black_white_lists(
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
blacklist_users,
whitelist_users,
)
assert results_blacklist_library == ["library1", "library2"]
assert return_whitelist_library == ["library1", "library2"]
assert return_blacklist_library_type == ["library_type1", "library_type2"]
assert return_whitelist_library_type == ["library_type1", "library_type2"]
assert return_blacklist_users == ["user1", "user2"]
assert return_whitelist_users == ["user1", "user2"]
# Library Mapping and user mapping
library_mapping = {"library1": "library3"}
user_mapping = {"user1": "user3"}
(
results_blacklist_library,
return_whitelist_library,
return_blacklist_library_type,
return_whitelist_library_type,
return_blacklist_users,
return_whitelist_users,
) = setup_black_white_lists(
blacklist_library,
whitelist_library,
blacklist_library_type,
whitelist_library_type,
blacklist_users,
whitelist_users,
library_mapping,
user_mapping,
)
assert results_blacklist_library == ["library1", "library2", "library3"]
assert return_whitelist_library == ["library1", "library2", "library3"]
assert return_blacklist_library_type == ["library_type1", "library_type2"]
assert return_whitelist_library_type == ["library_type1", "library_type2"]
assert return_blacklist_users == ["user1", "user2", "user3"]
assert return_whitelist_users == ["user1", "user2", "user3"]

View File

@@ -128,8 +128,7 @@ def main():
expected_locations = expected_emby + expected_plex + expected_jellyfin expected_locations = expected_emby + expected_plex + expected_jellyfin
# Remove Custom Movies/TV Shows as they should not have guids # Remove Custom Movies/TV Shows as they should not have guids
expected_guids = [item for item in expected_locations if "Custom" not in item ] expected_guids = [item for item in expected_locations if "Custom" not in item]
expected_write = [ expected_write = [
"Plex/JellyPlex-CI/jellyplex_watched/Custom Movies/Movie Two (2021)", "Plex/JellyPlex-CI/jellyplex_watched/Custom Movies/Movie Two (2021)",
@@ -171,7 +170,7 @@ def main():
"Jellyfin/Jellyfin-Server/JellyUser/Custom Movies/Movie Three (2022)", "Jellyfin/Jellyfin-Server/JellyUser/Custom Movies/Movie Three (2022)",
"Jellyfin/Jellyfin-Server/JellyUser/Custom TV Shows/Greatest Show Ever (3000)/S01E03", "Jellyfin/Jellyfin-Server/JellyUser/Custom TV Shows/Greatest Show Ever (3000)/S01E03",
"Jellyfin/Jellyfin-Server/JellyUser/Movies/Tears of Steel", "Jellyfin/Jellyfin-Server/JellyUser/Movies/Tears of Steel",
"Jellyfin/Jellyfin-Server/JellyUser/Shows/Monarch: Legacy of Monsters/Parallels and Interiors/4" "Jellyfin/Jellyfin-Server/JellyUser/Shows/Monarch: Legacy of Monsters/Parallels and Interiors/4",
] ]
# Expected values for the mark.log file, dry-run is slightly different than write-run # Expected values for the mark.log file, dry-run is slightly different than write-run