simpleertube/utils.py

187 lines
7.7 KiB
Python

from quart import render_template
from datetime import datetime
from dateutil import parser as dateutil
#import peertube
from peertube import PaginatedResults, FailedRequest, MultipleResults, Account
import html2text
import sys
# --- TEMPLATING HELPERS ---
# Feed me a template name, a peertube.Api.method() result,
# and a lambda to build the template context from result, on success
# If passed result is in fact an iterable (list/tuple) of results,
# they will all be checked for success
# NOTE: Non-critical failures should be passed as failures context variable
async def render(template, result, context):
    """Render `template` for a peertube.Api result, or an error page on failure.

    `result` is either a single API result or an iterable of results. When it
    is iterable, every FailedRequest found inside contributes one entry to the
    error page; when it is a single FailedRequest, that failure alone is
    reported. Otherwise `context(result)` is called to build the keyword
    arguments handed to the template.

    Returns the rendered template on success, or a `(rendered error.html, 500)`
    tuple when a request failed. An exception raised while rendering the
    template is wrapped in a FailedRequest and delegated to render_error,
    together with the context that was built.
    """
    if hasattr(result, '__iter__'):
        # Gather the HTML description of every failed request in the batch
        failures = [
            res.format_html()
            for res in result
            if isinstance(res, FailedRequest)
        ]
        if failures:
            # One or more failures occurred
            error_page = await render_template(
                "error.html",
                error_number="500",
                error_reasons=failures,
            )
            return error_page, 500
    elif isinstance(result, FailedRequest):
        error_page = await render_template(
            "error.html",
            error_number="500",
            error_reasons=[result.format_html()],
        )
        return error_page, 500

    # Every result succeeded: let the caller-supplied builder make the context
    template_vars = context(result)
    try:
        return await render_template(template, **template_vars)
    except Exception as e:
        return await render_error(FailedRequest(e), template_vars)
# Feed me a single peertube.Api result that failed (FailedRequest class), I will render an error
# Also you can add the original context as a dictionary
async def render_error(result, context=None):
    """Render the generic error page for a single failed API result.

    Args:
        result: Expected to be a FailedRequest; its format_html() output
            becomes the only entry of `error_reasons`.
        context: Optional dict of extra template variables (for instance the
            context of the page that failed to render). Defaults to no extra
            variables.

    Returns:
        A `(rendered error.html, 500)` tuple, or None (after printing an
        error message) when `result` is not a FailedRequest.
    """
    if not isinstance(result, FailedRequest):
        # Wrap in a 1-tuple: a bare `% result` raises TypeError when result is
        # itself a tuple (e.g. several results passed by mistake).
        print("[ERROR] Called render_error with a non-failed result:\n%s" % (result,))
        return None

    # Build the keyword set explicitly instead of mixing literal keywords with
    # **context: a context that already carries "error_number" or
    # "error_reasons" would otherwise raise TypeError (duplicate keyword).
    template_vars = dict(context) if context else {}
    template_vars["error_number"] = "500"
    template_vars["error_reasons"] = [result.format_html()]
    return await render_template("error.html", **template_vars), 500
# Feed me an error code and a reason, i will render an error
async def render_custom_error(reason, code=500):
    """Render the error page for an arbitrary reason and status code.

    `reason` becomes the single entry of `error_reasons` and `code` is shown
    (as a string) as `error_number`. Returns a `(rendered error.html, code)`
    tuple.
    """
    error_page = await render_template(
        "error.html",
        error_number=str(code),
        error_reasons=[reason],
    )
    return error_page, code
# Feed me a peertube.Api result and i will output plaintext, or render an error
async def render_plaintext(result):
    """Return `result` unchanged, unless it is a FailedRequest.

    A FailedRequest is turned into a `(rendered error.html, 500)` tuple whose
    only error reason is the failure's format_html() output.
    """
    if not isinstance(result, FailedRequest):
        # Successful result: hand it back as-is
        return result
    error_page = await render_template(
        "error.html",
        error_number="500",
        error_reasons=[result.format_html()],
    )
    return error_page, 500
