# simpleertube/peertube.py

from bs4 import BeautifulSoup
import requests
import json
import sys
from datetime import datetime
from dateutil import parser as dateutil

# --- REQUEST UTILS ---

# Semantic class to store remote errors
class FailedRequest:
    def __init__(self, e):
        self.exception = e

    def format(self):
        # If it's a raw-text error, return it as-is.
        # Otherwise look for a 'message' attribute,
        # falling back to Python's representation of the exception.
        if isinstance(self.exception, str):
            return self.exception
        else:
            return getattr(self.exception, 'message', repr(self.exception))

    # Strip < and > symbols to prevent content injection,
    # and replace newlines with HTML line breaks <br>
    def format_html(self):
        return self.format().replace('<', "&lt;").replace('>', "&gt;").replace("\n", "<br>")
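
# Example (hypothetical): rendering a failure safely in an HTML error page
#   err = FailedRequest("server said: <h1>oops</h1>\ntry again later")
#   err.format_html()  # -> "server said: &lt;h1&gt;oops&lt;/h1&gt;<br>try again later"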

# Format a list of FailedRequest's
def format_errors(failures):
    return list(map(lambda failure: failure.format(), failures))

def format_errors_html(failures):
    return list(map(lambda failure: failure.format_html(), failures))

# Helper class to store paginated results
# self.data is a LIST of items, or a single error. When multiple errors can occur, see MultipleResults instead
# TODO: Maybe rename PaginatedResult (without plural) to indicate it's only one request
class PaginatedResults:
    def __init__(self, total, data):
        self.total = total
        self.data = data

# Helper class to store multiple, potentially paginated, results,
# each of which can have failed or succeeded independently
class MultipleResults:
    # Build me incrementally with insert()/insert_paginated(), or combine two instances with merge_with()
    def __init__(self):
        self.successes = []
        self.failures = []

    # Helper function to insert/log an error
    def error(self, reason):
        print("[ERROR] %s" % reason)
        self.failures.append(FailedRequest(reason))
        return self

    # Insert a single PaginatedResults instance into the current MultipleResults
    def insert_paginated(self, result):
        if isinstance(result, FailedRequest):
            self.failures.append(result)
            return self
        if not isinstance(result, PaginatedResults):
            return self.error("PROGRAMMING ERROR: MultipleResults.insert_paginated only takes a single FailedRequest or PaginatedResults")
        # Now we have many items from that PaginatedResults to insert into successes
        self.successes.extend(result.data)
        return self

    # Insert a single successful result into the current MultipleResults
    # TODO: Should make this more typesafe by having a SuccessfulResult type
    def insert(self, result):
        if isinstance(result, FailedRequest):
            self.failures.append(result)
            return self
        if isinstance(result, PaginatedResults):
            return self.error("PROGRAMMING ERROR: MultipleResults.insert only takes a single successful result or FailedRequest. PaginatedResults should be inserted with MultipleResults.insert_paginated")
        self.successes.append(result)
        return self

    # Merge successes and failures from another MultipleResults into this one
    def merge_with(self, results):
        if not isinstance(results, MultipleResults):
            return self.error("PROGRAMMING ERROR: MultipleResults.merge_with should only be called with another MultipleResults")
        self.successes.extend(results.successes)
        self.failures.extend(results.failures)
        return self
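
# Example (hypothetical): aggregating per-account requests into one result set,
# where `api` is an API instance and `subscriptions` a list of Account objects
#   results = MultipleResults()
#   for sub in subscriptions:
#       results.insert_paginated(api.account_videos(sub.domain, sub.name))
#   print(format_errors(results.failures))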
# --- END REQUEST UTILS ---

# Helper class for caching; you can substitute any other cache that implements the same get/set API
# Default TTL: 3600s (1h)
class Cache:
    def __init__(self, ttl=3600):
        self.dict = {}
        self.ttl = ttl

    # Use ttl=0 to disable caching
    def get(self, key, ttl=None):
        if ttl is None: ttl = self.ttl
        if key in self.dict:
            last_time_updated = (self.dict[key])[1]
            time_diff = datetime.now() - last_time_updated
            if time_diff.total_seconds() > ttl:
                # Outdated data
                return None
            # Data still valid according to TTL
            return (self.dict[key])[0]
        else:
            # No data
            return None

    def set(self, key, value):
        self.dict[key] = [ value, datetime.now() ]
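
# Example (hypothetical): a 10-minute cache shared by the API wrapper
#   cache = Cache(ttl=600)
#   cache.set("greeting", "hello")
#   cache.get("greeting")         # -> "hello" while fresh, None after 600s
#   cache.get("greeting", ttl=0)  # -> None, a TTL of 0 effectively disables the cache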

# Takes a successful plaintext response and parses it as HTML to extract the title
def html_title(content):
    soup = BeautifulSoup(content, "lxml")
    title = soup.find('title')
    if title:
        return title.text
    else:
        return "PeerTube Instance"

# Takes a successfully-parsed JSON response and extracts data/total from it to build pagination
def paginator(response):
    if "data" not in response or "total" not in response:
        return FailedRequest("The API response provided to paginator appears not to be paginated")
    # TODO: check that total is an integer and that it's greater than len(data)
    return PaginatedResults(response["total"], response["data"])
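
# Example (hypothetical): shape of the paginated responses PeerTube returns
#   paginator({"total": 2, "data": [{"name": "a"}, {"name": "b"}]})  # -> PaginatedResults
#   paginator({"unexpected": True})                                  # -> FailedRequest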

class API:
    # The PeertubeAPI is initialized with a caching backend and a default TTL, which can be overridden in
    # individual API request calls. The caching backend should implement a get(key, ttl) and set(key, value) API.
    # The SepiaSearch instance to use can also be specified here.
    # NOTE: The whole API must be oriented for ordered, not named, arguments, because named arguments would be
    # tricky to implement in Context.insert_future_result()
    def __init__(self, cache, ttl=3600, search="https://search.joinpeertube.org"):
        self.cache = cache
        self.ttl = ttl
        # If the search instance has no protocol set, assume https://
        if not search.startswith("http"): search = "https://" + search
        # Remove trailing slash
        if search.endswith('/'): self.search_source = search[0:-1]
        else: self.search_source = search
        # The instance search API expects a filter and a sorting criterion. Here we provide
        # higher-level categories that match requested URIs: trending, most-liked, recently-added, local
        self.instance_filters = {
            "trending": (None, "-trending"),
            "most-liked": (None, "-likes"),
            "local": ("local", "-publishedAt"),
            "recently-added": (None, "-publishedAt"),
        }

    # Wrapper around requests.get() so that it cannot raise
    # Usually, you want to call self.request(), which is an even higher-level wrapper
    # If the request succeeds:
    # - parse the body as JSON; on a parse error, return a FailedRequest
    # - if there is an "error" field in the JSON, return a FailedRequest
    # - otherwise, return the parsed JSON
    # In all cases where a FailedRequest is returned, the error is also logged
    # Only use with JSON endpoints; otherwise use self.plaintext_request
    def json_request(self, url):
        print("[DEBUG] Requesting JSON URL %s" % url)
        try:
            # If deserialization fails, we'll end up in the `except` block
            parsed_response = json.loads(requests.get(url).text)
            if "error" in parsed_response:
                print("[WARN] Remote peertube returned error for %s:\n%s" % (url, parsed_response["error"]))
                return FailedRequest("Remote peertube server returned an error for URL %s:\n%s" % (url, parsed_response["error"]))
            return parsed_response
        except Exception as e:
            print("[WARN] Error fetching page \"%s\":\n%s" % (url, e))
            return FailedRequest(e)

    # Wrapper around requests.get() so that it cannot raise
    # Use with plaintext requests; for JSON requests use self.json_request()
    def plaintext_request(self, url):
        print("[DEBUG] Requesting plaintext URL %s" % url)
        try:
            return requests.get(url).text
        except Exception as e:
            print("[WARN] Error fetching page \"%s\":\n%s" % (url, e))
            return FailedRequest(e)

    # Useful wrapper method to reduce boilerplate
    # args: parameters tuple used to form the cache key
    # url: string template used to form the URL from the args tuple, where ${i} is args[i]
    # key: key to extract from a successful response
    # backend: the method used to fetch the URL (default: self.json_request), can be self.plaintext_request
    # extractor: a function run on a successful response to extract data from it, when key isn't set
    # ttl: cache TTL override for this request; use ttl=0 to disable the cache
    def request(self, args, url, key=None, backend=None, extractor=None, ttl=None):
        # Special case when a single arg is passed as a bare string:
        # '/'.join(("foo")) is "f/o/o", because ("foo") is a parenthesized string, not a 1-tuple
        if isinstance(args, str): args = (args, )
        cache_key = '/'.join(args)
        cached = self.cached(cache_key, ttl=ttl)
        if cached is None:
            # Defaults to making a JSON API request
            if backend is None: backend = self.json_request
            for i in range(0, len(args)):
                url = url.replace("${%s}" % str(i), args[i])
            res = backend(url)
            if not isinstance(res, FailedRequest):
                if key is not None:
                    # Extract the requested key from the successful response
                    res = res[key]
                elif extractor is not None:
                    # Run the extractor on the successful response
                    res = extractor(res)
            # Cache the outcome, even when it is a failure
            self.save(cache_key, res)
            return res
        return cached
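
    # Example (hypothetical): how the ${i} templating above expands
    #   self.request(("kolektiva.media", "42"), "https://${0}/api/v1/videos/${1}")
    #   fetches https://kolektiva.media/api/v1/videos/42 and caches the result under
    #   "kolektiva.media/42", prefixed with the name of the public API method that asked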

    # Returns an entry from cache, automatically prefixed with "peertube-FUNC-" where FUNC is the name
    # of the public API method that initiated the request (two frames up the stack)
    def cached(self, key, ttl=None):
        if ttl is None: ttl = self.ttl
        caller = sys._getframe(2).f_code.co_name
        key_name = "peertube-" + caller + "-" + key
        res = self.cache.get(key_name, ttl=ttl)
        if res is None:
            print("[CACHE] Entry not found for %s: %s" % (caller, key))
            return None
        print("[CACHE] Found entry for %s: %s" % (caller, key))
        return res

    # Save an entry into cache, automatically prefixed with "peertube-FUNC-" where FUNC is the name
    # of the public API method that initiated the request (two frames up the stack)
    def save(self, key, value):
        # TODO: Maybe now that we call from two levels above (request_wrapper->request->save) we need to adapt the frame fetched
        caller = sys._getframe(2).f_code.co_name
        key_name = "peertube-" + caller + "-" + key
        print("[CACHE] Saving entry for %s: %s" % (caller, key))
        self.cache.set(key_name, value)

    # Fetch the instance name from its HTML source
    def instance_name(self, domain, ttl=None):
        return self.request(
            (domain, ),
            "https://${0}",
            backend=self.plaintext_request,
            extractor=html_title,
            ttl=ttl
        )

    # Search the configured self.search_source for `query`, returning `count` items starting at `start`
    # NOTE: Returns a PaginatedResults upon success
    def search(self, query, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), self.search_source, query),
            # self.search_source already has the protocol prepended
            "${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}",
            extractor=paginator,
            ttl=ttl
        )
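
    # Example (hypothetical): first page of SepiaSearch results
    #   res = api.search("solarpunk", start=0, count=5)
    #   if not isinstance(res, FailedRequest):
    #       titles = [video["name"] for video in res.data]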

    # Search a specific Peertube instance for `term`, returning `count` items starting at `start`.
    # Slightly different URL format from SepiaSearch (self.search())
    # NOTE: Returns a PaginatedResults upon success
    def search_instance(self, domain, term, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, term),
            "https://${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}&sort=-match&searchTarget=local",
            extractor=paginator,
            ttl=ttl
        )

    # Default category is "local"; available categories: trending, most-liked, recently-added, local
    # See self.instance_filters in self.__init__() for the sorting/filtering details
    # NOTE: Returns a PaginatedResults upon success
    def instance_videos(self, domain, start=0, count=10, category="local", ttl=None):
        if category not in self.instance_filters:
            return FailedRequest("instance_videos called with bogus filter: %s" % category)
        filt, sort = self.instance_filters[category]
        url = "https://${2}/api/v1/videos?sort=" + sort + "&start=${0}&count=${1}"
        if filt: url += "&filter=" + filt
        return self.request(
            (str(start), str(count), domain, category),
            url,
            extractor=paginator,
            ttl=ttl
        )
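
    # Example (hypothetical): five most recently published local videos of an instance;
    # this requests https://kolektiva.media/api/v1/videos?sort=-publishedAt&start=0&count=5&filter=local
    #   res = api.instance_videos("kolektiva.media", count=5, category="local")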

    def video(self, domain, id, ttl=None):
        return self.request(
            (domain, id),
            "https://${0}/api/v1/videos/${1}",
            ttl=ttl
        )

    def video_captions(self, domain, id, ttl=None):
        return self.request(
            (domain, id),
            "https://${0}/api/v1/videos/${1}/captions",
            # NOTE: Captions look like paginated content because they have 'total' and 'data' fields,
            # but thankfully they are not actually paginated.
            key="data",
            ttl=ttl
        )

    def video_captions_proxy(self, domain, caption_id, ttl=None):
        # The URL is hardcoded to prevent proxying arbitrary content. It may change with PeerTube updates, see the captions API
        # e.g. https://kolektiva.media/api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/captions
        # TODO: What if the captionPath doesn't follow this format on an instance? Should we really proxy ANYTHING returned by the API?
        return self.request(
            (domain, caption_id),
            "https://${0}/lazy-static/video-captions/${1}",
            backend=self.plaintext_request,
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def video_comments(self, domain, id, start=0, count=10, ttl=None):
        return self.request(
            (domain, id, str(start), str(count)),
            "https://${0}/api/v1/videos/${1}/comment-threads?start=${2}&count=${3}",
            extractor=paginator,
            ttl=ttl
        )

    def account(self, domain, name, ttl=None):
        return self.request(
            (domain, name),
            "https://${0}/api/v1/accounts/${1}",
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def account_channels(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/accounts/${3}/video-channels?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def account_videos(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/accounts/${3}/videos?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # Fetch information about multiple accounts, returned as a MultipleResults
    # NOTE: This new API method enforces usage of the Account class. DO NOT USE WITH (account, domain) tuples.
    def accounts(self, accounts, ttl=None):
        results = MultipleResults()
        for account in accounts:
            results.insert(self.account(account.domain, account.name, ttl=ttl))
        return results

    # Fetch the latest videos from multiple accounts, returned as a MultipleResults
    # NOTE: This new API method enforces usage of the Account class. DO NOT USE WITH (account, domain) tuples.
    def accounts_videos(self, accounts, limit=None, sort=True, ttl=None):
        api_limit = 10 if limit is None else limit
        results = MultipleResults()
        for account in accounts:
            results.insert_paginated(self.account_videos(account.domain, account.name, count=api_limit, ttl=ttl))
        if limit or sort:
            # We also sort when a limit is set, because otherwise applying the limit would discard useful information
            results.successes.sort(key=lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
            if limit: results.successes = results.successes[0:limit]
        return results
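
    # Example (hypothetical): a merged, date-sorted feed across two subscriptions
    #   subs = [Account("framasoft", "framatube.org"), Account("main_channel", "video.example.org")]
    #   feed = api.accounts_videos(subs, limit=10)
    #   for video in feed.successes: print(video["name"])
    #   for failure in feed.failures: print(failure.format())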

    def channel(self, domain, name, ttl=None):
        return self.request(
            (domain, name),
            "https://${0}/api/v1/video-channels/${1}",
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def channel_videos(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/video-channels/${3}/videos?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def channel_playlists(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/video-channels/${3}/video-playlists?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # Fetch detailed information about multiple channels (e.g. local channel subscriptions), returned as a MultipleResults
    # NOTE: This new API method enforces usage of the Account class as channel identifier. DO NOT USE WITH (account, domain) tuples.
    def channels(self, channels, ttl=None):
        results = MultipleResults()
        for channel in channels:
            results.insert(self.channel(channel.domain, channel.name, ttl=ttl))
        return results

    # Fetch the latest videos from multiple channels, returned as a MultipleResults
    # NOTE: This new API method enforces usage of the Account class as channel identifier. DO NOT USE WITH (account, domain) tuples.
    def channels_videos(self, channels, limit=None, sort=True, ttl=None):
        api_limit = 10 if limit is None else limit
        results = MultipleResults()
        for channel in channels:
            results.insert_paginated(self.channel_videos(channel.domain, channel.name, count=api_limit, ttl=ttl))
        if limit or sort:
            # We also sort when a limit is set, because otherwise applying the limit would discard useful information
            results.successes.sort(key=lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
            if limit: results.successes = results.successes[0:limit]
        return results

# Extra information about a video that is not contained directly in the API result:
# the list of available resolutions and the URL of the selected video file.
# `a` is the raw video API result, `args` is a dict of request parameters (e.g. "quality")
class VideoInfo:
    def __init__(self, api, a, args):
        # If the video is being built from a failed request, record the failure and stop.
        # (__init__ cannot return a value, so callers should check the `failure` attribute.)
        self.failure = None
        if isinstance(a, FailedRequest):
            print("[ERROR] A video request failed, yet you called VideoInfo about it. You should probably not make useless requests.")
            self.failure = FailedRequest("A previous video request failed, not attempting to fetch extra information about it.")
            return
        quality = args.get("quality")
        self.resolutions = []
        self.video = None
        self.files = a["files"]
        if len(self.files) == 0:
            self.files = ((a["streamingPlaylists"])[0])["files"]
        self.default_res = None
        for entry in self.files:
            resolution = (entry["resolution"])["id"]
            self.resolutions.append(entry["resolution"])
            # Choose the default quality: the resolution closest to 720p (id 0 is audio-only)
            if resolution != 0 and quality is None:
                if self.default_res is None:
                    self.default_res = resolution
                    self.video = entry["fileUrl"]
                elif abs(720 - resolution) < abs(720 - self.default_res):
                    self.default_res = resolution
                    self.video = entry["fileUrl"]
            if str(resolution) == str(quality):
                self.video = entry["fileUrl"]
        if quality is None:
            self.quality = self.default_res
        else:
            self.quality = quality
        self.no_quality_selected = not self.video
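
# Example (hypothetical, relying on the `failure` attribute introduced above):
# resolving the file URL for a requested quality
#   raw = api.video("kolektiva.media", "9c9de5e8-0a1e-484a-b099-e80766180a6d")
#   info = VideoInfo(api, raw, {"quality": "480"})
#   if info.failure is None and not info.no_quality_selected:
#       video_url = info.video  # direct file URL of the 480p variant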

# --- IDENTIFIERS HELPERS ---

# This class Account can be either an actual user account or a channel
# TODO: Find another name to express that... Maybe Subscription? Maybe Identifier?
class Account:
    def __init__(self, name, domain):
        self.name = name
        self.domain = domain

    # Maybe useful?
    def __str__(self):
        return self.name + "@" + self.domain

    # For debug prints
    def __repr__(self):
        return self.name + "@" + self.domain

    # Builds an Account from one of the following syntaxes, additionally stripping extra whitespace
    # and treating everything after `#` as a comment:
    # - id@server
    # - @id@server
    # - http(s)://server/c/id
    # - http(s)://server/a/id
    # Returns an Account instance, or None when the line is empty or cannot be parsed
    @staticmethod
    def parse(identifier):
        identifier = identifier.split('#')[0].strip()
        # A pure comment line is reduced to the empty string
        if identifier == '': return None
        requested_identifier = identifier
        if identifier.startswith('http'):
            identifier = identifier[4:]
            # Skip the 's' of https, if any
            if identifier.startswith('s'): identifier = identifier[1:]
            # Remove ://
            identifier = identifier[3:]
            parts = identifier.split('/')
            if len(parts) < 3:
                print("[ERROR] Malformed URL for identifier, not enough components: %s" % requested_identifier)
                return None
            if parts[1] == 'a' or parts[1] == 'c':
                # Account or channel found, take the next part
                return Account(parts[2], parts[0])
            print("[ERROR] Identifier not understood: %s" % requested_identifier)
            return None
        # Not an HTTP(S) URL, we assume a user@server or @user@server address
        return unsafe_account_parser(identifier)

# Takes an account in the [@]user@server form and makes sure it makes sense
# Returns an Account in that case, None otherwise
# This function does not further sanitize the string (e.g. strip whitespace or remove comments); please use
# Account.parse() for that, which also supports more account formats
def unsafe_account_parser(identifier):
    requested_identifier = identifier
    # Remove the leading @, if any
    if identifier.startswith('@'): identifier = identifier[1:]
    parts = identifier.split('@')
    if len(parts) != 2:
        print("[ERROR] Wrong identifier, expected a single '@': %s" % requested_identifier)
        return None
    if len(parts[0]) == 0 or len(parts[1]) == 0:
        print("[ERROR] Wrong identifier, empty part before/after '@': %s" % requested_identifier)
        return None
    return Account(parts[0], parts[1])
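
# Example (hypothetical): accepted identifier formats
#   Account.parse("framasoft@framatube.org")            # -> framasoft@framatube.org
#   Account.parse("@framasoft@framatube.org  # note")   # -> framasoft@framatube.org
#   Account.parse("https://framatube.org/c/framasoft")  # -> framasoft@framatube.org
#   Account.parse("not an identifier")                  # -> None (with an error logged)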
# --- END IDENTIFIERS HELPERS ---