More typing

Signed-off-by: Luis Garcia <git@luigi311.com>
pull/210/head
Luis Garcia 2024-11-12 22:51:05 -07:00
parent 46fa5e7c9a
commit bf633c75d1
4 changed files with 111 additions and 81 deletions

View File

@ -1,5 +1,6 @@
import os import os
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv(override=True) load_dotenv(override=True)
@ -8,11 +9,11 @@ log_file = os.getenv("LOG_FILE", os.getenv("LOGFILE", "log.log"))
mark_file = os.getenv("MARK_FILE", os.getenv("MARKFILE", "mark.log")) mark_file = os.getenv("MARK_FILE", os.getenv("MARKFILE", "mark.log"))
def logger(message: str, log_type=0): def logger(message: str, log_type: int = 0):
debug = str_to_bool(os.getenv("DEBUG", "False")) debug = str_to_bool(os.getenv("DEBUG", "False"))
debug_level = os.getenv("DEBUG_LEVEL", "info").lower() debug_level = os.getenv("DEBUG_LEVEL", "info").lower()
output = str(message) output: str | None = str(message)
if log_type == 0: if log_type == 0:
pass pass
elif log_type == 1 and (debug and debug_level in ("info", "debug")): elif log_type == 1 and (debug and debug_level in ("info", "debug")):
@ -42,12 +43,9 @@ def log_marked(
username: str, username: str,
library: str, library: str,
movie_show: str, movie_show: str,
episode: str = None, episode: str | None = None,
duration=None, duration: float | None = None,
): ):
if mark_file is None:
return
output = f"{server_type}/{server_name}/{username}/{library}/{movie_show}" output = f"{server_type}/{server_name}/{username}/{library}/{movie_show}"
if episode: if episode:
@ -69,7 +67,7 @@ def str_to_bool(value: str) -> bool:
# Search for nested element in list # Search for nested element in list
def contains_nested(element, lst): def contains_nested(element: str, lst: list[tuple[str] | None] | tuple[str] | None):
if lst is None: if lst is None:
return None return None
@ -116,33 +114,39 @@ def match_list(
def future_thread_executor( def future_thread_executor(
args: list, threads: int = None, override_threads: bool = False args: list[tuple[Callable[..., Any], ...]],
): threads: int | None = None,
futures_list = [] override_threads: bool = False,
results = [] ) -> list[Any]:
results: list[Any] = []
workers = min(int(os.getenv("MAX_THREADS", 32)), os.cpu_count() * 2) # Determine the number of workers, defaulting to 1 if os.cpu_count() returns None
if threads: max_threads_env: int = int(os.getenv("MAX_THREADS", 32))
cpu_threads: int = os.cpu_count() or 1 # Default to 1 if os.cpu_count() is None
workers: int = min(max_threads_env, cpu_threads * 2)
# Adjust workers based on threads parameter and override_threads flag
if threads is not None:
workers = min(threads, workers) workers = min(threads, workers)
if override_threads: if override_threads:
workers = threads workers = threads if threads is not None else workers
# If only one worker, run in main thread to avoid overhead # If only one worker, run in main thread to avoid overhead
if workers == 1: if workers == 1:
results = []
for arg in args: for arg in args:
results.append(arg[0](*arg[1:])) results.append(arg[0](*arg[1:]))
return results return results
with ThreadPoolExecutor(max_workers=workers) as executor: with ThreadPoolExecutor(max_workers=workers) as executor:
futures_list: list[Future[Any]] = []
for arg in args: for arg in args:
# * arg unpacks the list into actual arguments # * arg unpacks the list into actual arguments
futures_list.append(executor.submit(*arg)) futures_list.append(executor.submit(*arg))
for future in futures_list: for out in futures_list:
try: try:
result = future.result() result = out.result()
results.append(result) results.append(result)
except Exception as e: except Exception as e:
raise Exception(e) raise Exception(e)

View File

@ -2,7 +2,7 @@
import traceback, os import traceback, os
from math import floor from math import floor
from typing import Literal from typing import Any, Literal
from dotenv import load_dotenv from dotenv import load_dotenv
import requests import requests
from packaging.version import parse, Version from packaging.version import parse, Version
@ -101,10 +101,12 @@ class JellyfinEmby:
query: str, query: str,
query_type: Literal["get", "post"], query_type: Literal["get", "post"],
identifiers: dict[str, str] | None = None, identifiers: dict[str, str] | None = None,
json: dict | None = None, json: dict[str, float] | None = None,
) -> dict | list[dict]: ) -> dict[str, Any] | list[dict[str, Any]] | None:
try: try:
results = None results: (
dict[str, list[Any] | dict[str, str]] | list[dict[str, Any]] | None
) = None
if query_type == "get": if query_type == "get":
response = self.session.get( response = self.session.get(
@ -140,7 +142,7 @@ class JellyfinEmby:
raise Exception("Query result is not of type list or dict") raise Exception("Query result is not of type list or dict")
# append identifiers to results # append identifiers to results
if identifiers: if identifiers and results:
results["Identifiers"] = identifiers results["Identifiers"] = identifiers
return results return results
@ -158,13 +160,13 @@ class JellyfinEmby:
try: try:
query_string = "/System/Info/Public" query_string = "/System/Info/Public"
response: dict = self.query(query_string, "get") response: dict[str, Any] = self.query(query_string, "get")
if response: if response:
if name_only: if name_only:
return response.get("ServerName") return response["ServerName"]
elif version_only: elif version_only:
return parse(response.get("Version")) return parse(response["Version"])
return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}" return f"{self.server_type} {response.get('ServerName')}: {response.get('Version')}"
else: else:
@ -176,15 +178,16 @@ class JellyfinEmby:
def get_users(self) -> dict[str, str]: def get_users(self) -> dict[str, str]:
try: try:
users = {} users: dict[str, str] = {}
query_string = "/Users" query_string = "/Users"
response = self.query(query_string, "get") response: list[dict[str, str | bool]] = self.query(query_string, "get")
# If response is not empty # If response is not empty
if response: if response:
for user in response: for user in response:
users[user["Name"]] = user["Id"] if isinstance(user["Name"], str) and isinstance(user["Id"], str):
users[user["Name"]] = user["Id"]
return users return users
except Exception as e: except Exception as e:
@ -421,7 +424,9 @@ class JellyfinEmby:
logger(traceback.format_exc(), 2) logger(traceback.format_exc(), 2)
return {} return {}
def get_watched(self, users, sync_libraries): def get_watched(
self, users: dict[str, str], sync_libraries: list[str]
):
try: try:
users_watched = {} users_watched = {}
watched = [] watched = []
@ -437,7 +442,7 @@ class JellyfinEmby:
if library_title not in sync_libraries: if library_title not in sync_libraries:
continue continue
identifiers = { identifiers: dict[str, str] = {
"library_id": library_id, "library_id": library_id,
"library_title": library_title, "library_title": library_title,
} }
@ -454,8 +459,8 @@ class JellyfinEmby:
if len(library["Items"]) == 0: if len(library["Items"]) == 0:
continue continue
library_id = library["Identifiers"]["library_id"] library_id: str = library["Identifiers"]["library_id"]
library_title = library["Identifiers"]["library_title"] library_title: str = library["Identifiers"]["library_title"]
# Get all library types excluding "Folder" # Get all library types excluding "Folder"
types = set( types = set(

View File

@ -4,16 +4,15 @@ from src.functions import (
search_mapping, search_mapping,
) )
def check_skip_logic( def check_skip_logic(
library_title, library_title: str,
library_type, library_type: str,
blacklist_library, blacklist_library: list[str],
whitelist_library, whitelist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_mapping=None, library_mapping: dict[str, str] | None = None,
): ) -> str | None:
skip_reason = None skip_reason = None
library_other = None library_other = None
if library_mapping: if library_mapping:
@ -48,11 +47,11 @@ def check_skip_logic(
def check_blacklist_logic( def check_blacklist_logic(
library_title, library_title: str,
library_type, library_type: str,
blacklist_library, blacklist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
library_other=None, library_other: str | None = None,
): ):
skip_reason = None skip_reason = None
if isinstance(library_type, (list, tuple, set)): if isinstance(library_type, (list, tuple, set)):
@ -84,11 +83,11 @@ def check_blacklist_logic(
def check_whitelist_logic( def check_whitelist_logic(
library_title, library_title: str,
library_type, library_type: str,
whitelist_library, whitelist_library: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_other=None, library_other: str | None = None,
): ):
skip_reason = None skip_reason = None
if len(whitelist_library_type) > 0: if len(whitelist_library_type) > 0:
@ -131,14 +130,14 @@ def check_whitelist_logic(
def filter_libaries( def filter_libaries(
server_libraries, server_libraries: dict[str, str],
blacklist_library, blacklist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
whitelist_library, whitelist_library: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_mapping=None, library_mapping: dict[str, str] | None = None,
): ) -> list[str]:
filtered_libaries = [] filtered_libaries: list[str] = []
for library in server_libraries: for library in server_libraries:
skip_reason = check_skip_logic( skip_reason = check_skip_logic(
library, library,
@ -162,12 +161,12 @@ def filter_libaries(
def setup_libraries( def setup_libraries(
server_1, server_1,
server_2, server_2,
blacklist_library, blacklist_library: list[str],
blacklist_library_type, blacklist_library_type: list[str],
whitelist_library, whitelist_library: list[str],
whitelist_library_type, whitelist_library_type: list[str],
library_mapping=None, library_mapping: dict[str, str] | None = None,
): ) -> tuple[list[str], list[str]]:
server_1_libraries = server_1.get_libraries() server_1_libraries = server_1.get_libraries()
server_2_libraries = server_2.get_libraries() server_2_libraries = server_2.get_libraries()
logger(f"Server 1 libraries: {server_1_libraries}", 1) logger(f"Server 1 libraries: {server_1_libraries}", 1)
@ -201,14 +200,16 @@ def setup_libraries(
return output_server_1_libaries, output_server_2_libaries return output_server_1_libaries, output_server_2_libaries
def show_title_dict(user_list: dict): def show_title_dict(user_list) -> dict[str, list[tuple[str] | None]]:
try: try:
show_output_dict = {} if not isinstance(user_list, dict):
return {}
show_output_dict: dict[str, list[tuple[str] | None]] = {}
show_output_dict["locations"] = [] show_output_dict["locations"] = []
show_counter = 0 # Initialize a counter for the current show position show_counter = 0 # Initialize a counter for the current show position
show_output_keys = user_list.keys() show_output_keys = [dict(x) for x in list(user_list.keys())]
show_output_keys = [dict(x) for x in list(show_output_keys)]
for show_key in show_output_keys: for show_key in show_output_keys:
for provider_key, provider_value in show_key.items(): for provider_key, provider_value in show_key.items():
# Skip title # Skip title
@ -233,9 +234,19 @@ def show_title_dict(user_list: dict):
return {} return {}
def episode_title_dict(user_list: dict): def episode_title_dict(
user_list,
) -> dict[
str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]
]:
try: try:
episode_output_dict = {} if not isinstance(user_list, dict):
return {}
episode_output_dict: dict[
str,
list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None],
] = {}
episode_output_dict["completed"] = [] episode_output_dict["completed"] = []
episode_output_dict["time"] = [] episode_output_dict["time"] = []
episode_output_dict["locations"] = [] episode_output_dict["locations"] = []
@ -293,12 +304,18 @@ def episode_title_dict(user_list: dict):
return {} return {}
def movies_title_dict(user_list: dict): def movies_title_dict(
user_list,
) -> dict[str, list[str | bool | int | tuple[str] | None]]:
try: try:
movies_output_dict = {} if not isinstance(user_list, list):
movies_output_dict["completed"] = [] return {}
movies_output_dict["time"] = []
movies_output_dict["locations"] = [] movies_output_dict: dict[str, list[str | bool | int | tuple[str] | None]] = {
"completed": [],
"time": [],
"locations": [],
}
movie_counter = 0 # Initialize a counter for the current movie position movie_counter = 0 # Initialize a counter for the current movie position
for movie in user_list: for movie in user_list:
@ -325,7 +342,11 @@ def movies_title_dict(user_list: dict):
return {} return {}
def generate_library_guids_dict(user_list: dict): def generate_library_guids_dict(user_list) -> tuple[
dict[str, list[tuple[str] | None]],
dict[str, list[str | bool | int | tuple[str] | dict[str, str | tuple[str]] | None]],
dict[str, list[str | bool | int | tuple[str] | None]],
]:
# Handle the case where user_list is empty or does not contain the expected keys and values # Handle the case where user_list is empty or does not contain the expected keys and values
if not user_list: if not user_list:
return {}, {}, {} return {}, {}, {}

View File

@ -488,7 +488,7 @@ class Plex:
logger(f"Plex: Failed to get users, Error: {e}", 2) logger(f"Plex: Failed to get users, Error: {e}", 2)
raise Exception(e) raise Exception(e)
def get_libraries(self): def get_libraries(self) -> dict[str, str]:
try: try:
output = {} output = {}