# --- END TEMPLATING UTILS ---
# --- COMMENT UTILS ---
# Module-level HTML-to-text converter, created once and used by comments()
# to turn comment HTML into plain text.
h2t = html2text.HTML2Text()
# html2text option: leave link formatting out of the converted text.
h2t.ignore_links = True
# NOTE: Python makes it really hard to accept both boolean and stringy `enabled` values
# This function MUST be called with successful results, don't pass it anything that hasn't been fail-proofed
def comments(api, domain, id, enabled=True, start=0, count=10):
    """Fetch one page of comments for a video and convert them to plain text.

    `enabled` may be the boolean True or any value whose string form is
    "true" (case-insensitive); anything else means comments are disabled.

    Returns:
        - "" when comments are disabled (the API is not queried);
        - the FailedRequest returned by api.video_comments() on failure;
        - a new FailedRequest when the API answer is not a PaginatedResults;
        - otherwise the PaginatedResults, with the "text" of every entry in
          `.data` replaced by its html2text conversion (entries are modified
          in place).
    """
    # Nothing to query or format when comments are turned off
    disabled = enabled != True and str(enabled).lower() != "true"
    if disabled:
        return ""

    page = api.video_comments(domain, id, start=start, count=count)
    if isinstance(page, FailedRequest):
        return page
    # A successful request is expected to be a PaginatedResults
    if not isinstance(page, PaginatedResults):
        return FailedRequest("WTF. comments() util was called with something that is not PaginatedResults")

    # Strip the HTML from the comments and convert them to plain text
    plain_comments = []
    for entry in page.data:
        entry["text"] = h2t.handle(entry["text"]).strip().strip("\n")
        plain_comments.append(entry)
    page.data = plain_comments
    return page
