Add lots of static typing
Signed-off-by: Luis Garcia <git@luigi311.com>
This commit is contained in:
@@ -2,14 +2,14 @@ from src.functions import logger, search_mapping
|
||||
|
||||
|
||||
def setup_black_white_lists(
|
||||
blacklist_library: str,
|
||||
whitelist_library: str,
|
||||
blacklist_library_type: str,
|
||||
whitelist_library_type: str,
|
||||
blacklist_users: str,
|
||||
whitelist_users: str,
|
||||
library_mapping=None,
|
||||
user_mapping=None,
|
||||
blacklist_library: list[str] | None,
|
||||
whitelist_library: list[str] | None,
|
||||
blacklist_library_type: list[str] | None,
|
||||
whitelist_library_type: list[str] | None,
|
||||
blacklist_users: list[str] | None,
|
||||
whitelist_users: list[str] | None,
|
||||
library_mapping: dict[str, str] | None = None,
|
||||
user_mapping: dict[str, str] | None = None,
|
||||
):
|
||||
blacklist_library, blacklist_library_type, blacklist_users = setup_x_lists(
|
||||
blacklist_library,
|
||||
@@ -40,53 +40,44 @@ def setup_black_white_lists(
|
||||
|
||||
|
||||
def setup_x_lists(
|
||||
xlist_library,
|
||||
xlist_library_type,
|
||||
xlist_users,
|
||||
xlist_type,
|
||||
library_mapping=None,
|
||||
user_mapping=None,
|
||||
):
|
||||
xlist_library: list[str] | None,
|
||||
xlist_library_type: list[str] | None,
|
||||
xlist_users: list[str] | None,
|
||||
xlist_type: str | None,
|
||||
library_mapping: dict[str, str] | None = None,
|
||||
user_mapping: dict[str, str] | None = None,
|
||||
) -> tuple[list[str], list[str], list[str]]:
|
||||
out_library: list[str] = []
|
||||
if xlist_library:
|
||||
if len(xlist_library) > 0:
|
||||
xlist_library = xlist_library.split(",")
|
||||
xlist_library = [x.strip() for x in xlist_library]
|
||||
if library_mapping:
|
||||
temp_library = []
|
||||
for library in xlist_library:
|
||||
library_other = search_mapping(library_mapping, library)
|
||||
if library_other:
|
||||
temp_library.append(library_other)
|
||||
out_library = [x.strip() for x in xlist_library]
|
||||
if library_mapping:
|
||||
temp_library: list[str] = []
|
||||
for library in xlist_library:
|
||||
library_other = search_mapping(library_mapping, library)
|
||||
if library_other:
|
||||
temp_library.append(library_other)
|
||||
|
||||
xlist_library = xlist_library + temp_library
|
||||
else:
|
||||
xlist_library = []
|
||||
logger(f"{xlist_type}list Library: {xlist_library}", 1)
|
||||
out_library = out_library + temp_library
|
||||
logger(f"{xlist_type}list Library: {xlist_library}", 1)
|
||||
|
||||
out_library_type: list[str] = []
|
||||
if xlist_library_type:
|
||||
if len(xlist_library_type) > 0:
|
||||
xlist_library_type = xlist_library_type.split(",")
|
||||
xlist_library_type = [x.lower().strip() for x in xlist_library_type]
|
||||
else:
|
||||
xlist_library_type = []
|
||||
logger(f"{xlist_type}list Library Type: {xlist_library_type}", 1)
|
||||
out_library_type = [x.lower().strip() for x in xlist_library_type]
|
||||
|
||||
logger(f"{xlist_type}list Library Type: {out_library_type}", 1)
|
||||
|
||||
out_users: list[str] = []
|
||||
if xlist_users:
|
||||
if len(xlist_users) > 0:
|
||||
xlist_users = xlist_users.split(",")
|
||||
xlist_users = [x.lower().strip() for x in xlist_users]
|
||||
if user_mapping:
|
||||
temp_users = []
|
||||
for user in xlist_users:
|
||||
user_other = search_mapping(user_mapping, user)
|
||||
if user_other:
|
||||
temp_users.append(user_other)
|
||||
out_users = [x.lower().strip() for x in xlist_users]
|
||||
if user_mapping:
|
||||
temp_users: list[str] = []
|
||||
for user in out_users:
|
||||
user_other = search_mapping(user_mapping, user)
|
||||
if user_other:
|
||||
temp_users.append(user_other)
|
||||
|
||||
xlist_users = xlist_users + temp_users
|
||||
else:
|
||||
xlist_users = []
|
||||
else:
|
||||
xlist_users = []
|
||||
logger(f"{xlist_type}list Users: {xlist_users}", 1)
|
||||
out_users = out_users + temp_users
|
||||
|
||||
return xlist_library, xlist_library_type, xlist_users
|
||||
logger(f"{xlist_type}list Users: {out_users}", 1)
|
||||
|
||||
return out_library, out_library_type, out_users
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
from typing import Literal
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from src.functions import logger, str_to_bool
|
||||
@@ -9,24 +10,26 @@ from src.emby import Emby
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
def jellyfin_emby_server_connection(server_baseurl, server_token, server_type):
|
||||
servers = []
|
||||
def jellyfin_emby_server_connection(
|
||||
server_baseurl: str, server_token: str, server_type: Literal["jellyfin", "emby"]
|
||||
) -> list[tuple[Literal["jellyfin", "emby"], Jellyfin | Emby]]:
|
||||
servers: list[tuple[Literal["jellyfin", "emby"], Jellyfin | Emby]] = []
|
||||
|
||||
server_baseurl = server_baseurl.split(",")
|
||||
server_token = server_token.split(",")
|
||||
server_baseurls = server_baseurl.split(",")
|
||||
server_tokens = server_token.split(",")
|
||||
|
||||
if len(server_baseurl) != len(server_token):
|
||||
if len(server_baseurls) != len(server_tokens):
|
||||
raise Exception(
|
||||
f"{server_type.upper()}_BASEURL and {server_type.upper()}_TOKEN must have the same number of entries"
|
||||
)
|
||||
|
||||
for i, baseurl in enumerate(server_baseurl):
|
||||
for i, baseurl in enumerate(server_baseurls):
|
||||
baseurl = baseurl.strip()
|
||||
if baseurl[-1] == "/":
|
||||
baseurl = baseurl[:-1]
|
||||
|
||||
if server_type == "jellyfin":
|
||||
server = Jellyfin(baseurl=baseurl, token=server_token[i].strip())
|
||||
server = Jellyfin(baseurl=baseurl, token=server_tokens[i].strip())
|
||||
servers.append(
|
||||
(
|
||||
"jellyfin",
|
||||
@@ -35,7 +38,7 @@ def jellyfin_emby_server_connection(server_baseurl, server_token, server_type):
|
||||
)
|
||||
|
||||
elif server_type == "emby":
|
||||
server = Emby(baseurl=baseurl, token=server_token[i].strip())
|
||||
server = Emby(baseurl=baseurl, token=server_tokens[i].strip())
|
||||
servers.append(
|
||||
(
|
||||
"emby",
|
||||
@@ -50,8 +53,12 @@ def jellyfin_emby_server_connection(server_baseurl, server_token, server_type):
|
||||
return servers
|
||||
|
||||
|
||||
def generate_server_connections():
|
||||
servers = []
|
||||
def generate_server_connections() -> (
|
||||
list[tuple[Literal["plex", "jellyfin", "emby"], Plex | Jellyfin | Emby]]
|
||||
):
|
||||
servers: list[
|
||||
tuple[Literal["plex", "jellyfin", "emby"], Plex | Jellyfin | Emby]
|
||||
] = []
|
||||
|
||||
plex_baseurl = os.getenv("PLEX_BASEURL", None)
|
||||
plex_token = os.getenv("PLEX_TOKEN", None)
|
||||
@@ -120,7 +127,6 @@ def generate_server_connections():
|
||||
|
||||
jellyfin_baseurl = os.getenv("JELLYFIN_BASEURL", None)
|
||||
jellyfin_token = os.getenv("JELLYFIN_TOKEN", None)
|
||||
|
||||
if jellyfin_baseurl and jellyfin_token:
|
||||
servers.extend(
|
||||
jellyfin_emby_server_connection(
|
||||
@@ -130,8 +136,8 @@ def generate_server_connections():
|
||||
|
||||
emby_baseurl = os.getenv("EMBY_BASEURL", None)
|
||||
emby_token = os.getenv("EMBY_TOKEN", None)
|
||||
|
||||
if emby_baseurl and emby_token:
|
||||
|
||||
servers.extend(
|
||||
jellyfin_emby_server_connection(emby_baseurl, emby_token, "emby")
|
||||
)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from src.jellyfin_emby import JellyfinEmby
|
||||
from packaging.version import (parse, Version)
|
||||
from packaging.version import parse, Version
|
||||
|
||||
|
||||
class Emby(JellyfinEmby):
|
||||
|
||||
@@ -62,7 +62,7 @@ def log_marked(
|
||||
|
||||
# Reimplementation of distutils.util.strtobool due to it being deprecated
|
||||
# Source: https://github.com/PostHog/posthog/blob/01e184c29d2c10c43166f1d40a334abbc3f99d8a/posthog/utils.py#L668
|
||||
def str_to_bool(value: any) -> bool:
|
||||
def str_to_bool(value: str) -> bool:
|
||||
if not value:
|
||||
return False
|
||||
return str(value).lower() in ("y", "yes", "t", "true", "on", "1")
|
||||
@@ -84,7 +84,7 @@ def contains_nested(element, lst):
|
||||
|
||||
|
||||
# Get mapped value
|
||||
def search_mapping(dictionary: dict, key_value: str):
|
||||
def search_mapping(dictionary: dict[str, str], key_value: str) -> str | None:
|
||||
if key_value in dictionary.keys():
|
||||
return dictionary[key_value]
|
||||
elif key_value.lower() in dictionary.keys():
|
||||
@@ -100,8 +100,10 @@ def search_mapping(dictionary: dict, key_value: str):
|
||||
|
||||
|
||||
# Return list of objects that exist in both lists including mappings
|
||||
def match_list(list1, list2, list_mapping=None):
|
||||
output = []
|
||||
def match_list(
|
||||
list1: list[str], list2: list[str], list_mapping: dict[str, str] | None = None
|
||||
) -> list[str]:
|
||||
output: list[str] = []
|
||||
for element in list1:
|
||||
if element in list2:
|
||||
output.append(element)
|
||||
@@ -146,3 +148,11 @@ def future_thread_executor(
|
||||
raise Exception(e)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def parse_string_to_list(string: str | None) -> list[str]:
|
||||
output: list[str] = []
|
||||
if string and len(string) > 0:
|
||||
output = string.split(",")
|
||||
|
||||
return output
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from src.jellyfin_emby import JellyfinEmby
|
||||
from packaging.version import (parse, Version)
|
||||
from packaging.version import parse, Version
|
||||
|
||||
|
||||
class Jellyfin(JellyfinEmby):
|
||||
|
||||
@@ -2,10 +2,10 @@
|
||||
|
||||
import traceback, os
|
||||
from math import floor
|
||||
from typing import Union
|
||||
from typing import Literal
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
from packaging.version import (parse, Version)
|
||||
from packaging.version import parse, Version
|
||||
|
||||
from src.functions import (
|
||||
logger,
|
||||
@@ -22,39 +22,6 @@ generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True"))
|
||||
generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True"))
|
||||
|
||||
|
||||
def get_guids(server_type, item):
|
||||
if item.get("Name"):
|
||||
guids = {"title": item.get("Name")}
|
||||
else:
|
||||
logger(f"{server_type}: Name not found in {item.get('Id')}", 1)
|
||||
guids = {"title": None}
|
||||
|
||||
if "ProviderIds" in item:
|
||||
guids.update({k.lower(): v for k, v in item["ProviderIds"].items()})
|
||||
else:
|
||||
logger(f"{server_type}: ProviderIds not found in {item.get('Name')}", 1)
|
||||
|
||||
if "MediaSources" in item:
|
||||
guids["locations"] = tuple(
|
||||
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
|
||||
)
|
||||
else:
|
||||
logger(f"{server_type}: MediaSources not found in {item.get('Name')}", 1)
|
||||
guids["locations"] = tuple()
|
||||
|
||||
if "UserData" in item:
|
||||
guids["status"] = {
|
||||
"completed": item["UserData"]["Played"],
|
||||
# Convert ticks to milliseconds to match Plex
|
||||
"time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
|
||||
}
|
||||
else:
|
||||
logger(f"{server_type}: UserData not found in {item.get('Name')}", 1)
|
||||
guids["status"] = {}
|
||||
|
||||
return guids
|
||||
|
||||
|
||||
def get_video_status(server_video, videos_ids, videos):
|
||||
video_status = None
|
||||
|
||||
@@ -104,7 +71,13 @@ def get_video_status(server_video, videos_ids, videos):
|
||||
|
||||
|
||||
class JellyfinEmby:
|
||||
def __init__(self, server_type, baseurl, token, headers):
|
||||
def __init__(
|
||||
self,
|
||||
server_type: Literal["Jellyfin", "Emby"],
|
||||
baseurl: str,
|
||||
token: str,
|
||||
headers: dict[str, str],
|
||||
):
|
||||
if server_type not in ["Jellyfin", "Emby"]:
|
||||
raise Exception(f"Server type {server_type} not supported")
|
||||
self.server_type = server_type
|
||||
@@ -123,7 +96,13 @@ class JellyfinEmby:
|
||||
self.users = self.get_users()
|
||||
self.server_name = self.info(name_only=True)
|
||||
|
||||
def query(self, query, query_type, identifiers=None, json=None):
|
||||
def query(
|
||||
self,
|
||||
query: str,
|
||||
query_type: Literal["get", "post"],
|
||||
identifiers: dict[str, str] | None = None,
|
||||
json: dict | None = None,
|
||||
) -> dict | list[dict]:
|
||||
try:
|
||||
results = None
|
||||
|
||||
@@ -173,19 +152,21 @@ class JellyfinEmby:
|
||||
)
|
||||
raise Exception(e)
|
||||
|
||||
def info(self, name_only: bool = False, version_only: bool = False) -> Union[str | Version]:
|
||||
def info(
|
||||
self, name_only: bool = False, version_only: bool = False
|
||||
) -> str | Version | None:
|
||||
try:
|
||||
query_string = "/System/Info/Public"
|
||||
|
||||
response = self.query(query_string, "get")
|
||||
response: dict = self.query(query_string, "get")
|
||||
|
||||
if response:
|
||||
if name_only:
|
||||
return f"{response['ServerName']}"
|
||||
return response.get("ServerName")
|
||||
elif version_only:
|
||||
return parse(response["Version"])
|
||||
|
||||
return f"{self.server_type} {response['ServerName']}: {response['Version']}"
|
||||
return parse(response.get("Version"))
|
||||
|
||||
return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}"
|
||||
else:
|
||||
return None
|
||||
|
||||
@@ -193,7 +174,7 @@ class JellyfinEmby:
|
||||
logger(f"{self.server_type}: Get server name failed {e}", 2)
|
||||
raise Exception(e)
|
||||
|
||||
def get_users(self):
|
||||
def get_users(self) -> dict[str, str]:
|
||||
try:
|
||||
users = {}
|
||||
|
||||
@@ -210,7 +191,45 @@ class JellyfinEmby:
|
||||
logger(f"{self.server_type}: Get users failed {e}", 2)
|
||||
raise Exception(e)
|
||||
|
||||
def get_libraries(self):
|
||||
def get_guids(self, item: dict):
|
||||
guids: dict[str, str | tuple[str] | dict[str, bool | int]] = {}
|
||||
|
||||
if item.get("Name"):
|
||||
guids["title"] = item.get("Name")
|
||||
else:
|
||||
logger(f"{self.server_type}: Name not found in {item.get('Id')}", 1)
|
||||
guids["title"] = None
|
||||
|
||||
if "ProviderIds" in item:
|
||||
guids.update({k.lower(): v for k, v in item["ProviderIds"].items()})
|
||||
else:
|
||||
logger(
|
||||
f"{self.server_type}: ProviderIds not found in {item.get('Name')}", 1
|
||||
)
|
||||
|
||||
if "MediaSources" in item:
|
||||
guids["locations"] = tuple(
|
||||
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
|
||||
)
|
||||
else:
|
||||
logger(
|
||||
f"{self.server_type}: MediaSources not found in {item.get('Name')}", 1
|
||||
)
|
||||
guids["locations"] = tuple()
|
||||
|
||||
if "UserData" in item:
|
||||
guids["status"] = {
|
||||
"completed": item["UserData"]["Played"],
|
||||
# Convert ticks to milliseconds to match Plex
|
||||
"time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
|
||||
}
|
||||
else:
|
||||
logger(f"{self.server_type}: UserData not found in {item.get('Name')}", 1)
|
||||
guids["status"] = {}
|
||||
|
||||
return guids
|
||||
|
||||
def get_libraries(self) -> dict[str, str]:
|
||||
try:
|
||||
libraries = {}
|
||||
|
||||
@@ -218,7 +237,7 @@ class JellyfinEmby:
|
||||
users = self.get_users()
|
||||
|
||||
for _, user_id in users.items():
|
||||
user_libraries = self.query(f"/Users/{user_id}/Views", "get")
|
||||
user_libraries: dict = self.query(f"/Users/{user_id}/Views", "get")
|
||||
for library in user_libraries["Items"]:
|
||||
library_id = library["Id"]
|
||||
library_title = library["Name"]
|
||||
@@ -299,7 +318,7 @@ class JellyfinEmby:
|
||||
)
|
||||
|
||||
# Get the movie's GUIDs
|
||||
movie_guids = get_guids(self.server_type, movie)
|
||||
movie_guids = self.get_guids(movie)
|
||||
|
||||
# Append the movie dictionary to the list for the given user and library
|
||||
user_watched[library_title].append(movie_guids)
|
||||
@@ -370,7 +389,7 @@ class JellyfinEmby:
|
||||
episode["UserData"]["Played"] == True
|
||||
or episode["UserData"]["PlaybackPositionTicks"] > 600000000
|
||||
):
|
||||
episode_guids = get_guids(self.server_type, episode)
|
||||
episode_guids = self.get_guids(episode)
|
||||
mark_episodes_list.append(episode_guids)
|
||||
|
||||
if mark_episodes_list:
|
||||
|
||||
39
src/main.py
39
src/main.py
@@ -1,10 +1,12 @@
|
||||
import os, traceback, json
|
||||
from typing import Literal
|
||||
from dotenv import load_dotenv
|
||||
from time import sleep, perf_counter
|
||||
|
||||
from src.library import setup_libraries
|
||||
from src.functions import (
|
||||
logger,
|
||||
parse_string_to_list,
|
||||
str_to_bool,
|
||||
)
|
||||
from src.users import setup_users
|
||||
@@ -17,7 +19,10 @@ from src.connection import generate_server_connections
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
def should_sync_server(server_1_type, server_2_type):
|
||||
def should_sync_server(
|
||||
server_1_type: Literal["plex", "jellyfin", "emby"],
|
||||
server_2_type: Literal["plex", "jellyfin", "emby"],
|
||||
) -> bool:
|
||||
sync_from_plex_to_jellyfin = str_to_bool(
|
||||
os.getenv("SYNC_FROM_PLEX_TO_JELLYFIN", "True")
|
||||
)
|
||||
@@ -91,24 +96,26 @@ def main_loop():
|
||||
dryrun = str_to_bool(os.getenv("DRYRUN", "False"))
|
||||
logger(f"Dryrun: {dryrun}", 1)
|
||||
|
||||
user_mapping = os.getenv("USER_MAPPING")
|
||||
if user_mapping:
|
||||
user_mapping = json.loads(user_mapping.lower())
|
||||
logger(f"User Mapping: {user_mapping}", 1)
|
||||
user_mapping = os.getenv("USER_MAPPING", "")
|
||||
user_mapping = json.loads(user_mapping.lower())
|
||||
logger(f"User Mapping: {user_mapping}", 1)
|
||||
|
||||
library_mapping = os.getenv("LIBRARY_MAPPING")
|
||||
if library_mapping:
|
||||
library_mapping = json.loads(library_mapping)
|
||||
logger(f"Library Mapping: {library_mapping}", 1)
|
||||
library_mapping = os.getenv("LIBRARY_MAPPING", "")
|
||||
library_mapping = json.loads(library_mapping)
|
||||
logger(f"Library Mapping: {library_mapping}", 1)
|
||||
|
||||
# Create (black/white)lists
|
||||
logger("Creating (black/white)lists", 1)
|
||||
blacklist_library = os.getenv("BLACKLIST_LIBRARY", None)
|
||||
whitelist_library = os.getenv("WHITELIST_LIBRARY", None)
|
||||
blacklist_library_type = os.getenv("BLACKLIST_LIBRARY_TYPE", None)
|
||||
whitelist_library_type = os.getenv("WHITELIST_LIBRARY_TYPE", None)
|
||||
blacklist_users = os.getenv("BLACKLIST_USERS", None)
|
||||
whitelist_users = os.getenv("WHITELIST_USERS", None)
|
||||
blacklist_library = parse_string_to_list(os.getenv("BLACKLIST_LIBRARY", None))
|
||||
whitelist_library = parse_string_to_list(os.getenv("WHITELIST_LIBRARY", None))
|
||||
blacklist_library_type = parse_string_to_list(
|
||||
os.getenv("BLACKLIST_LIBRARY_TYPE", None)
|
||||
)
|
||||
whitelist_library_type = parse_string_to_list(
|
||||
os.getenv("WHITELIST_LIBRARY_TYPE", None)
|
||||
)
|
||||
blacklist_users = parse_string_to_list(os.getenv("BLACKLIST_USERS", None))
|
||||
whitelist_users = parse_string_to_list(os.getenv("WHITELIST_USERS", None))
|
||||
|
||||
(
|
||||
blacklist_library,
|
||||
@@ -219,7 +226,7 @@ def main_loop():
|
||||
def main():
|
||||
run_only_once = str_to_bool(os.getenv("RUN_ONLY_ONCE", "False"))
|
||||
sleep_duration = float(os.getenv("SLEEP_DURATION", "3600"))
|
||||
times = []
|
||||
times: list[float] = []
|
||||
while True:
|
||||
try:
|
||||
start = perf_counter()
|
||||
|
||||
14
src/plex.py
14
src/plex.py
@@ -295,7 +295,11 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
|
||||
watched_movies_ids,
|
||||
) = generate_library_guids_dict(watched_videos)
|
||||
|
||||
if not watched_movies_ids and not watched_shows_ids and not watched_episodes_ids:
|
||||
if (
|
||||
not watched_movies_ids
|
||||
and not watched_shows_ids
|
||||
and not watched_episodes_ids
|
||||
):
|
||||
logger(
|
||||
f"Jellyfin: No videos to mark as watched for {user.title} in library {library}",
|
||||
1,
|
||||
@@ -362,7 +366,9 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
|
||||
if watched_show_episodes_status:
|
||||
for plex_episode in plex_show.episodes():
|
||||
watched_episode_status = get_video_status(
|
||||
plex_episode, watched_episodes_ids, watched_show_episodes_status
|
||||
plex_episode,
|
||||
watched_episodes_ids,
|
||||
watched_show_episodes_status,
|
||||
)
|
||||
if watched_episode_status:
|
||||
if watched_episode_status["completed"]:
|
||||
@@ -385,7 +391,9 @@ def update_user_watched(user, user_plex, library, watched_videos, dryrun):
|
||||
msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(watched_episode_status['time'] / 60_000)} minutes for {user.title} in {library}"
|
||||
if not dryrun:
|
||||
logger(msg, 5)
|
||||
plex_episode.updateTimeline(watched_episode_status["time"])
|
||||
plex_episode.updateTimeline(
|
||||
watched_episode_status["time"]
|
||||
)
|
||||
else:
|
||||
logger(msg, 6)
|
||||
|
||||
|
||||
54
src/users.py
54
src/users.py
@@ -1,15 +1,22 @@
|
||||
from typing import Literal
|
||||
from plexapi.myplex import MyPlexAccount
|
||||
from src.emby import Emby
|
||||
from src.jellyfin import Jellyfin
|
||||
from src.plex import Plex
|
||||
from src.functions import (
|
||||
logger,
|
||||
search_mapping,
|
||||
)
|
||||
|
||||
|
||||
def generate_user_list(server):
|
||||
def generate_user_list(
|
||||
server: tuple[Literal["plex", "jellyfin", "emby"], Plex | Jellyfin | Emby]
|
||||
) -> list[str]:
|
||||
# generate list of users from server 1 and server 2
|
||||
server_type = server[0]
|
||||
server_connection = server[1]
|
||||
|
||||
server_users = []
|
||||
server_users: list[str] = []
|
||||
if server_type == "plex":
|
||||
for user in server_connection.users:
|
||||
server_users.append(
|
||||
@@ -22,9 +29,13 @@ def generate_user_list(server):
|
||||
return server_users
|
||||
|
||||
|
||||
def combine_user_lists(server_1_users, server_2_users, user_mapping):
|
||||
def combine_user_lists(
|
||||
server_1_users: list[str],
|
||||
server_2_users: list[str],
|
||||
user_mapping: dict[str, str] | None,
|
||||
) -> dict[str, str]:
|
||||
# combined list of overlapping users from plex and jellyfin
|
||||
users = {}
|
||||
users: dict[str, str] = {}
|
||||
|
||||
for server_1_user in server_1_users:
|
||||
if user_mapping:
|
||||
@@ -49,8 +60,10 @@ def combine_user_lists(server_1_users, server_2_users, user_mapping):
|
||||
return users
|
||||
|
||||
|
||||
def filter_user_lists(users, blacklist_users, whitelist_users):
|
||||
users_filtered = {}
|
||||
def filter_user_lists(
|
||||
users: dict[str, str], blacklist_users: list[str], whitelist_users: list[str]
|
||||
) -> dict[str, str]:
|
||||
users_filtered: dict[str, str] = {}
|
||||
for user in users:
|
||||
# whitelist_user is not empty and user lowercase is not in whitelist lowercase
|
||||
if len(whitelist_users) > 0:
|
||||
@@ -64,11 +77,12 @@ def filter_user_lists(users, blacklist_users, whitelist_users):
|
||||
return users_filtered
|
||||
|
||||
|
||||
def generate_server_users(server, users):
|
||||
server_users = None
|
||||
|
||||
def generate_server_users(
|
||||
server: tuple[Literal["plex", "jellyfin", "emby"], Plex | Jellyfin | Emby],
|
||||
users: dict[str, str],
|
||||
) -> list[MyPlexAccount] | dict[str, str] | None:
|
||||
if server[0] == "plex":
|
||||
server_users = []
|
||||
plex_server_users: list[MyPlexAccount] = []
|
||||
for plex_user in server[1].users:
|
||||
username_title = (
|
||||
plex_user.username if plex_user.username else plex_user.title
|
||||
@@ -78,22 +92,30 @@ def generate_server_users(server, users):
|
||||
username_title.lower() in users.keys()
|
||||
or username_title.lower() in users.values()
|
||||
):
|
||||
server_users.append(plex_user)
|
||||
plex_server_users.append(plex_user)
|
||||
|
||||
return plex_server_users
|
||||
elif server[0] in ["jellyfin", "emby"]:
|
||||
server_users = {}
|
||||
jelly_emby_server_users: dict[str, str] = {}
|
||||
for jellyfin_user, jellyfin_id in server[1].users.items():
|
||||
if (
|
||||
jellyfin_user.lower() in users.keys()
|
||||
or jellyfin_user.lower() in users.values()
|
||||
):
|
||||
server_users[jellyfin_user] = jellyfin_id
|
||||
jelly_emby_server_users[jellyfin_user] = jellyfin_id
|
||||
|
||||
return server_users
|
||||
return jelly_emby_server_users
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def setup_users(
|
||||
server_1, server_2, blacklist_users, whitelist_users, user_mapping=None
|
||||
):
|
||||
server_1: tuple[Literal["plex", "jellyfin", "emby"], Plex | Jellyfin | Emby],
|
||||
server_2: tuple[Literal["plex", "jellyfin", "emby"], Plex | Jellyfin | Emby],
|
||||
blacklist_users: list[str],
|
||||
whitelist_users: list[str],
|
||||
user_mapping: dict[str, str] | None = None,
|
||||
) -> tuple[list[MyPlexAccount] | dict[str, str], list[MyPlexAccount] | dict[str, str]]:
|
||||
server_1_users = generate_user_list(server_1)
|
||||
server_2_users = generate_user_list(server_2)
|
||||
logger(f"Server 1 users: {server_1_users}", 1)
|
||||
|
||||
Reference in New Issue
Block a user