JellyPlex-Watched/src/jellyfin_emby.py

827 lines
34 KiB
Python

# Functions for Jellyfin and Emby
import traceback, os
from math import floor
from dotenv import load_dotenv
import requests
from packaging import version
from src.functions import (
logger,
search_mapping,
contains_nested,
log_marked,
str_to_bool,
)
from src.library import (
check_skip_logic,
generate_library_guids_dict,
)
load_dotenv(override=True)
generate_guids = str_to_bool(os.getenv("GENERATE_GUIDS", "True"))
generate_locations = str_to_bool(os.getenv("GENERATE_LOCATIONS", "True"))
def get_guids(server_type, item):
if item.get("Name"):
guids = {"title": item.get("Name")}
else:
logger(f"{server_type}: Name not found in {item.get('Id')}", 1)
guids = {"title": None}
if "ProviderIds" in item:
guids.update({k.lower(): v for k, v in item["ProviderIds"].items()})
else:
logger(f"{server_type}: ProviderIds not found in {item.get('Name')}", 1)
if "MediaSources" in item:
guids["locations"] = tuple(
[x["Path"].split("/")[-1] for x in item["MediaSources"] if "Path" in x]
)
else:
logger(f"{server_type}: MediaSources not found in {item.get('Name')}", 1)
guids["locations"] = tuple()
if "UserData" in item:
guids["status"] = {
"completed": item["UserData"]["Played"],
# Convert ticks to milliseconds to match Plex
"time": floor(item["UserData"]["PlaybackPositionTicks"] / 10000),
}
else:
logger(f"{server_type}: UserData not found in {item.get('Name')}", 1)
guids["status"] = {}
return guids
def get_video_status(server_video, videos_ids, videos):
video_status = None
if generate_locations:
if "MediaSources" in server_video:
for video_location in server_video["MediaSources"]:
if "Path" in video_location:
if (
contains_nested(
video_location["Path"].split("/")[-1],
videos_ids["locations"],
)
is not None
):
for video in videos:
if (
contains_nested(
video_location["Path"].split("/")[-1],
video["locations"],
)
is not None
):
video_status = video["status"]
break
break
if generate_guids:
if not video_status:
for (
video_provider_source,
video_provider_id,
) in server_video["ProviderIds"].items():
if video_provider_source.lower() in videos_ids:
if (
video_provider_id.lower()
in videos_ids[video_provider_source.lower()]
):
for video in videos:
if video_provider_id.lower() in video.get(
video_provider_source.lower(), []
):
video_status = video["status"]
break
break
return video_status
class JellyfinEmby:
def __init__(self, server_type, baseurl, token, headers):
if server_type not in ["Jellyfin", "Emby"]:
raise Exception(f"Server type {server_type} not supported")
self.server_type = server_type
self.baseurl = baseurl
self.token = token
self.headers = headers
self.timeout = int(os.getenv("REQUEST_TIMEOUT", 300))
if not self.baseurl:
raise Exception(f"{self.server_type} baseurl not set")
if not self.token:
raise Exception(f"{self.server_type} token not set")
self.session = requests.Session()
self.users = self.get_users()
def query(self, query, query_type, identifiers=None, json=None):
try:
results = None
if query_type == "get":
response = self.session.get(
self.baseurl + query, headers=self.headers, timeout=self.timeout
)
if response.status_code not in [200, 204]:
raise Exception(
f"Query failed with status {response.status_code} {response.reason}"
)
if response.status_code == 204:
results = None
else:
results = response.json()
elif query_type == "post":
response = self.session.post(
self.baseurl + query,
headers=self.headers,
json=json,
timeout=self.timeout,
)
if response.status_code not in [200, 204]:
raise Exception(
f"Query failed with status {response.status_code} {response.reason}"
)
if response.status_code == 204:
results = None
else:
results = response.json()
if results is not None:
if not isinstance(results, list) and not isinstance(results, dict):
raise Exception("Query result is not of type list or dict")
# append identifiers to results
if identifiers:
results["Identifiers"] = identifiers
return results
except Exception as e:
logger(
f"{self.server_type}: Query {query_type} {query}\nResults {results}\n{e}",
2,
)
raise Exception(e)
def info(self) -> str:
try:
query_string = "/System/Info/Public"
response = self.query(query_string, "get")
if response:
return f"{self.server_type} {response['ServerName']}: {response['Version']}"
else:
return None
except Exception as e:
logger(f"{self.server_type}: Get server name failed {e}", 2)
raise Exception(e)
def get_server_version(self):
try:
response = self.query("/System/Info/Public", "get")
if response:
return version.parse(response["Version"])
else:
return None
except Exception as e:
logger(f"{self.server_type}: Get server version failed: {e}", 2)
raise Exception(e)
def get_users(self):
try:
users = {}
query_string = "/Users"
response = self.query(query_string, "get")
# If response is not empty
if response:
for user in response:
users[user["Name"]] = user["Id"]
return users
except Exception as e:
logger(f"{self.server_type}: Get users failed {e}", 2)
raise Exception(e)
def get_libraries(self):
try:
libraries = {}
# Theres no way to get all libraries so individually get list of libraries from all users
users = self.get_users()
for _, user_id in users.items():
user_libraries = self.query(f"/Users/{user_id}/Views", "get")
for library in user_libraries["Items"]:
library_id = library["Id"]
library_title = library["Name"]
# Get library items to check the type
media_info = self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&Filters=IsPlayed&Recursive=True&excludeItemTypes=Folder&limit=100",
"get",
)
types = set(
[
x["Type"]
for x in media_info["Items"]
if x["Type"] in ["Movie", "Series", "Episode"]
]
)
all_types = set([x["Type"] for x in media_info["Items"]])
if not types:
logger(
f"{self.server_type}: Skipping Library {library_title} found wanted types: {all_types}",
1,
)
else:
libraries[library_title] = str(types)
return libraries
except Exception as e:
logger(f"{self.server_type}: Get libraries failed {e}", 2)
raise Exception(e)
def get_user_library_watched(
self, user_name, user_id, library_type, library_id, library_title
):
try:
user_name = user_name.lower()
user_watched = {}
logger(
f"{self.server_type}: Generating watched for {user_name} in library {library_title}",
0,
)
# Movies
if library_type == "Movie":
user_watched[library_title] = []
watched = self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&Filters=IsPlayed&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources",
"get",
).get("Items", [])
in_progress = self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&Filters=IsResumable&IncludeItemTypes=Movie&Recursive=True&Fields=ItemCounts,ProviderIds,MediaSources",
"get",
).get("Items", [])
for movie in watched + in_progress:
# Skip if theres no user data which means the movie has not been watched
if "UserData" not in movie:
continue
# Skip if theres no media tied to the movie
if "MediaSources" not in movie or movie["MediaSources"] == {}:
continue
# Skip if not watched or watched less than a minute
if (
movie["UserData"]["Played"] == True
or movie["UserData"]["PlaybackPositionTicks"] > 600000000
):
logger(
f"{self.server_type}: Adding {movie.get('Name')} to {user_name} watched list",
3,
)
# Get the movie's GUIDs
movie_guids = get_guids(self.server_type, movie)
# Append the movie dictionary to the list for the given user and library
user_watched[library_title].append(movie_guids)
logger(
f"{self.server_type}: Added {movie_guids} to {user_name} watched list",
3,
)
# TV Shows
if library_type in ["Series", "Episode"]:
# Initialize an empty dictionary for the given user and library
user_watched[library_title] = {}
# Retrieve a list of watched TV shows
watched_shows = self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&isPlaceHolder=false&IncludeItemTypes=Series&Recursive=True&Fields=ProviderIds,Path,RecursiveItemCount",
"get",
).get("Items", [])
# Filter the list of shows to only include those that have been partially or fully watched
watched_shows_filtered = []
for show in watched_shows:
if "UserData" not in show:
continue
if "PlayedPercentage" in show["UserData"]:
if show["UserData"]["PlayedPercentage"] > 0:
watched_shows_filtered.append(show)
# Retrieve the watched/partially watched list of episodes of each watched show
for show in watched_shows_filtered:
logger(
f"{self.server_type}: Adding {show.get('Name')} to {user_name} watched list",
3,
)
show_guids = {k.lower(): v for k, v in show["ProviderIds"].items()}
show_guids["title"] = show["Name"]
show_guids["locations"] = (
tuple([show["Path"].split("/")[-1]])
if "Path" in show
else tuple()
)
show_display_name = (
show_guids["title"]
if show_guids["title"]
else show_guids["locations"]
)
show_guids = frozenset(show_guids.items())
show_episodes = self.query(
f"/Shows/{show['Id']}/Episodes"
+ f"?userId={user_id}&isPlaceHolder=false&Fields=ProviderIds,MediaSources",
"get",
).get("Items", [])
# Iterate through the episodes
# Create a list to store the episodes
mark_episodes_list = []
for episode in show_episodes:
if "UserData" not in episode:
continue
if (
"MediaSources" not in episode
or episode["MediaSources"] == {}
):
continue
# If watched or watched more than a minute
if (
episode["UserData"]["Played"] == True
or episode["UserData"]["PlaybackPositionTicks"] > 600000000
):
episode_guids = get_guids(self.server_type, episode)
mark_episodes_list.append(episode_guids)
if mark_episodes_list:
# Add the show dictionary to the user's watched list
if show_guids not in user_watched[library_title]:
user_watched[library_title][show_guids] = []
user_watched[library_title][
show_guids
] = mark_episodes_list
for episode in mark_episodes_list:
logger(
f"{self.server_type}: Added {episode} to {user_name} {show_display_name} watched list",
1,
)
logger(
f"{self.server_type}: Got watched for {user_name} in library {library_title}",
1,
)
if library_title in user_watched:
logger(
f"{self.server_type}: {user_watched[library_title]}", 3
)
return user_watched
except Exception as e:
logger(
f"{self.server_type}: Failed to get watched for {user_name} in library {library_title}, Error: {e}",
2,
)
logger(traceback.format_exc(), 2)
return {}
def get_watched(
self,
users,
sync_libraries
):
try:
users_watched = {}
watched = []
for user_name, user_id in users.items():
libraries = []
all_libraries = self.query(f"/Users/{user_id}/Views", "get")
for library in all_libraries["Items"]:
library_id = library["Id"]
library_title = library["Name"]
identifiers = {
"library_id": library_id,
"library_title": library_title,
}
libraries.append(
self.query(
f"/Users/{user_id}/Items"
+ f"?ParentId={library_id}&Filters=IsPlayed&Recursive=True&excludeItemTypes=Folder&limit=100",
"get",
identifiers=identifiers,
)
)
for library in libraries:
if len(library["Items"]) == 0:
continue
library_id = library["Identifiers"]["library_id"]
library_title = library["Identifiers"]["library_title"]
if library_title not in sync_libraries:
continue
# Get all library types excluding "Folder"
types = set(
[
x["Type"]
for x in library["Items"]
if x["Type"] in ["Movie", "Series", "Episode"]
]
)
for library_type in types:
# Get watched for user
watched = self.get_user_library_watched(
user_name,
user_id,
library_type,
library_id,
library_title,
)
if user_name.lower() not in users_watched:
users_watched[user_name.lower()] = {}
users_watched[user_name.lower()].update(watched)
return users_watched
except Exception as e:
logger(f"{self.server_type}: Failed to get watched, Error: {e}", 2)
raise Exception(e)
def update_user_watched(
self, user_name, user_id, library, library_id, videos, update_partial, dryrun
):
try:
logger(
f"{self.server_type}: Updating watched for {user_name} in library {library}",
1,
)
(
videos_shows_ids,
videos_episodes_ids,
videos_movies_ids,
) = generate_library_guids_dict(videos)
if (
not videos_movies_ids
and not videos_shows_ids
and not videos_episodes_ids
):
logger(
f"{self.server_type}: No videos to mark as watched for {user_name} in library {library}",
1,
)
return
logger(
f"{self.server_type}: mark list\nShows: {videos_shows_ids}\nEpisodes: {videos_episodes_ids}\nMovies: {videos_movies_ids}",
1,
)
if videos_movies_ids:
jellyfin_search = self.query(
f"/Users/{user_id}/Items"
+ f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}"
+ "&isPlayed=false&Fields=ItemCounts,ProviderIds,MediaSources&IncludeItemTypes=Movie",
"get",
)
for jellyfin_video in jellyfin_search["Items"]:
movie_status = get_video_status(
jellyfin_video, videos_movies_ids, videos
)
if movie_status:
jellyfin_video_id = jellyfin_video["Id"]
if movie_status["completed"]:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as watched for {user_name} in {library}"
if not dryrun:
logger(msg, 5)
self.query(
f"/Users/{user_id}/PlayedItems/{jellyfin_video_id}",
"post",
)
else:
logger(msg, 6)
log_marked(
user_name,
library,
jellyfin_video.get("Name"),
)
elif update_partial:
msg = f"{self.server_type}: {jellyfin_video.get('Name')} as partially watched for {floor(movie_status['time'] / 60_000)} minutes for {user_name} in {library}"
if not dryrun:
logger(msg, 5)
playback_position_payload = {
"PlaybackPositionTicks": movie_status["time"]
* 10_000,
}
self.query(
f"/Users/{user_id}/Items/{jellyfin_video_id}/UserData",
"post",
json=playback_position_payload,
)
else:
logger(msg, 6)
log_marked(
user_name,
library,
jellyfin_video.get("Name"),
duration=floor(movie_status["time"] / 60_000),
)
else:
logger(
f"{self.server_type}: Skipping movie {jellyfin_video.get('Name')} as it is not in mark list for {user_name}",
3,
)
# TV Shows
if videos_shows_ids and videos_episodes_ids:
jellyfin_search = self.query(
f"/Users/{user_id}/Items"
+ f"?SortBy=SortName&SortOrder=Ascending&Recursive=True&ParentId={library_id}"
+ "&Fields=ItemCounts,ProviderIds,Path&IncludeItemTypes=Series",
"get",
)
jellyfin_shows = [x for x in jellyfin_search["Items"]]
for jellyfin_show in jellyfin_shows:
show_found = False
episode_videos = []
if generate_locations:
if "Path" in jellyfin_show:
if (
contains_nested(
jellyfin_show["Path"].split("/")[-1],
videos_shows_ids["locations"],
)
is not None
):
show_found = True
for shows, episodes in videos.items():
show = {k: v for k, v in shows}
if (
contains_nested(
jellyfin_show["Path"].split("/")[-1],
show["locations"],
)
is not None
):
for episode in episodes:
episode_videos.append(episode)
break
if generate_guids:
if not show_found:
for show_provider_source, show_provider_id in jellyfin_show[
"ProviderIds"
].items():
if show_provider_source.lower() in videos_shows_ids:
if (
show_provider_id.lower()
in videos_shows_ids[
show_provider_source.lower()
]
):
show_found = True
for show, episodes in videos.items():
show = {k: v for k, v in show}
if show_provider_id.lower() in show.get(
show_provider_source.lower(), []
):
for episode in episodes:
episode_videos.append(episode)
break
if show_found:
logger(
f"{self.server_type}: Updating watched for {user_name} in library {library} for show {jellyfin_show.get('Name')}",
1,
)
jellyfin_show_id = jellyfin_show["Id"]
jellyfin_episodes = self.query(
f"/Shows/{jellyfin_show_id}/Episodes"
+ f"?userId={user_id}&Fields=ItemCounts,ProviderIds,MediaSources",
"get",
)
for jellyfin_episode in jellyfin_episodes["Items"]:
episode_status = get_video_status(
jellyfin_episode, videos_episodes_ids, episode_videos
)
if episode_status:
jellyfin_episode_id = jellyfin_episode["Id"]
if episode_status["completed"]:
msg = (
f"{self.server_type}: {jellyfin_episode['SeriesName']} {jellyfin_episode['SeasonName']} Episode {jellyfin_episode.get('IndexNumber')} {jellyfin_episode.get('Name')}"
+ f" as watched for {user_name} in {library}"
)
if not dryrun:
logger(msg, 5)
self.query(
f"/Users/{user_id}/PlayedItems/{jellyfin_episode_id}",
"post",
)
else:
logger(msg, 6)
log_marked(
user_name,
library,
jellyfin_episode.get("SeriesName"),
jellyfin_episode.get("Name"),
)
elif update_partial:
msg = (
f"{self.server_type}: {jellyfin_episode['SeriesName']} {jellyfin_episode['SeasonName']} Episode {jellyfin_episode.get('IndexNumber')} {jellyfin_episode.get('Name')}"
+ f" as partially watched for {floor(episode_status['time'] / 60_000)} minutes for {user_name} in {library}"
)
if not dryrun:
logger(msg, 5)
playback_position_payload = {
"PlaybackPositionTicks": episode_status[
"time"
]
* 10_000,
}
self.query(
f"/Users/{user_id}/Items/{jellyfin_episode_id}/UserData",
"post",
json=playback_position_payload,
)
else:
logger(msg, 6)
log_marked(
user_name,
library,
jellyfin_episode.get("SeriesName"),
jellyfin_episode.get("Name"),
duration=floor(episode_status["time"] / 60_000),
)
else:
logger(
f"{self.server_type}: Skipping episode {jellyfin_episode.get('Name')} as it is not in mark list for {user_name}",
3,
)
else:
logger(
f"{self.server_type}: Skipping show {jellyfin_show.get('Name')} as it is not in mark list for {user_name}",
3,
)
except Exception as e:
logger(
f"{self.server_type}: Error updating watched for {user_name} in library {library}, {e}",
2,
)
logger(traceback.format_exc(), 2)
raise Exception(e)
def update_watched(
self, watched_list, user_mapping=None, library_mapping=None, dryrun=False
):
try:
server_version = self.get_server_version()
update_partial = self.is_partial_update_supported(server_version)
if not update_partial:
logger(
f"{self.server_type}: Server version {server_version} does not support updating playback position.",
2,
)
for user, libraries in watched_list.items():
logger(f"{self.server_type}: Updating for entry {user}, {libraries}", 1)
user_other = None
user_name = None
if user_mapping:
if user in user_mapping.keys():
user_other = user_mapping[user]
elif user in user_mapping.values():
user_other = search_mapping(user_mapping, user)
user_id = None
for key in self.users:
if user.lower() == key.lower():
user_id = self.users[key]
user_name = key
break
elif user_other and user_other.lower() == key.lower():
user_id = self.users[key]
user_name = key
break
if not user_id:
logger(f"{user} {user_other} not found in Jellyfin", 2)
continue
jellyfin_libraries = self.query(
f"/Users/{user_id}/Views",
"get",
)
jellyfin_libraries = [x for x in jellyfin_libraries["Items"]]
for library, videos in libraries.items():
library_other = None
if library_mapping:
if library in library_mapping.keys():
library_other = library_mapping[library]
elif library in library_mapping.values():
library_other = search_mapping(library_mapping, library)
if library.lower() not in [
x["Name"].lower() for x in jellyfin_libraries
]:
if library_other:
if library_other.lower() in [
x["Name"].lower() for x in jellyfin_libraries
]:
logger(
f"{self.server_type}: Library {library} not found, but {library_other} found, using {library_other}",
1,
)
library = library_other
else:
logger(
f"{self.server_type}: Library {library} or {library_other} not found in library list",
1,
)
continue
else:
logger(
f"{self.server_type}: Library {library} not found in library list",
1,
)
continue
library_id = None
for jellyfin_library in jellyfin_libraries:
if jellyfin_library["Name"] == library:
library_id = jellyfin_library["Id"]
continue
if library_id:
self.update_user_watched(
user_name,
user_id,
library,
library_id,
videos,
update_partial,
dryrun,
)
except Exception as e:
logger(f"{self.server_type}: Error updating watched, {e}", 2)
raise Exception(e)