# --- END COMMENT UTILS ---
# --- SUBSCRIPTION UTILS ---
# Manage Subscriptions to remote channels and accounts (see INDEX ROUTE in main.py)
class Subscriptions:
    """Manage local subscriptions to remote accounts and channels.

    Subscriptions are read from `<folder>/accounts.list` and
    `<folder>/channels.list`. The parsed lists are kept in `cache`, and the
    matching remote data is requested through `api`.
    """

    # Number of videos in one page returned by videos()
    PAGE_SIZE = 15

    def __init__(self, folder, api, cache, ttl=3600):
        """Store the collaborators used by the other methods.

        Args:
            folder: Directory that contains the `*.list` subscription files.
            api: Object providing accounts(), channels(), accounts_videos()
                and channels_videos().
            cache: Object providing get(key, ttl=...) and set(key, value).
            ttl: Default ttl handed to cache lookups when none is given.
        """
        self.api = api
        self.cache = cache
        self.folder = folder
        self.ttl = ttl

    def cached(self, key, ttl=None):
        """Return the cache entry for `key`, or None when there is none.

        The actual cache key is "subscriptions-FUNC-KEY", where FUNC is the
        name of the calling function, read from the caller's stack frame.
        Call this directly from the method owning the entry: going through an
        intermediate helper would change the key. When `ttl` is None the
        instance default (self.ttl) is used.
        """
        if ttl is None:
            ttl = self.ttl
        caller = sys._getframe(1).f_code.co_name
        key_name = "subscriptions-" + caller + "-" + key
        res = self.cache.get(key_name, ttl=ttl)
        if res is None:
            print("[CACHE] Entry not found for %s: %s" % (caller, key))
            return None
        print("[CACHE] Found entry for %s: %s" % (caller, key))
        return res

    def save(self, key, value):
        """Store `value` in the cache under "subscriptions-FUNC-KEY".

        FUNC is the name of the calling function, so an entry saved from a
        method is found again by cached() called from that same method.
        """
        caller = sys._getframe(1).f_code.co_name
        key_name = "subscriptions-" + caller + "-" + key
        print("[CACHE] Saving entry for %s: %s" % (caller, key))
        self.cache.set(key_name, value)

    def load(self, kind):
        """Read and parse the subscriptions listed in `<folder>/<kind>.list`.

        Every line of the file goes through Account.parse; only the values
        that are Account instances are kept, which drops comment entries and
        empty lines. A missing or unreadable file yields an empty list.

        Returns:
            The list of parsed Account objects.
        """
        print("[INFO] Refreshing subscriptions %s from %s/%s.list" % (kind, self.folder, kind))
        try:
            with open(self.folder + '/' + kind + '.list', 'r') as f:
                # The file content is read here, but map() is lazy: the calls
                # to Account.parse happen while filtering below, outside this
                # try block, so parsing errors are not swallowed.
                subscriptions = map(Account.parse, f.read().splitlines())
        except Exception:
            print("[INFO] No `%s.list` file to load for local subscriptions in %s, or the file isn't readable" % (kind, self.folder))
            subscriptions = []
        # Remove comment entries and empty lines
        results = [entry for entry in subscriptions if isinstance(entry, Account)]
        print("[DEBUG] %s subscriptions: %s" % (kind, results))
        return results

    def info(self, ttl=None):
        """Get the info about local subscriptions for accounts and channels.

        Returns a MultipleResults merging api.accounts() for the subscribed
        accounts with api.channels() for the subscribed channels; `ttl` is
        forwarded to both API calls.
        """
        return (
            MultipleResults()
            .merge_with(self.api.accounts(self.accounts(), ttl=ttl))
            .merge_with(self.api.channels(self.channels(), ttl=ttl))
        )

    def videos(self, page=None, sort=True, lang=None, ttl=None):
        """Get the videos of account and channel subscriptions combined.

        NOTE: duplicates are not handled, why would you add both an account
        and the corresponding channel?

        Args:
            page: 1-based page number; when truthy only that page of
                PAGE_SIZE videos is kept (and sorting is forced).
            sort: When truthy, sort the videos by "createdAt", newest first.
            lang: When not None, keep only the videos whose language id is
                `lang` or None.
            ttl: Forwarded to the API calls.

        Returns:
            A MultipleResults whose `successes` list has been filtered,
            sorted and paginated as requested.
        """
        results = (
            MultipleResults()
            .merge_with(self.api.accounts_videos(self.accounts(), ttl=ttl, sort=False))
            .merge_with(self.api.channels_videos(self.channels(), ttl=ttl, sort=False))
        )
        if lang is not None:
            results.successes = [
                vid for vid in results.successes
                if vid["language"]["id"] == lang or vid["language"]["id"] is None
            ]
        if page or sort:
            results.successes.sort(key=lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
        if page:
            first = self.PAGE_SIZE * (page - 1)
            results.successes = results.successes[first:first + self.PAGE_SIZE]
        return results

    def accounts(self, ttl=60):
        """Return the list of locally-subscribed accounts (accounts.list).

        The list is served from the cache when an entry is found for the
        given `ttl` (60 by default); otherwise the file is loaded again and
        the result is saved to the cache.
        """
        cached = self.cached("accounts", ttl=ttl)
        if cached is not None:
            return cached
        # Refresh local accounts subscriptions
        res = self.load("accounts")
        self.save("accounts", res)
        return res

    def channels(self, ttl=60):
        """Return the list of locally-subscribed channels (channels.list).

        The list is served from the cache when an entry is found for the
        given `ttl` (60 by default); otherwise the file is loaded again and
        the result is saved to the cache.
        """
        cached = self.cached("channels", ttl=ttl)
        if cached is not None:
            return cached
        # Refresh local channels subscriptions
        res = self.load("channels")
        self.save("channels", res)
        return res