Add support for env file support, set via ENV_FILE
Signed-off-by: Luis Garcia <git@luigi311.com>
This commit is contained in:
380
src/plex.py
380
src/plex.py
@@ -1,6 +1,4 @@
|
||||
import os
|
||||
import requests
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from urllib3.poolmanager import PoolManager
|
||||
@@ -17,6 +15,7 @@ from src.functions import (
|
||||
search_mapping,
|
||||
log_marked,
|
||||
str_to_bool,
|
||||
get_env_value,
|
||||
)
|
||||
from src.watched import (
|
||||
LibraryData,
|
||||
@@ -28,11 +27,6 @@ from src.watched import (
|
||||
check_same_identifiers,
|
||||
)
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True"))
|
||||
generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True"))
|
||||
|
||||
|
||||
# Bypass hostname validation for ssl. Taken from https://github.com/pkkid/python-plexapi/issues/143#issuecomment-775485186
|
||||
class HostNameIgnoringAdapter(RequestsHTTPAdapter):
|
||||
@@ -48,7 +42,9 @@ class HostNameIgnoringAdapter(RequestsHTTPAdapter):
|
||||
)
|
||||
|
||||
|
||||
def extract_guids_from_item(item: Movie | Show | Episode) -> dict[str, str]:
|
||||
def extract_guids_from_item(
|
||||
item: Movie | Show | Episode, generate_guids: bool = True
|
||||
) -> dict[str, str]:
|
||||
# If GENERATE_GUIDS is set to False, then return an empty dict
|
||||
if not generate_guids:
|
||||
return {}
|
||||
@@ -62,7 +58,11 @@ def extract_guids_from_item(item: Movie | Show | Episode) -> dict[str, str]:
|
||||
return guids
|
||||
|
||||
|
||||
def extract_identifiers_from_item(item: Movie | Show | Episode) -> MediaIdentifiers:
|
||||
def extract_identifiers_from_item(
|
||||
item: Movie | Show | Episode,
|
||||
generate_guids: bool = True,
|
||||
generate_locations: bool = True,
|
||||
) -> MediaIdentifiers:
|
||||
guids = extract_guids_from_item(item)
|
||||
|
||||
return MediaIdentifiers(
|
||||
@@ -78,165 +78,25 @@ def extract_identifiers_from_item(item: Movie | Show | Episode) -> MediaIdentifi
|
||||
)
|
||||
|
||||
|
||||
def get_mediaitem(item: Movie | Episode, completed: bool) -> MediaItem:
|
||||
def get_mediaitem(
|
||||
item: Movie | Episode,
|
||||
completed: bool,
|
||||
generate_guids: bool = True,
|
||||
generate_locations: bool = True,
|
||||
) -> MediaItem:
|
||||
return MediaItem(
|
||||
identifiers=extract_identifiers_from_item(item),
|
||||
identifiers=extract_identifiers_from_item(
|
||||
item, generate_guids, generate_locations
|
||||
),
|
||||
status=WatchedStatus(completed=completed, time=item.viewOffset),
|
||||
)
|
||||
|
||||
|
||||
def update_user_watched(
|
||||
user: MyPlexAccount,
|
||||
user_plex: PlexServer,
|
||||
library_data: LibraryData,
|
||||
library_name: str,
|
||||
dryrun: bool,
|
||||
) -> None:
|
||||
# If there are no movies or shows to update, exit early.
|
||||
if not library_data.series and not library_data.movies:
|
||||
return
|
||||
|
||||
logger.info(f"Plex: Updating watched for {user.title} in library {library_name}")
|
||||
library_section = user_plex.library.section(library_name)
|
||||
if not library_section:
|
||||
logger.error(
|
||||
f"Plex: Library {library_name} not found for {user.title}, skipping",
|
||||
)
|
||||
return
|
||||
|
||||
# Update movies.
|
||||
if library_data.movies:
|
||||
# Search for Plex movies that are currently marked as unwatched.
|
||||
for plex_movie in library_section.search(unwatched=True):
|
||||
plex_identifiers = extract_identifiers_from_item(plex_movie)
|
||||
# Check each stored movie for a match.
|
||||
for stored_movie in library_data.movies:
|
||||
if check_same_identifiers(plex_identifiers, stored_movie.identifiers):
|
||||
# If the stored movie is marked as watched (or has enough progress),
|
||||
# update the Plex movie accordingly.
|
||||
if stored_movie.status.completed:
|
||||
msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_movie.markWatched()
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to mark {plex_movie.title} as watched, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_movie.title,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
else:
|
||||
msg = f"Plex: {plex_movie.title} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_movie.updateTimeline(stored_movie.status.time)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to update {plex_movie.title} timeline, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_movie.title,
|
||||
duration=stored_movie.status.time,
|
||||
)
|
||||
# Once matched, no need to check further.
|
||||
break
|
||||
|
||||
# Update TV Shows (series/episodes).
|
||||
if library_data.series:
|
||||
# For each Plex show in the library section:
|
||||
plex_shows = library_section.search(unwatched=True)
|
||||
for plex_show in plex_shows:
|
||||
# Extract identifiers from the Plex show.
|
||||
plex_show_identifiers = extract_identifiers_from_item(plex_show)
|
||||
# Try to find a matching series in your stored library.
|
||||
for stored_series in library_data.series:
|
||||
if check_same_identifiers(
|
||||
plex_show_identifiers, stored_series.identifiers
|
||||
):
|
||||
logger.trace(f"Found matching show for '{plex_show.title}'")
|
||||
# Now update episodes.
|
||||
# Get the list of Plex episodes for this show.
|
||||
plex_episodes = plex_show.episodes()
|
||||
for plex_episode in plex_episodes:
|
||||
plex_episode_identifiers = extract_identifiers_from_item(
|
||||
plex_episode
|
||||
)
|
||||
for stored_ep in stored_series.episodes:
|
||||
if check_same_identifiers(
|
||||
plex_episode_identifiers, stored_ep.identifiers
|
||||
):
|
||||
if stored_ep.status.completed:
|
||||
msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_episode.markWatched()
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to mark {plex_show.title} {plex_episode.title} as watched, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(
|
||||
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
|
||||
)
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_show.title,
|
||||
plex_episode.title,
|
||||
)
|
||||
else:
|
||||
msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_episode.updateTimeline(
|
||||
stored_ep.status.time
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to update {plex_show.title} {plex_episode.title} timeline, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(
|
||||
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
|
||||
)
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_show.title,
|
||||
plex_episode.title,
|
||||
stored_ep.status.time,
|
||||
)
|
||||
break # Found a matching episode.
|
||||
break # Found a matching show.
|
||||
|
||||
|
||||
# class plex accept base url and token and username and password but default with none
|
||||
class Plex:
|
||||
def __init__(
|
||||
self,
|
||||
env,
|
||||
base_url: str | None = None,
|
||||
token: str | None = None,
|
||||
user_name: str | None = None,
|
||||
@@ -245,6 +105,8 @@ class Plex:
|
||||
ssl_bypass: bool = False,
|
||||
session: requests.Session | None = None,
|
||||
) -> None:
|
||||
self.env = env
|
||||
|
||||
self.server_type: str = "Plex"
|
||||
self.ssl_bypass: bool = ssl_bypass
|
||||
if ssl_bypass:
|
||||
@@ -346,7 +208,20 @@ class Plex:
|
||||
unwatched=False
|
||||
) + library_videos.search(inProgress=True):
|
||||
if video.isWatched or video.viewOffset >= 60000:
|
||||
watched.movies.append(get_mediaitem(video, video.isWatched))
|
||||
watched.movies.append(
|
||||
get_mediaitem(
|
||||
video,
|
||||
video.isWatched,
|
||||
str_to_bool(
|
||||
get_env_value(self.env, "GENERATE_GUIDS", "True")
|
||||
),
|
||||
str_to_bool(
|
||||
get_env_value(
|
||||
self.env, "GENERATE_LOCATIONS", "True"
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
elif library.type == "show":
|
||||
# Keep track of processed shows to reduce duplicate shows
|
||||
@@ -365,7 +240,18 @@ class Plex:
|
||||
viewOffset__gte=60_000
|
||||
):
|
||||
episode_mediaitem.append(
|
||||
get_mediaitem(episode, episode.isWatched)
|
||||
get_mediaitem(
|
||||
episode,
|
||||
episode.isWatched,
|
||||
str_to_bool(
|
||||
get_env_value(self.env, "GENERATE_GUIDS", "True")
|
||||
),
|
||||
str_to_bool(
|
||||
get_env_value(
|
||||
self.env, "GENERATE_LOCATIONS", "True"
|
||||
)
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
if episode_mediaitem:
|
||||
@@ -380,7 +266,11 @@ class Plex:
|
||||
for location in show.locations
|
||||
]
|
||||
)
|
||||
if generate_locations
|
||||
if str_to_bool(
|
||||
get_env_value(
|
||||
self.env, "GENERATE_LOCATIONS", "True"
|
||||
)
|
||||
)
|
||||
else tuple()
|
||||
),
|
||||
imdb_id=show_guids.get("imdb"),
|
||||
@@ -442,6 +332,170 @@ class Plex:
|
||||
logger.error(f"Plex: Failed to get watched, Error: {e}")
|
||||
return {}
|
||||
|
||||
def update_user_watched(
|
||||
self,
|
||||
user: MyPlexAccount,
|
||||
user_plex: PlexServer,
|
||||
library_data: LibraryData,
|
||||
library_name: str,
|
||||
dryrun: bool,
|
||||
) -> None:
|
||||
# If there are no movies or shows to update, exit early.
|
||||
if not library_data.series and not library_data.movies:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"Plex: Updating watched for {user.title} in library {library_name}"
|
||||
)
|
||||
library_section = user_plex.library.section(library_name)
|
||||
if not library_section:
|
||||
logger.error(
|
||||
f"Plex: Library {library_name} not found for {user.title}, skipping",
|
||||
)
|
||||
return
|
||||
|
||||
# Update movies.
|
||||
if library_data.movies:
|
||||
# Search for Plex movies that are currently marked as unwatched.
|
||||
for plex_movie in library_section.search(unwatched=True):
|
||||
plex_identifiers = extract_identifiers_from_item(plex_movie)
|
||||
# Check each stored movie for a match.
|
||||
for stored_movie in library_data.movies:
|
||||
if check_same_identifiers(
|
||||
plex_identifiers, stored_movie.identifiers
|
||||
):
|
||||
# If the stored movie is marked as watched (or has enough progress),
|
||||
# update the Plex movie accordingly.
|
||||
if stored_movie.status.completed:
|
||||
msg = f"Plex: {plex_movie.title} as watched for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_movie.markWatched()
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to mark {plex_movie.title} as watched, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_movie.title,
|
||||
None,
|
||||
None,
|
||||
mark_file=get_env_value(
|
||||
self.env, "MARK_FILE", "mark.log"
|
||||
),
|
||||
)
|
||||
else:
|
||||
msg = f"Plex: {plex_movie.title} as partially watched for {floor(stored_movie.status.time / 60_000)} minutes for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_movie.updateTimeline(stored_movie.status.time)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to update {plex_movie.title} timeline, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(f"{'[DRYRUN] ' if dryrun else ''}{msg}")
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_movie.title,
|
||||
duration=stored_movie.status.time,
|
||||
mark_file=get_env_value(
|
||||
self.env, "MARK_FILE", "mark.log"
|
||||
),
|
||||
)
|
||||
# Once matched, no need to check further.
|
||||
break
|
||||
|
||||
# Update TV Shows (series/episodes).
|
||||
if library_data.series:
|
||||
# For each Plex show in the library section:
|
||||
plex_shows = library_section.search(unwatched=True)
|
||||
for plex_show in plex_shows:
|
||||
# Extract identifiers from the Plex show.
|
||||
plex_show_identifiers = extract_identifiers_from_item(plex_show)
|
||||
# Try to find a matching series in your stored library.
|
||||
for stored_series in library_data.series:
|
||||
if check_same_identifiers(
|
||||
plex_show_identifiers, stored_series.identifiers
|
||||
):
|
||||
logger.trace(f"Found matching show for '{plex_show.title}'")
|
||||
# Now update episodes.
|
||||
# Get the list of Plex episodes for this show.
|
||||
plex_episodes = plex_show.episodes()
|
||||
for plex_episode in plex_episodes:
|
||||
plex_episode_identifiers = extract_identifiers_from_item(
|
||||
plex_episode
|
||||
)
|
||||
for stored_ep in stored_series.episodes:
|
||||
if check_same_identifiers(
|
||||
plex_episode_identifiers, stored_ep.identifiers
|
||||
):
|
||||
if stored_ep.status.completed:
|
||||
msg = f"Plex: {plex_show.title} {plex_episode.title} as watched for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_episode.markWatched()
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to mark {plex_show.title} {plex_episode.title} as watched, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(
|
||||
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
|
||||
)
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_show.title,
|
||||
plex_episode.title,
|
||||
mark_file=get_env_value(
|
||||
self.env, "MARK_FILE", "mark.log"
|
||||
),
|
||||
)
|
||||
else:
|
||||
msg = f"Plex: {plex_show.title} {plex_episode.title} as partially watched for {floor(stored_ep.status.time / 60_000)} minutes for {user.title} in {library_name}"
|
||||
if not dryrun:
|
||||
try:
|
||||
plex_episode.updateTimeline(
|
||||
stored_ep.status.time
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Plex: Failed to update {plex_show.title} {plex_episode.title} timeline, Error: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
logger.success(
|
||||
f"{'[DRYRUN] ' if dryrun else ''}{msg}"
|
||||
)
|
||||
log_marked(
|
||||
"Plex",
|
||||
user_plex.friendlyName,
|
||||
user.title,
|
||||
library_name,
|
||||
plex_show.title,
|
||||
plex_episode.title,
|
||||
stored_ep.status.time,
|
||||
mark_file=get_env_value(
|
||||
self.env, "MARK_FILE", "mark.log"
|
||||
),
|
||||
)
|
||||
break # Found a matching episode.
|
||||
break # Found a matching show.
|
||||
|
||||
def update_watched(
|
||||
self,
|
||||
watched_list: dict[str, UserData],
|
||||
@@ -525,7 +579,7 @@ class Plex:
|
||||
continue
|
||||
|
||||
try:
|
||||
update_user_watched(
|
||||
self.update_user_watched(
|
||||
user,
|
||||
user_plex,
|
||||
library_data,
|
||||
|
||||
Reference in New Issue
Block a user