diff --git a/src/connection.py b/src/connection.py index 8fe20f8..2083078 100644 --- a/src/connection.py +++ b/src/connection.py @@ -66,11 +66,11 @@ def generate_server_connections() -> list[Plex | Jellyfin | Emby]: for i, url in enumerate(plex_baseurl): server = Plex( - baseurl=url.strip(), + base_url=url.strip(), token=plex_token[i].strip(), - username=None, + user_name=None, password=None, - servername=None, + server_name=None, ssl_bypass=ssl_bypass, ) @@ -92,11 +92,11 @@ def generate_server_connections() -> list[Plex | Jellyfin | Emby]: for i, username in enumerate(plex_username): server = Plex( - baseurl=None, + base_url=None, token=None, - username=username.strip(), + user_name=username.strip(), password=plex_password[i].strip(), - servername=plex_servername[i].strip(), + server_name=plex_servername[i].strip(), ssl_bypass=ssl_bypass, ) diff --git a/src/plex.py b/src/plex.py index 197c512..063e215 100644 --- a/src/plex.py +++ b/src/plex.py @@ -10,7 +10,8 @@ from requests.adapters import HTTPAdapter as RequestsHTTPAdapter from plexapi.video import Show, Episode, Movie from plexapi.server import PlexServer -from plexapi.myplex import MyPlexAccount +from plexapi.myplex import MyPlexAccount, MyPlexUser +from plexapi.library import MovieSection, ShowSection from src.functions import ( search_mapping, @@ -53,7 +54,7 @@ def extract_guids_from_item(item: Movie | Show | Episode) -> dict[str, str]: guids: dict[str, str] = dict( guid.id.split("://") for guid in item.guids - if guid.id is not None and len(guid.id.strip()) > 0 + if guid.id and len(guid.id.strip()) > 0 ) return guids @@ -69,13 +70,13 @@ def extract_identifiers_from_item(item: Movie | Show | Episode) -> MediaIdentifi if generate_locations else tuple() ), - imdb_id=guids.get("imdb", None), - tvdb_id=guids.get("tvdb", None), - tmdb_id=guids.get("tmdb", None), + imdb_id=guids.get("imdb"), + tvdb_id=guids.get("tvdb"), + tmdb_id=guids.get("tmdb"), ) -def get_mediaitem(item: Movie | Episode, completed=True) -> MediaItem: +def get_mediaitem(item: Movie | Episode, completed: bool) -> MediaItem: return MediaItem( identifiers=extract_identifiers_from_item(item), status=WatchedStatus(completed=completed, time=item.viewOffset), @@ -115,6 +116,7 @@ def update_user_watched( msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}" if not dryrun: plex_movie.markWatched() + logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}") log_marked( "Plex", @@ -154,7 +156,7 @@ def update_user_watched( if check_same_identifiers( plex_show_identifiers, stored_series.identifiers ): - logger.info(f"Found matching show for '{plex_show.title}'") + logger.trace(f"Found matching show for '{plex_show.title}'") # Now update episodes. # Get the list of Plex episodes for this show. plex_episodes = plex_show.episodes() @@ -216,46 +218,53 @@ def update_user_watched( class Plex: def __init__( self, - baseurl=None, - token=None, - username=None, - password=None, - servername=None, - ssl_bypass=False, + base_url: str | None = None, + token: str | None = None, + user_name: str | None = None, + password: str | None = None, + server_name: str | None = None, + ssl_bypass: bool = False, session=None, ): - self.server_type = "Plex" - self.baseurl = baseurl - self.token = token - self.username = username - self.password = password - self.servername = servername - self.ssl_bypass = ssl_bypass + self.server_type: str = "Plex" + self.ssl_bypass: bool = ssl_bypass if ssl_bypass: # Session for ssl bypass session = requests.Session() # By pass ssl hostname check https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186 session.mount("https://", HostNameIgnoringAdapter()) self.session = session - self.plex = self.login(self.baseurl, self.token) - self.admin_user = self.plex.myPlexAccount() - self.users = self.get_users() + self.plex: PlexServer = self.login( + base_url, token, user_name, password, server_name + ) - def login(self, baseurl, token): + self.base_url: str = self.plex._baseurl + + self.admin_user: MyPlexAccount = self.plex.myPlexAccount() + self.users: list[MyPlexUser | MyPlexAccount] = self.get_users() + + def login( + self, + base_url: str | None, + token: str | None, + user_name: str | None, + password: str | None, + server_name: str | None, + ) -> PlexServer: try: - if baseurl and token: - plex = PlexServer(baseurl, token, session=self.session) - elif self.username and self.password and self.servername: + if base_url and token: + plex: PlexServer = PlexServer(base_url, token, session=self.session) + elif user_name and password and server_name: # Login via plex account - account = MyPlexAccount(self.username, self.password) - plex = account.resource(self.servername).connect() + account = MyPlexAccount(user_name, password) + plex = account.resource(server_name).connect() else: raise Exception("No complete plex credentials provided") return plex except Exception as e: - if self.username: - msg = f"Failed to login via plex account {self.username}" + if user_name: + msg = f"Failed to login via plex account {user_name}" logger.error(f"Plex: Failed to login, {msg}, Error: {e}") else: logger.error(f"Plex: Failed to login, Error: {e}") @@ -264,9 +273,9 @@ class Plex: def info(self) -> str: return f"Plex {self.plex.friendlyName}: {self.plex.version}" - def get_users(self): + def get_users(self) -> list[MyPlexUser | MyPlexAccount]: try: - users = self.plex.myPlexAccount().users() + users: list[MyPlexUser | MyPlexAccount] = self.plex.myPlexAccount().users() # append self to users users.append(self.plex.myPlexAccount()) @@ -302,7 +311,9 @@ class Plex: logger.error(f"Plex: Failed to get libraries, Error: {e}") raise Exception(e) - def get_user_library_watched(self, user_name, user_plex, library) -> LibraryData: + def get_user_library_watched( + self, user_name: str, user_plex: PlexServer, library: MovieSection | ShowSection + ) -> LibraryData: try: logger.info( f"Plex: Generating watched for {user_name} in library {library.title}", @@ -353,9 +364,9 @@ class Plex: if generate_locations else tuple() ), - imdb_id=show_guids.get("imdb", None), - tvdb_id=show_guids.get("tvdb", None), - tmdb_id=show_guids.get("tmdb", None), + imdb_id=show_guids.get("imdb"), + tvdb_id=show_guids.get("tvdb"), + tmdb_id=show_guids.get("tmdb"), ), episodes=episode_mediaitem, ) @@ -369,7 +380,9 @@ class Plex: ) return LibraryData(title=library.title) - def get_watched(self, users, sync_libraries) -> dict[str, UserData]: + def get_watched( + self, users: list[MyPlexUser | MyPlexAccount], sync_libraries: list[str] + ) -> dict[str, UserData]: try: users_watched: dict[str, UserData] = {} @@ -379,10 +392,7 @@ class Plex: else: token = user.get_token(self.plex.machineIdentifier) if token: - user_plex = self.login( - self.plex._baseurl, - token, - ) + user_plex = self.login(self.base_url, token, None, None, None) else: logger.error( f"Plex: Failed to get token for {user.title}, skipping", @@ -445,15 +455,19 @@ class Plex: user_plex = self.plex else: if isinstance(user, str): - logger.warning( + logger.debug( f"Plex: {user} is not a plex object, attempting to get object for user", ) user = self.plex.myPlexAccount().user(user) + if not isinstance(user, MyPlexUser): + logger.error(f"Plex: {user} failed to get PlexUser") + continue + token = user.get_token(self.plex.machineIdentifier) if token: user_plex = PlexServer( - self.plex._baseurl, + self.base_url, token, session=self.session, ) @@ -463,6 +477,10 @@ class Plex: ) continue + if not user_plex: + logger.error(f"Plex: {user} Failed to get PlexServer") + continue + for library_name in user_data.libraries: library_data = user_data.libraries[library_name] library_other = None