from bs4 import BeautifulSoup import requests import json import sys from datetime import datetime from dateutil import parser as dateutil # --- REQUEST UTILS --- # Semantic class to store remote errors class FailedRequest: def __init__(self, e): self.exception = e def format(self): # If it's a rawtext error, print it # Otherwise look for 'message' attribute # Otherwise ask python to represent the exception if isinstance(self.exception, str): return self.exception else: return getattr(self.exception, 'message', repr(self.exception)) # Strip < and > symbols to prevent content injection, # and replace newlines with HTML line breaks
def format_html(self): return self.format().replace('<', "<").replace('>', ">").replace("\n", "
") # Format a list of FailedRequest's def format_errors(failures): return list(map(lambda failure: failure.format(), failures)) def format_errors_html(failures): return list(map(lambda failure: failure.format_html(), failures)) # Helper class to store paginated results # self.data is a LIST of stuff, or a single error. When multiple errors can occur, see MultipleResults instead # TODO: Maybe rename PaginatedResult (without plural) to indicate it's only one request class PaginatedResults: def __init__(self, total, data): self.total = total self.data = data # Helper class to store multiple, potentially paginated, results # Each of which can be failed or succeeded independently class MultipleResults: # Initialize me by calling MultipleResults().merge_with(map(lambda x: api.foo(x), entries)) def __init__(self): self.successes = [] self.failures = [] # Helper function to insert/log an error def error(self, reason): print("[ERROR] %s" % reason) self.failures.append(FailedRequest(reason)) return self # Insert a single PaginatedResults instance into the current MultipleResults def insert_paginated(self, result): if isinstance(result, FailedRequest): self.failures.append(result) return self if not isinstance(result, PaginatedResults): return self.error("PROGRAMMING ERROR: MultipleResults.insert only takes a single FailedRequest or PaginatedResults") # Now we have many items from that PaginatedResults to insert into successes self.successes.extend(result.data) return self # Inserts a single successful result into the current MultipleResults # TODO: Should make more typesafe by having a SuccessfulResult type def insert(self, result): if isinstance(result, FailedRequest): self.failures.append(result) return self if isinstance(result, PaginatedResults): return self.error("PROGRAMMING ERROR: MultipleRequests.insert only takes a single successful or FailedRequest. PaginatedResults should be inserted with MultipleRequests.insert_paginated") self.successes.append(result) # Merge successes and failures with another MultipleResults def merge_with(self, results): if not isinstance(results, MultipleResults): return self.error("PROGRAMMING ERROR: MultipleResults.merge_with should only be called with another MultipleResults") self.successes.extend(results.successes) self.failures.extend(results.failures) return self # --- END REQUEST UTILS --- # Helper class for using caches, you can use any other cache that implements the same API (get/set) # Default TTL: 3600s (1h) class Cache: def __init__(self, ttl=3600): self.dict = {} self.ttl = ttl # Use ttl=0 to disable def get(self, key, ttl=None): if ttl == None: ttl = self.ttl if key in self.dict: last_time_updated = (self.dict[key])[1] time_diff = datetime.now() - last_time_updated if time_diff.total_seconds() > ttl: # Outdated data return None # Data still valid according to TTL return (self.dict[key])[0] else: # No data return None def set(self, key, value): self.dict[key] = [ value, datetime.now() ] # Takes a successful plaintext response and parses it as HTML to extract the title def html_title(content): soup = BeautifulSoup(content, "lxml") title = soup.find('title') if title: return title.text else: return "PeerTube Instance" # Takes a successfully-parsed JSON response and extracts data/total from it to build pagination def paginator(response): if "data" not in response or "total" not in response: return FailedRequest("The API response provided to paginator appears not to be paginated") # TODO: check that total is an integer and that it's greater than len(data) #return response["total"], response["data"] return PaginatedResults(response["total"], response["data"]) class API: # The PeertubeAPI is initialized with a caching backend and a default TTL, that can be overriden in specific # API request calls. The caching backend should implement a get(key, ttl) and set(key, value) API. # Also can specify the SepiaSearch instance here # NOTE: The whole API must be oriented for ordered, not named arguments, because they would be tricky to implement # in Context.insert_future_result() def __init__(self, cache, ttl=3600, search="https://search.joinpeertube.org"): self.cache = cache self.ttl = ttl # If search instance has no protocol set, assume https:// if not search.startswith("http"): search = "https://" + search # Remove trailing slash if search.endswith('/'): self.search_source = search[0:-1] else: self.search_source = search # Instance search API expects a filter and a sorting criteria. Here we provide # higher-level categories that match requested URIs: trending, most-liked, recently-added, local self.instance_filters = { "trending": (None, "-trending"), "most-liked": (None, "-likes"), "local": ("local", "-publishedAt"), "recently-added": (None, "-publishedAt"), } # Wrapper around requests.get() so that it cannot fail # Usually, you want to call self.request(), which is an even higher-level wrapper # If the request succeeds: # - parse as JSON, or return None # - if "error" field in JSON, return None # - return the parsed JSON # Else: return None # In all cases where the function returns None, the errors are logged # Use ttl=0 to disable Cache for this request # Only use with JSON requests, otherwise use self.(plaintext_request) def json_request(self, url): print("[DEBUG] Requesting JSON URL %s" % url) try: # If serialization fails, we'll end up in `except` block parsed_response = json.loads(requests.get(url).text) if "error" in parsed_response: print("[WARN] Remote peertube returned error for %s:\n%s" % (url, parsed_response["error"])) return FailedRequest("Remote peertube server returned an error for URL %s:\n%s" % (url, parsed_response["error"])) return parsed_response except Exception as e: print("[WARN] Error fetching page \"%s\":\n%s" % (url, e)) return FailedRequest(e) # Wrapper around requests.get() so that it cannot fail # Use with plaintext requests, for JSON requests use self.request() def plaintext_request(self, url): print("[DEBUG] Requesting plaintext URL %s" % url) try: return requests.get(url).text except Exception as e: print("[WARN] Error fetching page \"%s\":\n%s" % (url, e)) return FailedRequest(e) # Useful wrapper method to reduce boilerplate # args: parameters tuple to form the cache key # url: string template to form URL from args tuple, where ${i} is arg[i] # key: Key to extract from a successful response # backend: the method to use for fetching URL (default: self.json_request), can be self.plaintext_request # extractor: a lambda function to execute to extract stuff from a successful request, when key isn't set def request(self, args, url, key=None, backend=None, extractor=None, ttl=None): # WTF python? '/'.join(("foo)) => "f/o/o", not "foo"?! Special case when only one arg if isinstance(args, str): args = (args, ) cache_key = '/'.join(args) cached = self.cached(cache_key, ttl=ttl) if cached == None: # Defaults to making a JSON API request if backend == None: backend = self.json_request for i in range(0, len(args)): url = url.replace("${%s}" % str(i), args[i]) res = backend(url) if not isinstance(res, FailedRequest): if key != None: # Extract requested key from successful request res = res[key] elif extractor != None: # Run extractor on result from successful request res = extractor(res) self.save(cache_key, res) return res return cached # Returns an entry from cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name def cached(self, key, ttl=None): if ttl == None: ttl = self.ttl caller = sys._getframe(2).f_code.co_name key_name = "peertube-" + caller + "-" + key res = self.cache.get(key_name, ttl=ttl) if res == None: print("[CACHE] Entry not found for %s: %s" % (caller, key)) return None print("[CACHE] Found entry for %s: %s" % (caller, key)) return res # Save an entry into cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name def save(self, key, value): # TODO: Maybe now that we call from two levels above (request_wrapper->request->save) we need to adapt the frame fetched caller = sys._getframe(2).f_code.co_name key_name = "peertube-" + caller + "-" + key print("[CACHE] Saving entry for %s: %s" % (caller, key)) self.cache.set(key_name, value) # Fetch instance name from its HTML source def instance_name(self, domain, ttl=None): return self.request( (domain), "https://${0}", backend=self.plaintext_request, extractor=html_title ) # Search the configured self.search_source for `query`, returning `count` items after `start` # NOTE: Returns a PaginatedResults upon success def search(self, query, start=0, count=10, ttl=None): return self.request( (str(start), str(count), self.search_source, query), # self.search_source already has protocol pre-pended "${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}", extractor=paginator ) # Search a specific Peertube instance for `query`, # returning `count` items after `start`. Slightly different URL format from SepiaSearch (self.search()) # NOTE: Returns a PaginatedResults upon success def search_instance(self, domain, term, start=0, count=10, ttl=None): return self.request( (str(start), str(count), domain, term), "https://${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}&sort=-match&searchTarget=local", extractor=paginator ) # Default category is local, other categories are: trending, most-liked, recently-added, local # See self.instance_filters in self.__init()__ for sorting/filtering detail # NOTE: Returns a PaginatedResults upon success def instance_videos(self, domain, start=0, count=10, category="local", ttl=None): if not category in self.instance_filters: return FailedRequest("instance_videos called with bogus filter: %s" % category) filt, sort = self.instance_filters[category] url = "https://${2}/api/v1/videos?sort=" + sort + "&start=${0}&count=${1}" if filt: url += "&filter=" + filt return self.request( (str(start), str(count), domain, category), url, extractor=paginator ) def video(self, domain, id, ttl=None): return self.request( (domain, id), "https://${0}/api/v1/videos/${1}", ) def video_captions(self, domain, id, ttl=None): return self.request( (domain, id), "https://${0}/api/v1/videos/${1}/captions", # NOTE: Captions look like paginated content because they have 'total' and 'data' field # However they are, and that's good, not paginated. key="data" ) def video_captions_proxy(self, domain, caption_id, ttl=None): # URL is hardcoded to prevent further proxying. URL may change with updates, see captions API # eg. https://kolektiva.media/api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/captions # TODO: What if the captionPath doesn't follow this format on an instance? Should we really proxy ANYTHING returned by API? return self.request( (domain, caption_id), "https://${0}/lazy-static/video-captions/${1}", backend=self.plaintext_request ) # NOTE: Returns a PaginatedResults upon success def video_comments(self, domain, id, start=0, count=10, ttl=None): return self.request( (domain, id, str(start), str(count)), "https://${0}/api/v1/videos/${1}/comment-threads?start=${2}&count=${3}", extractor=paginator ) def account(self, domain, name, ttl=None): return self.request( (domain, name), "https://${0}/api/v1/accounts/${1}" ) # NOTE: Returns a PaginatedResults upon success def account_channels(self, domain, name, start=0, count=10, ttl=None): return self.request( (str(start), str(count), domain, name), "https://${2}/api/v1/accounts/${3}/video-channels?start=${0}&count=${1}", extractor=paginator ) # NOTE: Returns a PaginatedResults upon success def account_videos(self, domain, name, start=0, count=10, ttl=None): return self.request( (str(start), str(count), domain, name), "https://${2}/api/v1/accounts/${3}/videos?start=${0}&count=${1}", extractor=paginator ) # Fetch information about multiple accounts, returned as a MultipleResults # NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple. def accounts(self, accounts, ttl=None): results = MultipleResults() for account in accounts: results.insert(self.account(account.domain, account.name)) return results # Fetch latest videos from multiple accounts, returned as MultipleResults # NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple. def accounts_videos(self, accounts, limit=None, sort=True, ttl=None): api_limit = 10 if limit == None else limit results = MultipleResults() for account in accounts: results.insert_paginated(self.account_videos(account.domain, account.name, count=api_limit)) if limit or sort: # We also sort when limit is set, because otherwise limit will discard useful information results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True) if limit: results.successes = results.successes[0:limit] return results def channel(self, domain, name, ttl=None): return self.request( (domain, name), "https://${0}/api/v1/video-channels/${1}" ) # NOTE: Returns a PaginatedResults upon success def channel_videos(self, domain, name, start=0, count=10, ttl=None): return self.request( (str(start), str(count), domain, name), "https://${2}/api/v1/video-channels/${3}/videos?start=${0}&count=${1}", extractor=paginator ) # NOTE: Returns a PaginatedResults upon success def channel_playlists(self, domain, name, start=0, count=10, ttl=None): return self.request( (str(start), str(count), domain, name), "https://${2}/api/v1/video-channels/${3}/video-playlists?start=${0}&count=${1}", extractor=paginator ) # List of detailed info about local channel subscriptions # Fetch information about multiple channels, returned as a MultipleResults # NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple. def channels(self, channels, ttl=None): results = MultipleResults() for channel in channels: results.insert(self.channel(channel.domain, channel.name)) return results # Fetch latest videos from multiple channels, returned as MultipleResults # NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple. def channels_videos(self, channels, limit=None, sort=True, ttl=None): api_limit = 10 if limit == None else limit results = MultipleResults() for channel in channels: results.insert_paginated(self.channel_videos(channel.domain, channel.name, count=api_limit)) if limit or sort: # We also sort when limit is set, because otherwise limit will discard useful information results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True) if limit: results.successes = results.successes[0:limit] return results # Extra information about video, not contained directly in API result # a is video result class VideoInfo: def __init__(self, api, a, args): # If the video is being built from a failed request, return that request instead if isinstance(a, FailedRequest): print("[ERROR] A video request failed, yet you called VideoInfo about it. You should probably not make useless requests.") return FailedRequest("A previous video request failed, not attempting to fetch extra information about it.") quality = args.get("quality") self.resolutions = [] self.video = None self.files = a["files"] if len(self.files) == 0: self.files = ((a["streamingPlaylists"])[0])["files"] self.default_res = None for entry in self.files: resolution = (entry["resolution"])["id"] self.resolutions.append(entry["resolution"]) # chose the default quality if resolution != 0 and quality == None: if self.default_res == None: self.default_res = resolution self.video = entry["fileUrl"] elif abs(720 - resolution) < abs(720 - self.default_res): self.default_res = resolution self.video = entry["fileUrl"] if str(resolution) == str(quality): self.video = entry["fileUrl"] if quality == None: self.quality = self.default_res else: self.quality = quality self.no_quality_selected = not self.video # --- IDENTIFIERS HELPERS --- # This class Account can be either an actual user account or a channel # TODO: Find another name to express that... Maybe Subscription? Maybe Identifier? class Account: def __init__(self, name, domain): self.name = name self.domain = domain # Builds an Account from one of the following syntaxes, additionally stripping extra whitespace and ignoring `#` as comments: # - id@server # - @id@server # - http(s)://server/c/id # - http(s)://server/a/id # returns an Account instance def parse(identifier): identifier = identifier.split('#')[0].strip() # Comment line is returned as empty string if identifier == '': return None requested_identifier = identifier if identifier.startswith('http'): identifier = identifier[4:] # HTTPS? if identifier.startswith('s'): identifier = identifier[1:] # Remove :// identifier = identifier[3:] parts = identifier.split('/') if len(parts) < 3: print("[ERROR] Misformed URL for identifier, not enough components: %s" % requested_identifier) return None if parts[1] == 'a' or parts[1] == 'c': # Account or channel found, take the next part return Account(parts[2], parts[0]) print("[ERROR] Identifier not understood: %s" % requested_identifier) return None # Not an HTTP URL, we assume user@server or @user@server address return unsafe_account_parser(identifier) # Takes an account in the [@]user@server form and makes sure it makes sense # Returns an Account in that case, None otherwise # This function does not further sanitize the string (eg. strip whitespace or remove comments), please use # Account.parse() for that, which also supports more account formats def unsafe_account_parser(identifier): requested_identifier = identifier # Remove first @, if any if identifier.startswith('@'): identifier = identifier[1:] parts = identifier.split('@') if len(parts) != 2: print("[ERROR] Wrong identifier, because we are expecting a single '@': %s" % requested_identifier) return None if len(parts[0]) == 0 or len(parts[1]) == 0: print("[ERROR] Wrong identifier, because one part is empty before/after '@': %s" % requested_identifier) return None return Account(parts[0], parts[1]) # --- END IDENTIFIERS HELPERS ---