from bs4 import BeautifulSoup
import requests
import json
import sys
from datetime import datetime
from dateutil import parser as dateutil


# --- REQUEST UTILS ---

# Semantic class to store remote errors
class FailedRequest:
    def __init__(self, e):
        self.exception = e

    def format(self):
        # If it's a raw-text error, return it directly
        # Otherwise look for a 'message' attribute
        # Otherwise ask Python to represent the exception
        if isinstance(self.exception, str):
            return self.exception
        else:
            return getattr(self.exception, 'message', repr(self.exception))

    # Escape < and > symbols to prevent content injection,
    # and replace newlines with HTML line breaks <br>
    def format_html(self):
        return self.format().replace('<', "&lt;").replace('>', "&gt;").replace("\n", "<br>")

# Format a list of FailedRequest's
def format_errors(failures):
    return list(map(lambda failure: failure.format(), failures))

def format_errors_html(failures):
    return list(map(lambda failure: failure.format_html(), failures))

# Helper class to store paginated results
# self.data is a LIST of stuff, or a single error. When multiple errors can occur, see MultipleResults instead
# TODO: Maybe rename PaginatedResult (without plural) to indicate it's only one request
class PaginatedResults:
    def __init__(self, total, data):
        self.total = total
        self.data = data

# Helper class to store multiple, potentially paginated, results
# Each of which can have failed or succeeded independently
class MultipleResults:
    # Initialize me by calling MultipleResults().merge_with(map(lambda x: api.foo(x), entries))
    def __init__(self):
        self.successes = []
        self.failures = []

    # Helper function to insert/log an error
    def error(self, reason):
        print("[ERROR] %s" % reason)
        self.failures.append(FailedRequest(reason))
        return self

    # Insert a single PaginatedResults instance into the current MultipleResults
    def insert_paginated(self, result):
        if isinstance(result, FailedRequest):
            self.failures.append(result)
            return self
        if not isinstance(result, PaginatedResults):
            return self.error("PROGRAMMING ERROR: MultipleResults.insert_paginated only takes a single FailedRequest or PaginatedResults")
        # Now we have many items from that PaginatedResults to insert into successes
        self.successes.extend(result.data)
        return self

    # Insert a single successful result into the current MultipleResults
    # TODO: Should make this more typesafe by having a SuccessfulResult type
    def insert(self, result):
        if isinstance(result, FailedRequest):
            self.failures.append(result)
            return self
        if isinstance(result, PaginatedResults):
            return self.error("PROGRAMMING ERROR: MultipleResults.insert only takes a single successful result or FailedRequest. PaginatedResults should be inserted with MultipleResults.insert_paginated")
        self.successes.append(result)
        return self

    # Merge successes and failures with another MultipleResults
    def merge_with(self, results):
        if not isinstance(results, MultipleResults):
            return self.error("PROGRAMMING ERROR: MultipleResults.merge_with should only be called with another MultipleResults")
        self.successes.extend(results.successes)
        self.failures.extend(results.failures)
        return self
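
# Illustrative usage sketch (assumes an `api = API(...)` instance as defined below;
# `requests_to_make` is a hypothetical list of (domain, id) pairs):
#   results = MultipleResults()
#   for domain, id in requests_to_make:
#       results.insert(api.video(domain, id))
#   print(len(results.successes), format_errors(results.failures))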

# --- END REQUEST UTILS ---


# Helper class for using caches; you can use any other cache that implements the same API (get/set)
# Default TTL: 3600s (1h)
class Cache:
    def __init__(self, ttl=3600):
        self.dict = {}
        self.ttl = ttl

    # Use ttl=0 to disable caching
    def get(self, key, ttl=None):
        if ttl is None: ttl = self.ttl
        if key in self.dict:
            last_time_updated = (self.dict[key])[1]
            time_diff = datetime.now() - last_time_updated

            if time_diff.total_seconds() > ttl:
                # Outdated data
                return None
            # Data still valid according to TTL
            return (self.dict[key])[0]

        else:
            # No data
            return None

    def set(self, key, value):
        self.dict[key] = [ value, datetime.now() ]
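
# Illustrative usage of the cache API above:
#   cache = Cache(ttl=60)
#   cache.set("answer", 42)
#   cache.get("answer")         # => 42 while the entry is less than 60s old
#   cache.get("answer", ttl=0)  # => None, since ttl=0 treats every entry as outdated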

# Takes a successful plaintext response and parses it as HTML to extract the title
def html_title(content):
    soup = BeautifulSoup(content, "lxml")
    title = soup.find('title')
    if title:
        return title.text
    else:
        return "PeerTube Instance"

# Takes a successfully-parsed JSON response and extracts data/total from it to build pagination
def paginator(response):
    if "data" not in response or "total" not in response:
        return FailedRequest("The API response provided to paginator appears not to be paginated")
    # TODO: check that total is an integer and that it's greater than len(data)
    #return response["total"], response["data"]
    return PaginatedResults(response["total"], response["data"])
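
# For illustration, a paginated PeerTube API response of the form
#   {"total": 2, "data": [ ... ]}
# becomes PaginatedResults(2, [ ... ]); any response missing those fields
# yields a FailedRequest instead.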

class API:
    # The PeertubeAPI is initialized with a caching backend and a default TTL, which can be overridden in
    # specific API request calls. The caching backend should implement a get(key, ttl) and set(key, value) API.
    # The SepiaSearch instance to use can also be specified here.
    # NOTE: The whole API must be oriented towards ordered, not named, arguments, because named arguments
    # would be tricky to implement in Context.insert_future_result()
    def __init__(self, cache, ttl=3600, search="https://search.joinpeertube.org"):
        self.cache = cache
        self.ttl = ttl
        # If the search instance has no protocol set, assume https://
        if not search.startswith("http"): search = "https://" + search
        # Remove trailing slash
        if search.endswith('/'): self.search_source = search[0:-1]
        else: self.search_source = search
        # The instance search API expects a filter and a sorting criterion. Here we provide
        # higher-level categories that match requested URIs: trending, most-liked, recently-added, local
        self.instance_filters = {
            "trending": (None, "-trending"),
            "most-liked": (None, "-likes"),
            "local": ("local", "-publishedAt"),
            "recently-added": (None, "-publishedAt"),
        }
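
    # Illustrative construction (TTL and search instance are optional):
    #   api = API(Cache(), ttl=600, search="search.joinpeertube.org")
    # A missing protocol on the search instance is assumed to be https://, and any
    # trailing slash is removed.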

    # Wrapper around requests.get() so that it cannot fail
    # Usually, you want to call self.request(), which is an even higher-level wrapper
    # If the request succeeds:
    # - parse it as JSON, or return a FailedRequest
    # - if there is an "error" field in the JSON, return a FailedRequest
    # - otherwise, return the parsed JSON
    # Else: return a FailedRequest
    # In all cases where the function returns a FailedRequest, the error is logged
    # Only use with JSON requests, otherwise use self.plaintext_request()
    def json_request(self, url):
        print("[DEBUG] Requesting JSON URL %s" % url)
        try:
            # If deserialization fails, we'll end up in the `except` block
            parsed_response = json.loads(requests.get(url).text)
            if "error" in parsed_response:
                print("[WARN] Remote peertube returned error for %s:\n%s" % (url, parsed_response["error"]))
                return FailedRequest("Remote peertube server returned an error for URL %s:\n%s" % (url, parsed_response["error"]))
            return parsed_response
        except Exception as e:
            print("[WARN] Error fetching page \"%s\":\n%s" % (url, e))
            return FailedRequest(e)

    # Wrapper around requests.get() so that it cannot fail
    # Use with plaintext requests; for JSON requests use self.json_request() or self.request()
    def plaintext_request(self, url):
        print("[DEBUG] Requesting plaintext URL %s" % url)
        try:
            return requests.get(url).text
        except Exception as e:
            print("[WARN] Error fetching page \"%s\":\n%s" % (url, e))
            return FailedRequest(e)

    # Useful wrapper method to reduce boilerplate
    # args: parameters tuple used to form the cache key
    # url: string template used to form the URL from the args tuple, where ${i} is args[i]
    # key: key to extract from a successful response
    # backend: the method to use for fetching the URL (default: self.json_request), can be self.plaintext_request
    # extractor: a function run on a successful response to extract data from it, when key isn't set
    # ttl: cache TTL for this request; use ttl=0 to disable the cache
    def request(self, args, url, key=None, backend=None, extractor=None, ttl=None):
        # WTF Python? '/'.join(("foo")) => "f/o/o", not "foo"! Special-case a single string argument
        if isinstance(args, str): args = (args, )
        cache_key = '/'.join(args)
        cached = self.cached(cache_key, ttl=ttl)
        if cached is None:
            # Defaults to making a JSON API request
            if backend is None: backend = self.json_request
            for i in range(0, len(args)):
                url = url.replace("${%s}" % str(i), args[i])
            res = backend(url)
            if not isinstance(res, FailedRequest):
                if key is not None:
                    # Extract requested key from successful request
                    res = res[key]
                elif extractor is not None:
                    # Run extractor on result from successful request
                    res = extractor(res)
            self.save(cache_key, res)
            return res
        return cached
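
    # For illustration, a call such as:
    #   self.request(("9", "12", "example.org"), "https://${2}/api/v1/videos?start=${0}&count=${1}")
    # uses "9/12/example.org" as the cache key and, on a cache miss, fetches
    # https://example.org/api/v1/videos?start=9&count=12 via self.json_request
    # (example.org is just a placeholder domain).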

    # Returns an entry from cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name
    def cached(self, key, ttl=None):
        if ttl is None: ttl = self.ttl
        caller = sys._getframe(2).f_code.co_name
        key_name = "peertube-" + caller + "-" + key
        res = self.cache.get(key_name, ttl=ttl)
        if res is None:
            print("[CACHE] Entry not found for %s: %s" % (caller, key))
            return None
        print("[CACHE] Found entry for %s: %s" % (caller, key))
        return res

    # Save an entry into cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name
    def save(self, key, value):
        # TODO: Maybe now that we call from two levels above (request_wrapper->request->save) we need to adapt the frame fetched
        caller = sys._getframe(2).f_code.co_name
        key_name = "peertube-" + caller + "-" + key
        print("[CACHE] Saving entry for %s: %s" % (caller, key))
        self.cache.set(key_name, value)

    # Fetch instance name from its HTML source
    def instance_name(self, domain, ttl=None):
        return self.request(
            (domain),
            "https://${0}",
            backend=self.plaintext_request,
            extractor=html_title,
            ttl=ttl
        )

    # Search the configured self.search_source for `query`, returning `count` items after `start`
    # NOTE: Returns a PaginatedResults upon success
    def search(self, query, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), self.search_source, query),
            # self.search_source already has the protocol prepended
            "${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}",
            extractor=paginator,
            ttl=ttl
        )
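
    # Illustrative call (placeholder query):
    #   api.search("documentary", count=5)
    # returns a PaginatedResults whose .data holds up to 5 video entries from SepiaSearch.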

    # Search a specific Peertube instance for `term`,
    # returning `count` items after `start`. Slightly different URL format from SepiaSearch (self.search())
    # NOTE: Returns a PaginatedResults upon success
    def search_instance(self, domain, term, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, term),
            "https://${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}&sort=-match&searchTarget=local",
            extractor=paginator,
            ttl=ttl
        )

    # Default category is "local"; other categories are: trending, most-liked, recently-added
    # See self.instance_filters in self.__init__() for sorting/filtering details
    # NOTE: Returns a PaginatedResults upon success
    def instance_videos(self, domain, start=0, count=10, category="local", ttl=None):
        if category not in self.instance_filters:
            return FailedRequest("instance_videos called with bogus filter: %s" % category)
        filt, sort = self.instance_filters[category]
        url = "https://${2}/api/v1/videos?sort=" + sort + "&start=${0}&count=${1}"
        if filt: url += "&filter=" + filt
        return self.request(
            (str(start), str(count), domain, category),
            url,
            extractor=paginator,
            ttl=ttl
        )
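
    # Illustrative call (placeholder domain):
    #   api.instance_videos("peertube.example.org", category="trending", count=5)
    # "trending" maps to sort=-trending with no extra filter, per self.instance_filters.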

    def video(self, domain, id, ttl=None):
        return self.request(
            (domain, id),
            "https://${0}/api/v1/videos/${1}",
            ttl=ttl
        )

    def video_captions(self, domain, id, ttl=None):
        return self.request(
            (domain, id),
            "https://${0}/api/v1/videos/${1}/captions",
            # NOTE: Captions look like paginated content because they have 'total' and 'data' fields
            # However they are not actually paginated, and that's a good thing.
            key="data",
            ttl=ttl
        )

    def video_captions_proxy(self, domain, caption_id, ttl=None):
        # URL is hardcoded to prevent further proxying. The URL may change with updates, see the captions API
        # eg. https://kolektiva.media/api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/captions
        # TODO: What if the captionPath doesn't follow this format on an instance? Should we really proxy ANYTHING returned by the API?
        return self.request(
            (domain, caption_id),
            "https://${0}/lazy-static/video-captions/${1}",
            backend=self.plaintext_request,
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def video_comments(self, domain, id, start=0, count=10, ttl=None):
        return self.request(
            (domain, id, str(start), str(count)),
            "https://${0}/api/v1/videos/${1}/comment-threads?start=${2}&count=${3}",
            extractor=paginator,
            ttl=ttl
        )

    def account(self, domain, name, ttl=None):
        return self.request(
            (domain, name),
            "https://${0}/api/v1/accounts/${1}",
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def account_channels(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/accounts/${3}/video-channels?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def account_videos(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/accounts/${3}/videos?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # Fetch information about multiple accounts, returned as a MultipleResults
    # NOTE: This new API method enforces usage of the Account class. DO NOT USE WITH (account, domain) tuples.
    def accounts(self, accounts, ttl=None):
        results = MultipleResults()
        for account in accounts:
            results.insert(self.account(account.domain, account.name, ttl=ttl))
        return results

    # Fetch latest videos from multiple accounts, returned as MultipleResults
    # NOTE: This new API method enforces usage of the Account class. DO NOT USE WITH (account, domain) tuples.
    def accounts_videos(self, accounts, limit=None, sort=True, ttl=None):
        api_limit = 10 if limit is None else limit
        results = MultipleResults()
        for account in accounts:
            results.insert_paginated(self.account_videos(account.domain, account.name, count=api_limit, ttl=ttl))
        if limit or sort:
            # We also sort when limit is set, because otherwise limit would discard useful information
            results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
            if limit: results.successes = results.successes[0:limit]
        return results
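
    # Illustrative usage (placeholder accounts), merging each account's latest uploads
    # into a single, date-sorted list:
    #   subs = [Account("alice", "peertube.example.org"), Account("bob", "video.example.com")]
    #   latest = api.accounts_videos(subs, limit=10)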

    def channel(self, domain, name, ttl=None):
        return self.request(
            (domain, name),
            "https://${0}/api/v1/video-channels/${1}",
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def channel_videos(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/video-channels/${3}/videos?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # NOTE: Returns a PaginatedResults upon success
    def channel_playlists(self, domain, name, start=0, count=10, ttl=None):
        return self.request(
            (str(start), str(count), domain, name),
            "https://${2}/api/v1/video-channels/${3}/video-playlists?start=${0}&count=${1}",
            extractor=paginator,
            ttl=ttl
        )

    # List of detailed info about local channel subscriptions
    # Fetch information about multiple channels, returned as a MultipleResults
    # NOTE: This new API method enforces usage of the Account class as channel. DO NOT USE WITH (account, domain) tuples.
    def channels(self, channels, ttl=None):
        results = MultipleResults()
        for channel in channels:
            results.insert(self.channel(channel.domain, channel.name, ttl=ttl))
        return results

    # Fetch latest videos from multiple channels, returned as MultipleResults
    # NOTE: This new API method enforces usage of the Account class as channel. DO NOT USE WITH (account, domain) tuples.
    def channels_videos(self, channels, limit=None, sort=True, ttl=None):
        api_limit = 10 if limit is None else limit
        results = MultipleResults()
        for channel in channels:
            results.insert_paginated(self.channel_videos(channel.domain, channel.name, count=api_limit, ttl=ttl))
        if limit or sort:
            # We also sort when limit is set, because otherwise limit would discard useful information
            results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
            if limit: results.successes = results.successes[0:limit]
        return results


# Extra information about a video, not contained directly in the API result
# `a` is the video API result
class VideoInfo:
    def __init__(self, api, a, args):
        # If the video is being built from a failed request, there is nothing to extract.
        # NOTE: __init__ cannot return a value, so we just bail out early here; callers should
        # check the original request for failure before building a VideoInfo from it.
        if isinstance(a, FailedRequest):
            print("[ERROR] A video request failed, yet you called VideoInfo about it. You should probably not make useless requests.")
            return

        quality = args.get("quality")

        self.resolutions = []
        self.video = None

        self.files = a["files"]
        if len(self.files) == 0:
            self.files = ((a["streamingPlaylists"])[0])["files"]

        self.default_res = None

        for entry in self.files:
            resolution = (entry["resolution"])["id"]
            self.resolutions.append(entry["resolution"])

            # Choose the default quality: the non-zero resolution closest to 720p
            if resolution != 0 and quality is None:
                if self.default_res is None:
                    self.default_res = resolution
                    self.video = entry["fileUrl"]
                elif abs(720 - resolution) < abs(720 - self.default_res):
                    self.default_res = resolution
                    self.video = entry["fileUrl"]

            if str(resolution) == str(quality):
                self.video = entry["fileUrl"]

        if quality is None:
            self.quality = self.default_res
        else:
            self.quality = quality

        self.no_quality_selected = not self.video
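
# Illustrative usage (assuming `video` is a successful result of api.video(domain, id)
# and the third argument is a dict-like object of query parameters):
#   info = VideoInfo(api, video, {"quality": None})
#   info.resolutions  # available resolutions
#   info.video        # URL of the selected file, or None if nothing matched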

# --- IDENTIFIERS HELPERS ---

# This class Account can be either an actual user account or a channel
# TODO: Find another name to express that... Maybe Subscription? Maybe Identifier?
class Account:
    def __init__(self, name, domain):
        self.name = name
        self.domain = domain

    # Maybe useful?
    def __str__(self):
        return self.name + "@" + self.domain

    # For debug prints
    def __repr__(self):
        return self.name + "@" + self.domain

# Builds an Account from one of the following syntaxes, additionally stripping extra whitespace and treating `#` as a comment:
# - id@server
# - @id@server
# - http(s)://server/c/id
# - http(s)://server/a/id
# Returns an Account instance, or None when the identifier cannot be parsed
def parse(identifier):
    identifier = identifier.split('#')[0].strip()
    # A comment-only line is reduced to an empty string
    if identifier == '': return None

    requested_identifier = identifier
    if identifier.startswith('http'):
        identifier = identifier[4:]
        # HTTPS?
        if identifier.startswith('s'): identifier = identifier[1:]
        # Remove ://
        identifier = identifier[3:]
        parts = identifier.split('/')
        if len(parts) < 3:
            print("[ERROR] Malformed URL for identifier, not enough components: %s" % requested_identifier)
            return None
        if parts[1] == 'a' or parts[1] == 'c':
            # Account or channel found, take the next part
            return Account(parts[2], parts[0])
        print("[ERROR] Identifier not understood: %s" % requested_identifier)
        return None
    # Not an HTTP URL, we assume a user@server or @user@server address
    return unsafe_account_parser(identifier)
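
# For illustration, each of the following identifier strings parses to an Account
# with name "dev" and domain "example.org":
#   "dev@example.org"
#   "@dev@example.org  # trailing comment"
#   "https://example.org/c/dev"
# Malformed identifiers parse to None.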

# Takes an account in the [@]user@server form and makes sure it makes sense
# Returns an Account in that case, None otherwise
# This function does not further sanitize the string (eg. strip whitespace or remove comments); please use
# Account.parse() for that, which also supports more account formats
def unsafe_account_parser(identifier):
    requested_identifier = identifier
    # Remove the first @, if any
    if identifier.startswith('@'): identifier = identifier[1:]
    parts = identifier.split('@')
    if len(parts) != 2:
        print("[ERROR] Wrong identifier, expected a single '@': %s" % requested_identifier)
        return None
    if len(parts[0]) == 0 or len(parts[1]) == 0:
        print("[ERROR] Wrong identifier, empty part before/after '@': %s" % requested_identifier)
        return None
    return Account(parts[0], parts[1])

# --- END IDENTIFIERS HELPERS ---
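
# Minimal smoke-test sketch: the instance domain below is only a placeholder, and these
# calls perform real network requests when the module is run directly.
if __name__ == "__main__":
    api = API(Cache(), ttl=600)
    print(api.instance_name("peertube.example.org"))
    results = api.search("peertube", count=3)
    if isinstance(results, FailedRequest):
        print("Search failed: %s" % results.format())
    else:
        for video in results.data:
            print(video["name"])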