Compare commits

...

78 Commits

Author SHA1 Message Date
southerntofu e495305434 Proper journalctl-friendly logger 2021-09-06 19:03:45 +00:00
southerntofu a50a1fd0bf Fix account subscriptions debug print 2021-08-04 21:23:26 +02:00
southerntofu a09ed96eb6 Also display date on single video page 2021-08-04 21:17:35 +02:00
southerntofu aedc4352ee Human-friendly and semantic date for videos in lists 2021-08-04 21:12:33 +02:00
southerntofu fe0b83bd0e First attempt at multiple pages of homepage subscriptions 2021-08-04 20:54:23 +02:00
southerntofu 90c054e258 Homepage videos can be filtered by lang (eg. /fr) 2021-08-04 14:03:48 +02:00
southerntofu 41ca401cd5 Non-critical failures can be applied to all pages, not just index 2021-08-04 13:09:37 +02:00
southerntofu 1d42b5fef0 Homepage uses new Context API, and unifies failed requests into "failures" context key 2021-08-04 13:08:38 +02:00
southerntofu fc8872efeb Fix template import 2021-08-04 12:38:58 +02:00
southerntofu 16ef5eb24b Reuse videos macro in index.html (uniform display of video listings) 2021-08-04 12:38:31 +02:00
southerntofu 12a6592640 Reuse base.html template for index.html 2021-08-04 12:31:51 +02:00
southerntofu 64f2d15dab Reuse base.html in search.html 2021-08-04 12:26:41 +02:00
southerntofu c7cf3eddaa More consistent naming for accounts templates 2021-08-04 12:11:52 +02:00
southerntofu 00d74023dd More consistent template names for channels 2021-08-04 12:10:33 +02:00
southerntofu 9493f79dcb More consistent naming for search templates 2021-08-04 12:07:12 +02:00
southerntofu bf8d3b9275 Subscriptions use the new MultipleResults API 2021-08-03 20:10:27 +02:00
southerntofu 7c787a6502 Except index page, all pages use the same macro for displaying video listings 2021-08-02 22:26:35 +02:00
southerntofu 98e7ba8ae0 Errors triggered via utils.render() inherit original context 2021-08-02 22:24:51 +02:00
southerntofu aaa081eea9 Same key for search results whether global or per-instance 2021-08-02 20:06:37 +02:00
southerntofu bcb1018256 Finally fix pagination 2021-08-02 18:03:44 +02:00
southerntofu cb2569c0c1 VideoInfo is currently not merged with basic video response 2021-08-02 14:45:37 +02:00
southerntofu bb1780fefd Overall don't run useless requests 2021-08-02 00:30:24 +02:00
southerntofu 53703352d7 Introduce Context.insert_future_result() to prevent running requests when one failed previously 2021-08-01 23:53:59 +02:00
southerntofu 574f480a43 Channels routes use new Context API 2021-08-01 21:54:49 +02:00
southerntofu adb95d1c79 Account routes use new Context API, better pagination error handling 2021-08-01 21:29:30 +02:00
southerntofu 1973690e4a Video route uses new Context API 2021-08-01 20:51:47 +02:00
southerntofu 0d481ef6b7 Instance routes use new Context API 2021-08-01 19:51:00 +02:00
southerntofu f088cee3a2 Start new Context API, currently only for search 2021-08-01 19:45:38 +02:00
southerntofu 707f4d36eb Don't forget pagination URL on search page 2021-08-01 18:18:37 +02:00
southerntofu 19c9f0cada Avatar URL is always relative to queried instance, not remote instance 2021-08-01 18:10:08 +02:00
southerntofu e6dcab763a Woops i want to merge lists not frankensteinize them 2021-08-01 17:32:03 +02:00
southerntofu e15fc55c93 Everything now uses the new API.request() wrapper 2021-08-01 17:15:48 +02:00
southerntofu 48461ccc07 Woops fix wrong order of things 2021-08-01 16:45:29 +02:00
southerntofu c78b2e2c09 More avatar fallbacks 2021-08-01 16:43:02 +02:00
southerntofu 677326008a Follow instance links 2021-08-01 16:38:25 +02:00
southerntofu f65e4df27a Don't fail on missing avatar 2021-08-01 16:24:01 +02:00
southerntofu 71a3285c70 Allow newlines in error, still preventing HTML injection 2021-08-01 14:27:36 +02:00
southerntofu 0b837783ff No requests should hard-fail as long as remote Peertube server respects the API 2021-08-01 13:06:55 +02:00
southerntofu 84d01821c0 error.html can render several errors 2021-08-01 12:35:19 +02:00
southerntofu 77a2bd1431 Better error formatting before passing to templates 2021-08-01 12:13:31 +02:00
southerntofu a5095e707e Ignore subscriptions in repository 2021-08-01 11:54:45 +02:00
southerntofu c2034732d2 Fix homepage subscriptions, handle multiple failures gracefully 2021-08-01 11:51:22 +02:00
southerntofu a6ac579832 Everything uses new API, only homepage is broken 2021-08-01 02:12:24 +02:00
southerntofu 009ce82b5a Add deprecation notices, use new API for account info, unify "instance" as instance name in templates 2021-08-01 00:04:18 +02:00
southerntofu 0523e16b02 Accounts use the new API 2021-07-31 23:37:43 +02:00
southerntofu 9561c49e24 Captions use new API 2021-07-31 22:34:47 +02:00
southerntofu 1f3ba92dba Video and comments use new API 2021-07-31 22:24:12 +02:00
southerntofu 0b87b922e2 Move build_channel_or_account_name to utils.py 2021-07-31 17:25:24 +02:00
southerntofu 96505b96cc Update instance routes to new API 2021-07-31 17:22:26 +02:00
southerntofu 77226ac8f2 Reimplement search features with new API, move helpers to utils.py 2021-07-31 16:05:11 +02:00
southerntofu 9e295430df Introduce an error-proof Peertube API, use it for search 2021-07-31 15:35:48 +02:00
southerntofu 59ad38651d First draft for a better API, fails due to lack of context in templates?! 2021-07-31 15:09:21 +02:00
metalune 0100f9bff6 fix clickable links on instance homepage 2021-07-31 12:46:57 +02:00
metalune f4bdebb9f5 Make video thumbnails clickable on instance homepage 2021-07-31 12:45:04 +02:00
metalune a624de1ec3 Fix check for faulty channel identifier not working 2021-07-31 12:12:35 +02:00
metalune c56dbd0f38 Fix check for faulty account identifier not working 2021-07-31 12:11:22 +02:00
metalune 726faaa28d Also add statuscode 200 check for latest acocunt videos 2021-07-31 12:08:57 +02:00
metalune 7c3d3531e8 Add check for statuscode 200 when fetching frontpage videos 2021-07-31 12:05:28 +02:00
southerntofu 287b2cdbc4 A filter is not a list, but silently casts to an empty list? WTF python? 2021-07-31 11:31:58 +02:00
southerntofu 57dad72d9b Allow comments and multiple account/channel formats in subscriptions 2021-07-31 11:31:50 +02:00
southerntofu 40c1613582 Also load videos on homepage from subscribed channels.list 2021-07-31 11:31:37 +02:00
southerntofu 2b837c887d Cache accepts arbitrary (lambda) condition for refreshing 2021-07-31 11:31:13 +02:00
southerntofu 2eabe5c1eb Display latest videos from account subs in accounts.list on homepage, if any 2021-07-31 11:31:08 +02:00
southerntofu 0763e2a0bf Serve /opensearch.xml to add SimpleerTube as search engine to your webbrowser 2021-07-31 11:23:49 +02:00
southerntofu 404b6e73f6 Also serve /favicon.ico directly as 404 to avoid pointless requests 2021-07-31 11:23:32 +02:00
metalune 3040a8f320 Add link to Todo-handler 2021-07-30 21:23:16 +02:00
metalune 6b03bf194b Add Instance Home and Home button 2021-07-30 20:04:32 +02:00
southerntofu a04a418d32 Channel links go to /video-channels/ not /accounts/ 2021-07-30 19:56:35 +02:00
metalune a01b5547ee Stop printing captions into log 2021-07-30 19:27:53 +02:00
southerntofu 90a7a03b34 Fix subtitles proxying, every URL is unique and beautiful 2021-07-30 19:26:35 +02:00
southerntofu 0b47bda1aa /search supports GET forms, as required by OpenSearch 2021-07-30 19:20:27 +02:00
southerntofu ee6b30b0c0 Propose subtitles in the native video player 2021-07-30 19:18:57 +02:00
southerntofu 8e473b7948 Don't commit vim temporary files 2021-07-30 19:16:52 +02:00
southerntofu d51a5089f5 Don't traceback when favicon.ico is requested, display a nice error instead 2021-07-30 19:16:08 +02:00
southerntofu 03ddeb2608 Support custom interface/port in main.py 2021-07-30 19:15:23 +02:00
southerntofu ba55cfd472 Update the README 2021-07-30 19:09:39 +02:00
metalune 3946bc53c8 Re-add dependency for BeautifulSoup because of future patches 2021-07-28 21:22:40 +02:00
metalune b9fa8de39c Remove unnecessary import for BeautifulSoup 2021-07-28 20:10:18 +02:00
32 changed files with 1292 additions and 744 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
__pycache__/
.*.sw*
*.list

View File

@ -1,4 +1,4 @@
### SimpleerTube
# SimpleerTube
Active Known Instances:
- https://simpleertube.metalune.xyz
@ -11,3 +11,34 @@ If you want to visit any page from your PeerTube instance of choice in SimpleerT
So, `https://videos.lukesmith.xyz/accounts/luke` becomes `https://simpleertube.metalune.xyz/videos.lukesmith.xyz/accounts/luke`.
If you visit the main page, you can search globally (it uses [Sepia Search](https://sepiasearch.org) in the backend).
## Setup
You need to setup a few dependencies first, usually using pip (`sudo apt install python3-pip` on Debian):
```
$ sudo pip3 install quart bs4 html2text lxml
```
**Note:** If there are other dependencies that are not packaged with your system, please report them to us so they can be added to this README.
Now you can run a development environment like so:
```
$ python3 main.py # Starts on localhost:5000
$ python3 main.py 192.168.42.2 # Starts on 192.168.42.2:5000
$ python3 main.py 7171 # Starts on localhost:7171
$ python3 main.py 192.168.42.2 7171 # Starts on 192.168.42.2:7171
$ python3 main.py ::1 7171 # Also works with IPv6 addresses
```
It is strongly disrecommended to run the production using this command. Instead, please refer to the [Quart deployment docs](https://pgjones.gitlab.io/quart/tutorials/deployment.html).
## TODO-Tracker
We have our TODO-Tracker hosted on todo.sr.ht: [SimpleerTube](https://todo.sr.ht/~metalune/SimpleerTube)
## License
This software is distributed under the AGPLv3 license. You can find a copy in the [LICENSE](LICENSE) file.

711
main.py
View File

@ -1,400 +1,419 @@
from quart import Quart, request, render_template, redirect
from datetime import datetime
from math import ceil
import peertube
import html2text
#from peertube import Cache, PaginatedResults, FailedRequest, API
from peertube import *
import sys
import logging
h2t = html2text.HTML2Text()
h2t.ignore_links = True
# Wrapper, only containing information that's important for us, and in some cases provides simplified ways to get information
class VideoWrapper:
def __init__(self, a, quality):
self.name = a["name"]
self.channel = a["channel"]
self.description = a["description"]
self.thumbnailPath = a["thumbnailPath"]
self.category = a["category"]
self.licence = a["licence"]
self.language = a["language"]
self.privacy = a["privacy"]
self.tags = a["tags"]
self.views = a["views"]
self.likes = a["likes"]
self.dislikes = a["dislikes"]
self.embedPath = a["embedPath"]
self.commentsEnabled = a["commentsEnabled"]
self.resolutions = []
self.video = None
self.files = a["files"]
if len(self.files) == 0:
self.files = ((a["streamingPlaylists"])[0])["files"]
self.default_res = None
for entry in self.files:
resolution = (entry["resolution"])["id"]
self.resolutions.append(entry["resolution"])
# chose the default quality
if resolution != 0 and quality == None:
if self.default_res == None:
self.default_res = resolution
self.video = entry["fileUrl"]
elif abs(720 - resolution) < abs(720 - self.default_res):
self.default_res = resolution
self.video = entry["fileUrl"]
if str(resolution) == str(quality):
self.video = entry["fileUrl"]
if quality == None:
self.quality = self.default_res
else:
self.quality = quality
self.no_quality_selected = not self.video
# Helper Class for using caches
class Cache:
def __init__(self):
self.dict = {}
def get(self, arg, func):
if arg in self.dict:
last_time_updated = (self.dict[arg])[1]
time_diff = datetime.now() - last_time_updated
if time_diff.days > 0:
self.dict[arg] = [
func(arg),
datetime.now()
]
else:
self.dict[arg] = [
func(arg),
datetime.now()
]
return (self.dict[arg])[0]
cached_instance_names = Cache()
cached_account_infos = Cache()
cached_video_channel_infos = Cache()
# cache the instance names so we don't have to send a request to the domain every time someone
# loads any site
def get_instance_name(domain):
return cached_instance_names.get(domain, peertube.get_instance_name)
# simple wrapper that is used inside the cached_account_infos
def get_account(info):
info = info.split("@")
return peertube.account(info[1], info[0])
def get_account_info(name):
return cached_account_infos.get(name, get_account)
# simple wrapper that is used inside the cached_video_channel_infos
def get_video_channel(info):
info = info.split("@")
return peertube.video_channel(info[1], info[0])
def get_video_channel_info(name):
return cached_video_channel_infos.get(name, get_video_channel)
# Import everything from utils.py, to keep main.py focused
# on CLI arguments parsing, and web routes declarations
from utils import *
cache = Cache()
api = API(cache)
from logging.config import dictConfig
dictConfig({
'version': 1,
'loggers': {
'quart.app': {
'level': 'DEBUG',
},
},
})
# --- WEBSERVER STUFF ---
# All utilities are above this point.
# Routes are defined below
app = Quart(__name__)
@app.route("/")
async def main():
return await render_template(
"index.html",
)
from werkzeug.routing import BaseConverter
class LangConverter(BaseConverter):
regex = r"[a-zA-Z]{2}"
app.url_map.converters['lang'] = LangConverter
# Simple class to designate a previous result, to be consumed in
# Context.insert_future_result()
class PreviousResult():
def __init__(self, key):
self.context_key = key
def key(self):
return self.context_key
# Class to build rendering context from, can be chained like
# context = Context().paginate(URL, start=0, count=10)
# .instance(domain)
# TODO: Since some context parsing will in fact produce fallible requests, how to error?
# Do we store self.failures(default None)? Then render can check for failures and render error.html instead
class Context():
def __init__(self, api):
self.context = {}
self.failures = []
self.api = api
# Helper function that's called by the lambda for passing context
def build(self, _ignored_old_api):
return self.context
# Helper function that's called during render for passing errors
def failed(self):
return self.failures
# Helper function to insert/log an error
def error(self, reason):
app.logger.error(reason)
self.failures.append(FailedRequest(reason))
return self
# Helper function to insert a successful result into context, or register failure otherwise
def insert_result(self, key, result):
if isinstance(result, FailedRequest):
self.failures.append(result)
else:
self.context[key] = result
return self
# Helper function to insert a raw value into context, even if it is a FailedRequest
def insert(self, key, value):
self.context[key] = value
return self
# Helper function to insert a successful result into context, from a function and tuple of arguments.
# If there were previous failures, no further request will be made. If you'd like to reuse a previous
# successful result, pass an PreviousResult("key") as argument, where key is where your previous result
# has been stored in the current context.
# Additionally, two more advanced features are supported:
# - if key is actually a tuple of strings, and the API responded with a tuple, the multiple values returned
# by the API will be inserted into those keys in context (no longer used at the moment, previously for pagination)
# - if the future result is in fact a PaginatedResults, automatically extract "data" from the entry, to insert into
# `key`, and extract "total" from the entry to insert into pagination["total"]
def insert_future_result(self, key, func, args):
#print("[DEBUG] Considering future result %s" % repr(key))
if len(self.failures) > 0:
# Previous failures, don't run further requests
app.logger.debug("Previous failures. Skipping future result %s" % repr(key))
return self
# Don't collapse (foo) into foo, keep it tupled
if not isinstance(args, tuple): args = (args, )
#print("[DEBUG] Arguments: %s" % args)
# If an argument is a previous result, extract it
args = tuple(map(lambda arg: self.unsafe_previous_result(arg), args))
#print("[DEBUG] Calling with arguments:\n")
#for arg in args: print("[DEBUG] - %s" % arg)
result = func(*args)
# Let's check key, if it's a tuple, we expect a tuple from running the API request,
# and they should be the same length
if isinstance(key, tuple):
# Here ("foo", "bar") tuple of context keys was passed as target, so we try to match
if not isinstance(result, tuple):
return self.error("PROGRAMMING ERROR: Requested insertion into several context keys, but API responded single response.")
if len(result) != len(key):
return self.error("PROGRAMMING ERROR: Requested insertion into %s context keys, but API returned %s arguments." % (str(len(result)), str(len(key))))
# Insert API results into every requested context key
for i in range(0, len(key)):
self.insert_result(key[i], result[i])
return self
# Simple key to insert_result result into, whether it succeeded or failed
# Special-case: if result is a PaginatedResults, then we insert the total results count into pagination["total"]
if isinstance(result, PaginatedResults):
app.logger.debug("Inserting pagination from API request!")
self.insert("pagination", { "total": result.total })
return self.insert_result(key, result.data)
# Very normal result
return self.insert_result(key, result)
# Extracts previous result from the current context
# Special-case, when an argument is a tuple of (PreviousResult, key, subkey...) we extract
# the key[subkey...] from the context's previous result
# NOTE: NEVER CALL THIS METHOD IF THERE WERE PREVIOUS FAILURES OR YOU'RE NOT SURE A KEY EXISTS
def unsafe_previous_result(self, arg):
if isinstance(arg, PreviousResult):
#print("[DEBUG] Found simple PreviousResult(%s)" % arg.key())
return self.context[arg.key()]
elif isinstance(arg, tuple):
if not isinstance(arg[0], PreviousResult):
#print("[DEBUG] Found normal tuple argument")
return arg # Normal tuple argument
data = self.context[arg[0].key()]
for entry in range(1, len(arg)):
# We take each part of the tuple as a "route" inside the previous result
data = data[arg[entry]]
app.logger.debug("Found previous entry %s at given route: %s" % (arg[0].key(), data))
return data
else:
#print("[DEBUG] Found normal argument")
return arg # Normal argument
# Build pagination context. Since total page count is fetched from
def paginate(self, url, page, total_pages):
self.context["pagination"]["pages"] = total_pages
self.context["pagination"]["url"] = url
self.context["pagination"]["current"] = page
return self
# Build pagination context from API results, that were previously inserted
# with self.insert_future_result(), so you don't have to store the API result in the route
# declaration, if you don't need to process it further
# DON'T CALL ME FROM INDEX ROUTE, SEE NOTE BELOW
def paginate_results(self, results_key, url, page):
if len(self.failures) > 0:
# There were previous errors, so we don't paginate, and skip silently
return self
if results_key not in self.context:
return self.error("PROGRAMMING ERROR: Attempting to paginate %s results, which are not found in current context despite no previous errors happening. Did you maybe call paginate_results before insert_future_result? Or maybe the results key %s is misspelled." % (results_key, results_key))
# TODO: Currently there is no handling of list of results, such as on the subscription page
# because the only page using it is index, and index currently doesn't support pagination
# because the merging algorithm is not smart (yet)
results = self.context[results_key]
if isinstance(results, FailedRequest):
# Request failed so we don't build the pagination
# Also this branch should never be reached because self.insert_result() should prevent it
return self.error("PROGRAMMING ERROR: Tried to paginate a failed result. You should insert results with context.insert_future_result to prevent this mistake.")
if "pagination" not in self.context or "total" not in self.context["pagination"]:
return self.error("PROGRAMMING ERROR: pagination.total count not found in current context. Did you really insert_future_result of an API request that produces paginated results?")
total_pages = ceil(self.context["pagination"]["total"] / 10)
app.logger.debug("Found %s pages when paginating %s" % (str(total_pages), results_key))
return self.paginate(url, page, total_pages)
# Build instance context (eg. domain and instance_name)
def instance(self, domain):
self.context["domain"] = domain
self.insert_future_result("instance", api.instance_name, (domain))
return self
# Build video context (captions, quality, embed, comments) from domain, id and request headers
def video(self, domain, id, page=1, args={}):
self.insert_future_result("video", self.api.video, (domain, id)) \
.insert_future_result("video_info", VideoInfo, (api, PreviousResult("video"), args)) \
.insert_future_result("captions", api.video_captions, (domain, id)) \
.insert_future_result("comments", comments, (api, domain, id, (PreviousResult("video"), "commentsEnabled"), (page - 1) * 10, 10)) \
.paginate_results("comments", "/" + domain + "/videos/watch/" + id + "/", page) \
.insert("quality", args.get("quality")) \
.insert("embed", args.get("embed"))
return self
# Build context for account/channel subscriptions. Failed requests are not considered critical, and are simply
# appended to the "failures" context entry, otherwise a single request would crash the whole homepage.
def subscriptions(self, page=1, lang=None):
# Let's follow subscriptions from the current working dir
# TODO: make it configurable, maybe by "playlist" defined in config?
subscriptions = Subscriptions('.', self.api, cache)
info = subscriptions.info()
# Lang-filtering is done deeper in the API, so if a channel has 3 differents videos per publication for different languages,
# we can still get the expected number of recent videos from there instead of only considering the eg. 5 latest videos in said lang
# TODO: Is it working well? Should write some tests
videos = subscriptions.videos(page=page, lang=lang)
failures = info.failures
failures.extend(videos.failures)
filtered_videos = videos.successes
self.insert("subscriptions", info.successes)
self.insert("videos", filtered_videos)
# Sanitize errors for HTML so we can have newlines in errors but not risk content injection
self.insert("failures", format_errors_html(failures))
return self
# --- INDEX ROUTE ---
@app.route("/", defaults = {"page": 1})
@app.route("/<int:page>")
async def main(page):
# TODO: Pagination
context = Context(api).subscriptions(page=page)
# Inside subscriptions variable in templates, you may find either an account info structure, or a channel info structure. Channels may be recognized due to `ownerAccount` property.
# Failed requests do not fail the index.html rendering, instead they are stored in "failures" context key
return await render("index.html", context.failed(), context.build)
@app.route("/<lang:lang>", defaults = {"page": 1})
@app.route("/<lang:lang>/<int:page>")
async def main_lang(lang, page):
# TODO: Pagination
context = Context(api).subscriptions(page=page, lang=lang)
return await render("index.html", context.failed(), context.build)
# --- END INDEX ROUTE ---
# --- SEARCH ROUTES ---
@app.route("/search", methods = ["POST"])
async def simpleer_search_redirect():
async def search_post_redirect():
query = (await request.form)["query"]
return redirect("/search/" + query)
@app.route("/search/<string:query>", defaults = {"page": 1})
@app.route("/search/<string:query>/<int:page>")
async def simpleer_search(query, page):
results = peertube.sepia_search(query, (page - 1) * 10)
return await render_template(
"simpleer_search_results.html",
results = results,
query = query,
# details for pagination
page=page,
pages_total=ceil(results["total"] / 10),
)
@app.route("/<string:domain>/")
async def instance(domain):
return redirect("/" + domain + "/videos/trending")
@app.route("/<string:domain>/videos/local", defaults = {"page": 1})
@app.route("/<string:domain>/videos/local/<int:page>")
async def instance_videos_local(domain, page):
vids = peertube.get_videos_local(domain, (page - 1) * 10)
return await render_template(
"instance/local.html",
domain=domain,
instance_name=get_instance_name(domain),
videos = vids,
# details for pagination
page=page,
pagination_url="/" + domain + "/videos/local/",
pages_total=ceil(vids["total"] / 10),
)
@app.route("/<string:domain>/videos/trending", defaults = {"page": 1})
@app.route("/<string:domain>/videos/trending/<int:page>")
async def instance_videos_trending(domain, page):
vids = peertube.get_videos_trending(domain, (page - 1) * 10)
return await render_template(
"instance/trending.html",
domain=domain,
instance_name=get_instance_name(domain),
videos = vids,
# details for pagination
page=page,
pagination_url="/" + domain + "/videos/trending/",
pages_total=ceil(vids["total"] / 10),
)
@app.route("/<string:domain>/videos/most-liked", defaults = {"page": 1})
@app.route("/<string:domain>/videos/most-liked/<int:page>")
async def instance_videos_most_liked(domain, page):
vids = peertube.get_videos_most_liked(domain, (page - 1) * 10)
return await render_template(
"instance/most-liked.html",
domain=domain,
instance_name=get_instance_name(domain),
videos = vids,
# details for pagination
page=page,
pagination_url="/" + domain + "/videos/most-liked/",
pages_total=ceil(vids["total"] / 10),
)
@app.route("/<string:domain>/videos/recently-added", defaults = {"page": 1})
@app.route("/<string:domain>/videos/recently-added/<int:page>")
async def instance_videos_recently_added(domain, page):
vids = peertube.get_videos_recently_added(domain, (page - 1) * 10)
return await render_template(
"instance/recently-added.html",
domain=domain,
instance_name=get_instance_name(domain),
videos = vids,
# details for pagination
page=page,
pagination_url="/" + domain + "/videos/recently-added/",
pages_total=ceil(vids["total"] / 10),
)
@app.route("/search", methods = ["GET"])
async def search_get_redirect():
query = request.args.get("query")
return redirect("/search/" + query)
@app.route("/<string:domain>/search", methods=["POST"])
async def search_redirect(domain):
async def search_instance_redirect(domain):
query = (await request.form)["query"]
return redirect("/" + domain + "/search/" + query)
@app.route("/search/<string:query>", defaults = {"page": 1})
@app.route("/search/<string:query>/<int:page>")
async def search(query, page):
context = Context(api).insert_future_result("results", api.search, (query, (page - 1) * 10, 10)) \
.paginate_results("results", "/search/" + query + "/", page) \
.insert("query", query)
return await render("search.html", context.failed(), context.build)
@app.route("/<string:domain>/search/<string:term>", defaults = {"page": 1})
@app.route("/<string:domain>/search/<string:term>/<int:page>")
async def search(domain, term, page):
results = peertube.search(domain, term, (page - 1) * 10)
return await render_template(
"search_results.html",
domain=domain,
instance_name=get_instance_name(domain),
@app.route("/<string:domain>/search/<string:query>", defaults = {"page": 1})
@app.route("/<string:domain>/search/<string:query>/<int:page>")
async def search_instance(domain, query, page):
context = Context(api).instance(domain) \
.insert_future_result("results", api.search_instance, (domain, query, (page - 1) * 10, 10)) \
.paginate_results("results", "/" + domain + "/search/" + query + "/", page) \
.insert("query", query)
return await render("instance/search.html", context.failed(), context.build)
# --- END SEARCH ROUTES ---
results=results,
search_term=term,
# --- INSTANCE ROUTES ---
@app.route("/<string:domain>/")
async def instance(domain):
# favicon.ico is not a domain name
if domain == "favicon.ico": return await favicon()
return redirect("/" + domain + "/videos/trending")
# details for pagination
page=page,
pagination_url="/" + domain + "/search/" + term + "/",
pages_total=(results["total"] / 10)
)
@app.route("/<string:domain>/videos/<string:category>", defaults = {"page": 1})
@app.route("/<string:domain>/videos/<string:category>/<int:page>")
async def instance_videos(domain, category, page):
if category not in [ "most-liked", "local", "trending", "recently-added" ]:
return await render_custom_error("No such video category: %s" % category, 404)
@app.route("/<string:domain>/videos/watch/<string:id>/")
async def video(domain, id):
data = peertube.video(domain, id)
quality = request.args.get("quality")
embed = request.args.get("embed")
vid = VideoWrapper(data, quality)
quality = int(vid.quality)
context = Context(api).instance(domain) \
.insert_future_result("videos", api.instance_videos, (domain, (page - 1) * 10, 10, category)) \
.paginate_results("videos", "/" + domain + "/videos/" + category + '/', page)
return await render("instance/%s.html" % category, context.failed(), context.build)
# --- END INSTANCE ROUTES ---
# only make a request for the comments if commentsEnabled
comments = ""
if data["commentsEnabled"]:
comments = peertube.get_comments(domain, id)
# --- VIDEO ROUTE ---
@app.route("/<string:domain>/videos/watch/<string:id>/", defaults = { "page": 1 })
@app.route("/<string:domain>/videos/watch/<string:id>/<int:page>")
async def video(domain, id, page):
context = Context(api).instance(domain) \
.video(domain, id, page, request.args)
return await render("video.html", context.failed(), context.build)
# Strip the HTML from the comments and convert them to plain text
new_comments = {"total": comments["total"], "data": []}
for comment in comments["data"]:
text = h2t.handle(comment["text"]).strip().strip("\n")
comment["text"] = text
new_comments["data"].append(comment)
comments = new_comments
@app.route("/<string:domain>/videos/watch/<string:id>/<string:lang>.vtt")
async def captions(domain, id, lang):
captions = api.video_captions(domain, id)
if isinstance(captions, FailedRequest):
return await render_error(captions)
for entry in captions:
if entry["language"]["id"] == lang:
content = api.video_captions_proxy(domain, entry["captionPath"].split('/')[-1])
return await render_plaintext(content)
return await render_template(
"video.html",
domain=domain,
instance_name=get_instance_name(domain),
video=vid,
comments=comments,
quality=quality,
embed=embed,
)
def build_channel_or_account_name(domain, name):
if '@' in name:
return name
return name + "@" + domain
# --- Accounts ---
return await render_custom_error("This video has no subtitles/captions in requested language", code=404)
# --- END VIDEO ROUTE ---
# --- ACCOUNT ROUTES ---
@app.route("/<string:domain>/accounts/<string:name>")
async def accounts_redirect(domain, name):
return redirect("/" + domain + "/accounts/" + name + "/video-channels")
@app.route("/<string:domain>/accounts/<string:name>/video-channels", defaults = {"page": 1})
@app.route("/<string:domain>/accounts/<string:name>/video-channels/<int:page>")
async def account__video_channels(domain, name, page):
video_channels = peertube.account_video_channels(domain, name, (page - 1) * 10)
return await render_template(
"accounts/video_channels.html",
domain=domain,
instance_name=get_instance_name(domain),
name = name,
account = get_account_info(build_channel_or_account_name(domain, name)),
video_channels = video_channels,
# details for pagination
page=page,
pagination_url="/" + domain + "/accounts/" + name + "/video-channels/",
pages_total=ceil(video_channels["total"] / 10)
)
async def account_channels(domain, name, page):
context = Context(api).instance(domain) \
.insert_future_result("video_channels", api.account_channels, (domain, name, (page - 1) * 10, 10)) \
.paginate_results("video_channels", "/" + domain + "/accounts/" + name + "/video-channels/", page) \
.insert_future_result("account", api.account, (domain, name)) \
.insert("name", name)
return await render("account/channels.html", context.failed(), context.build)
@app.route("/<string:domain>/accounts/<string:name>/videos", defaults = {"page": 1})
@app.route("/<string:domain>/accounts/<string:name>/videos/<int:page>")
async def account__videos(domain, name, page):
vids = peertube.account_videos(domain, name, (page - 1) * 10)
return await render_template(
"accounts/videos.html",
domain=domain,
instance_name=get_instance_name(domain),
name = name,
account = get_account_info(build_channel_or_account_name(domain, name)),
videos = vids,
# details for pagination
page=page,
pagination_url="/" + domain + "/accounts/" + name + "/videos/",
pages_total=ceil(vids["total"] / 10)
)
async def account_videos(domain, name, page):
context = Context(api).instance(domain) \
.insert_future_result("videos", api.account_videos, (domain, name, (page - 1) * 10, 10)) \
.paginate_results("videos", "/" + domain + "/accounts/" + name + "/videos/", page) \
.insert_future_result("account", api.account, (domain, name)) \
.insert("name", name)
return await render("account/videos.html", context.failed(), context.build)
@app.route("/<string:domain>/accounts/<string:name>/about")
async def account__about(domain, name):
return await render_template(
"accounts/about.html",
domain=domain,
instance_name=get_instance_name(domain),
name = name,
account = get_account_info(build_channel_or_account_name(domain, name)),
about = peertube.account(domain, name)
)
# --- Video-Channels ---
async def account_about(domain, name):
context = Context(api).instance(domain) \
.insert_future_result("account", api.account, (domain, name)) \
.insert("name", name)
return await render("account/about.html", context.failed(), context.build)
# --- END ACCOUNT ROUTES ---
# --- CHANNEL ROUTES ---
@app.route("/<string:domain>/video-channels/<string:name>")
async def video_channels_redirect(domain, name):
return redirect("/" + domain + "/video-channels/" + name + "/videos")
@app.route("/<string:domain>/video-channels/<string:name>/videos", defaults = {"page": 1})
@app.route("/<string:domain>/video-channels/<string:name>/videos/<int:page>")
async def video_channels__videos(domain, name, page):
vids = peertube.video_channel_videos(domain, name, (page - 1) * 10)
return await render_template(
"video_channels/videos.html",
domain=domain,
instance_name=get_instance_name(domain),
name = name,
video_channel = get_video_channel_info(build_channel_or_account_name(domain, name)),
page=page,
pagination_url="/" + domain + "/video-channels/" + name + "/videos/",
pages_total=ceil(vids["total"] / 10),
videos = vids,
)
async def channel_videos(domain, name, page):
# TODO: Is it good to use ownerAccount.{host,name}?
context = Context(api).instance(domain) \
.insert_future_result("video_channel", api.channel, (domain, name)) \
.insert_future_result("videos", api.channel_videos, (domain, name, (page - 1) * 10, 10)) \
.paginate_results("videos", "/" + domain + "/video-channels/" + name + "/videos/", page) \
.insert_future_result("account", api.account, ((PreviousResult("video_channel"), "ownerAccount", "host"), (PreviousResult("video_channel"), "ownerAccount", "name"))) \
.insert("name", name)
return await render("channel/videos.html", context.failed(), context.build)
# TODO: Hope this route works, i don't know some account who has channel playlists to test with
@app.route("/<string:domain>/video-channels/<string:name>/video-playlists", defaults = {"page": 1})
@app.route("/<string:domain>/video-channels/<string:name>/video-playlists/<int:page>")
async def video_channels__video_playlists(domain, name, page):
video_playlists = peertube.video_channel_video_playlists(domain, name, (page - 1) * 10)
return await render_template(
"video_channels/video_playlists.html",
domain=domain,
instance_name=get_instance_name(domain),
name = name,
video_channel = get_video_channel_info(build_channel_or_account_name(domain, name)),
video_playlists = video_playlists,
page=page,
pagination_url="/" + domain + "/video-channels/" + name + "/video-playlists/",
pages_total=ceil(video_playlists["total"] / 10)
)
async def channel_playlists(domain, name, page):
channel = api.channel(domain, name)
if isinstance(channel, FailedRequest):
return await render_error(channel)
# TODO: Is it good to use ownerAccount.{host,name}?
context = Context(api).instance(domain) \
.insert_future_result("video_channel", api.channel, (domain, name)) \
.insert_future_result("video_playlists", api.channel_playlists, (domain, name, (page - 1) * 10, 10)) \
.paginate_results("video_playlists", "/" + domain + "/video-channels/" + name + "/video-playlists/", page) \
.insert_future_result("account", api.account, ((PreviousResult("video_channel"), "ownerAccount", "host"), (PreviousResult("video_channel"), "ownerAccount", "name"))) \
.insert("name", name)
return await render("channel/playlists.html", context.failed(), context.build)
@app.route("/<string:domain>/video-channels/<string:name>/about")
async def video_channels__about(domain, name):
return await render_template(
"video_channels/about.html",
domain=domain,
instance_name=get_instance_name(domain),
async def channel_about(domain, name):
context = Context(api).instance(domain) \
.insert_future_result("video_channel", api.channel, (domain, name)) \
.insert("name", name)
return await render("channel/about.html", context.failed(), context.build)
name = name,
video_channel = get_video_channel_info(build_channel_or_account_name(domain, name)),
about = peertube.video_channel(domain, name)
)
# --- DIVERSE ROUTES ---
@app.route("/favicon.ico")
async def favicon():
return await render_custom_error("We don't have a favicon yet. If you would like to contribute one, please send it to ~metalune/public-inbox@lists.sr.ht", code=404)
@app.route("/opensearch.xml")
async def opensearch():
try:
with open('opensearch.xml', 'r') as f:
return f.read().replace('$BASEURL', request.headers["Host"])
except Exception as e:
return await render_error(FailedRequest(e))
# --- END DIVERSE ROUTES ---
# --- END WEBSERVER STUFF ---
# --- CLI STUFF ---
if __name__ == "__main__":
app.run()
if len(sys.argv) == 3:
interface = sys.argv[1]
port = sys.argv[2]
elif len(sys.argv) == 2:
interface = "127.0.0.1"
port = sys.argv[1]
else:
interface = "127.0.0.1"
port = "5000"
app.run(host=interface, port=port)
# --- END CLI STUFF ---

12
opensearch.xml Normal file
View File

@ -0,0 +1,12 @@
<?xml version="1.0"?>
<OpenSearchDescription xmlns="http://a9.com/-/spec/opensearch/1.1/"
xmlns:moz="http://www.mozilla.org/2006/browser/search/">
<ShortName>SimpleerTube</ShortName>
<Description>Search Peertube with a lightweight, minimalist interface. Noscript-friendly, multi-instance Peertube client.</Description>
<InputEncoding>UTF-8</InputEncoding>
<Image width="16" height="16" type="image/png">https://$BASEURL/static/peertube.png</Image>
<Url type="text/html" template="https://$BASEURL/search">
<Param name="query" value="{searchTerms}"/>
</Url>
<moz:SearchForm>https://$BASEURL/</moz:SearchForm>
</OpenSearchDescription>

View File

@ -1,91 +1,506 @@
from bs4 import BeautifulSoup
import requests
import json
import sys
from datetime import datetime
from dateutil import parser as dateutil
# --- Sepiasearch ---
def sepia_search(query, start=0, count=10):
url = "https://search.joinpeertube.org/api/v1/search/videos?search=" + query + "&start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# --- REQUEST UTILS ---
# Semantic class to store remote errors
class FailedRequest:
def __init__(self, e):
self.exception = e
# --- ----
def format(self):
# If it's a rawtext error, print it
# Otherwise look for 'message' attribute
# Otherwise ask python to represent the exception
if isinstance(self.exception, str):
return self.exception
else: return getattr(self.exception, 'message', repr(self.exception))
# Strip < and > symbols to prevent content injection,
# and replace newlines with HTML line breaks <br>
def format_html(self):
return self.format().replace('<', "&lt;").replace('>', "&gt;").replace("\n", "<br>")
# Format a list of FailedRequest's
def format_errors(failures):
return list(map(lambda failure: failure.format(), failures))
def format_errors_html(failures):
return list(map(lambda failure: failure.format_html(), failures))
# Helper class to store paginated results
# self.data is a LIST of stuff, or a single error. When multiple errors can occur, see MultipleResults instead
# TODO: Maybe rename PaginatedResult (without plural) to indicate it's only one request
class PaginatedResults:
def __init__(self, total, data):
self.total = total
self.data = data
# Helper class to store multiple, potentially paginated, results
# Each of which can be failed or succeeded independently
class MultipleResults:
# Initialize me by calling MultipleResults().merge_with(map(lambda x: api.foo(x), entries))
def __init__(self):
self.successes = []
self.failures = []
# Helper function to insert/log an error
def error(self, reason):
print("[ERROR] %s" % reason)
self.failures.append(FailedRequest(reason))
return self
# Insert a single PaginatedResults instance into the current MultipleResults
def insert_paginated(self, result):
if isinstance(result, FailedRequest):
self.failures.append(result)
return self
if not isinstance(result, PaginatedResults):
return self.error("PROGRAMMING ERROR: MultipleResults.insert only takes a single FailedRequest or PaginatedResults")
# Now we have many items from that PaginatedResults to insert into successes
self.successes.extend(result.data)
return self
# Inserts a single successful result into the current MultipleResults
# TODO: Should make more typesafe by having a SuccessfulResult type
def insert(self, result):
if isinstance(result, FailedRequest):
self.failures.append(result)
return self
if isinstance(result, PaginatedResults):
return self.error("PROGRAMMING ERROR: MultipleRequests.insert only takes a single successful or FailedRequest. PaginatedResults should be inserted with MultipleRequests.insert_paginated")
self.successes.append(result)
# Merge successes and failures with another MultipleResults
def merge_with(self, results):
if not isinstance(results, MultipleResults):
return self.error("PROGRAMMING ERROR: MultipleResults.merge_with should only be called with another MultipleResults")
self.successes.extend(results.successes)
self.failures.extend(results.failures)
return self
# --- END REQUEST UTILS ---
def get_instance_name(domain):
soup = BeautifulSoup(requests.get("https://" + domain).text, "lxml")
# Helper class for using caches, you can use any other cache that implements the same API (get/set)
# Default TTL: 3600s (1h)
class Cache:
def __init__(self, ttl=3600):
self.dict = {}
self.ttl = ttl
# Use ttl=0 to disable
def get(self, key, ttl=None):
if ttl == None: ttl = self.ttl
if key in self.dict:
last_time_updated = (self.dict[key])[1]
time_diff = datetime.now() - last_time_updated
if time_diff.total_seconds() > ttl:
# Outdated data
return None
# Data still valid according to TTL
return (self.dict[key])[0]
else:
# No data
return None
def set(self, key, value):
self.dict[key] = [ value, datetime.now() ]
# Takes a successful plaintext response and parses it as HTML to extract the title
def html_title(content):
soup = BeautifulSoup(content, "lxml")
title = soup.find('title')
if title:
return title.text
else:
return "PeerTube Instance"
def video(domain, id):
url = "https://" + domain + "/api/v1/videos/" + id
return json.loads(requests.get(url).text)
# Takes a successfully-parsed JSON response and extracts data/total from it to build pagination
def paginator(response):
if "data" not in response or "total" not in response:
return FailedRequest("The API response provided to paginator appears not to be paginated")
# TODO: check that total is an integer and that it's greater than len(data)
#return response["total"], response["data"]
return PaginatedResults(response["total"], response["data"])
def search(domain, term, start=0, count=10):
url = "https://" + domain + "/api/v1/search/videos?start=" + str(start) + "&count=" + str(count) + "&search=" + term + "&sort=-match&searchTarget=local"
return json.loads(requests.get(url).text)
class API:
# The PeertubeAPI is initialized with a caching backend and a default TTL, that can be overriden in specific
# API request calls. The caching backend should implement a get(key, ttl) and set(key, value) API.
# Also can specify the SepiaSearch instance here
# NOTE: The whole API must be oriented for ordered, not named arguments, because they would be tricky to implement
# in Context.insert_future_result()
def __init__(self, cache, ttl=3600, search="https://search.joinpeertube.org"):
self.cache = cache
self.ttl = ttl
# If search instance has no protocol set, assume https://
if not search.startswith("http"): search = "https://" + search
# Remove trailing slash
if search.endswith('/'): self.search_source = search[0:-1]
else: self.search_source = search
# Instance search API expects a filter and a sorting criteria. Here we provide
# higher-level categories that match requested URIs: trending, most-liked, recently-added, local
self.instance_filters = {
"trending": (None, "-trending"),
"most-liked": (None, "-likes"),
"local": ("local", "-publishedAt"),
"recently-added": (None, "-publishedAt"),
}
def get_comments(domain, id):
url = "https://" + domain + "/api/v1/videos/" + id + "/comment-threads"
return json.loads(requests.get(url).text)
# Wrapper around requests.get() so that it cannot fail
# Usually, you want to call self.request(), which is an even higher-level wrapper
# If the request succeeds:
# - parse as JSON, or return None
# - if "error" field in JSON, return None
# - return the parsed JSON
# Else: return None
# In all cases where the function returns None, the errors are logged
# Use ttl=0 to disable Cache for this request
# Only use with JSON requests, otherwise use self.(plaintext_request)
def json_request(self, url):
print("[DEBUG] Requesting JSON URL %s" % url)
try:
# If serialization fails, we'll end up in `except` block
parsed_response = json.loads(requests.get(url).text)
if "error" in parsed_response:
print("[WARN] Remote peertube returned error for %s:\n%s" % (url, parsed_response["error"]))
return FailedRequest("Remote peertube server returned an error for URL %s:\n%s" % (url, parsed_response["error"]))
return parsed_response
except Exception as e:
print("[WARN] Error fetching page \"%s\":\n%s" % (url, e))
return FailedRequest(e)
def get_videos_trending(domain, start=0, count=10):
url = "https://" + domain + "/api/v1/videos?sort=-trending&start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# Wrapper around requests.get() so that it cannot fail
# Use with plaintext requests, for JSON requests use self.request()
def plaintext_request(self, url):
print("[DEBUG] Requesting plaintext URL %s" % url)
try:
return requests.get(url).text
except Exception as e:
print("[WARN] Error fetching page \"%s\":\n%s" % (url, e))
return FailedRequest(e)
def get_videos_most_liked(domain, start=0, count=10):
url = "https://" + domain + "/api/v1/videos?sort=-likes&start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# Useful wrapper method to reduce boilerplate
# args: parameters tuple to form the cache key
# url: string template to form URL from args tuple, where ${i} is arg[i]
# key: Key to extract from a successful response
# backend: the method to use for fetching URL (default: self.json_request), can be self.plaintext_request
# extractor: a lambda function to execute to extract stuff from a successful request, when key isn't set
def request(self, args, url, key=None, backend=None, extractor=None, ttl=None):
# WTF python? '/'.join(("foo)) => "f/o/o", not "foo"?! Special case when only one arg
if isinstance(args, str): args = (args, )
cache_key = '/'.join(args)
cached = self.cached(cache_key, ttl=ttl)
if cached == None:
# Defaults to making a JSON API request
if backend == None: backend = self.json_request
for i in range(0, len(args)):
url = url.replace("${%s}" % str(i), args[i])
res = backend(url)
if not isinstance(res, FailedRequest):
if key != None:
# Extract requested key from successful request
res = res[key]
elif extractor != None:
# Run extractor on result from successful request
res = extractor(res)
self.save(cache_key, res)
return res
return cached
def get_videos_recently_added(domain, start=0, count=10):
url = "https://" + domain + "/api/v1/videos?sort=-publishedAt&start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# Returns an entry from cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name
def cached(self, key, ttl=None):
if ttl == None: ttl = self.ttl
caller = sys._getframe(2).f_code.co_name
key_name = "peertube-" + caller + "-" + key
res = self.cache.get(key_name, ttl=ttl)
if res == None:
print("[CACHE] Entry not found for %s: %s" % (caller, key))
return None
print("[CACHE] Found entry for %s: %s" % (caller, key))
return res
def get_videos_local(domain, start=0, count=10):
url = "https://" + domain + "/api/v1/videos?sort=-publishedAt&filter=local&start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# Save an entry into cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name
def save(self, key, value):
# TODO: Maybe now that we call from two levels above (request_wrapper->request->save) we need to adapt the frame fetched
caller = sys._getframe(2).f_code.co_name
key_name = "peertube-" + caller + "-" + key
print("[CACHE] Saving entry for %s: %s" % (caller, key))
self.cache.set(key_name, value)
# --- Accounts ---
# Fetch instance name from its HTML source
def instance_name(self, domain, ttl=None):
return self.request(
(domain),
"https://${0}",
backend=self.plaintext_request,
extractor=html_title
)
def account_video_channels(domain, name, start=0, count=10):
url = "https://" + domain + "/api/v1/accounts/" + name + "/video-channels?start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# Search the configured self.search_source for `query`, returning `count` items after `start`
# NOTE: Returns a PaginatedResults upon success
def search(self, query, start=0, count=10, ttl=None):
return self.request(
(str(start), str(count), self.search_source, query),
# self.search_source already has protocol pre-pended
"${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}",
extractor=paginator
)
def account_videos(domain, name, start=0, count=10):
url = "https://" + domain + "/api/v1/accounts/" + name + "/videos?start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# Search a specific Peertube instance for `query`,
# returning `count` items after `start`. Slightly different URL format from SepiaSearch (self.search())
# NOTE: Returns a PaginatedResults upon success
def search_instance(self, domain, term, start=0, count=10, ttl=None):
return self.request(
(str(start), str(count), domain, term),
"https://${2}/api/v1/search/videos?start=${0}&count=${1}&search=${3}&sort=-match&searchTarget=local",
extractor=paginator
)
def account(domain, name):
url = "https://" + domain + "/api/v1/accounts/" + name
return json.loads(requests.get(url).text)
# Default category is local, other categories are: trending, most-liked, recently-added, local
# See self.instance_filters in self.__init()__ for sorting/filtering detail
# NOTE: Returns a PaginatedResults upon success
def instance_videos(self, domain, start=0, count=10, category="local", ttl=None):
if not category in self.instance_filters:
return FailedRequest("instance_videos called with bogus filter: %s" % category)
filt, sort = self.instance_filters[category]
url = "https://${2}/api/v1/videos?sort=" + sort + "&start=${0}&count=${1}"
if filt: url += "&filter=" + filt
return self.request(
(str(start), str(count), domain, category),
url,
extractor=paginator
)
# --- Video Channels ---
def video(self, domain, id, ttl=None):
return self.request(
(domain, id),
"https://${0}/api/v1/videos/${1}",
)
def video_channel_videos(domain, name, start=0, count=10):
url = "https://" + domain + "/api/v1/video-channels/" + name + "/videos?start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
def video_captions(self, domain, id, ttl=None):
return self.request(
(domain, id),
"https://${0}/api/v1/videos/${1}/captions",
# NOTE: Captions look like paginated content because they have 'total' and 'data' field
# However they are, and that's good, not paginated.
key="data"
)
def video_captions_proxy(self, domain, caption_id, ttl=None):
# URL is hardcoded to prevent further proxying. URL may change with updates, see captions API
# eg. https://kolektiva.media/api/v1/videos/9c9de5e8-0a1e-484a-b099-e80766180a6d/captions
# TODO: What if the captionPath doesn't follow this format on an instance? Should we really proxy ANYTHING returned by API?
return self.request(
(domain, caption_id),
"https://${0}/lazy-static/video-captions/${1}",
backend=self.plaintext_request
)
def video_channel_video_playlists(domain, name, start=0, count=10):
url = "https://" + domain + "/api/v1/video-channels/" + name + "/video-playlists?start=" + str(start) + "&count=" + str(count)
return json.loads(requests.get(url).text)
# NOTE: Returns a PaginatedResults upon success
def video_comments(self, domain, id, start=0, count=10, ttl=None):
return self.request(
(domain, id, str(start), str(count)),
"https://${0}/api/v1/videos/${1}/comment-threads?start=${2}&count=${3}",
extractor=paginator
)
def video_channel(domain, name):
url = "https://" + domain + "/api/v1/video-channels/" + name
return json.loads(requests.get(url).text)
def account(self, domain, name, ttl=None):
return self.request(
(domain, name),
"https://${0}/api/v1/accounts/${1}"
)
if __name__ == "__main__":
#name = get_instance_name("videos.lukesmith.xyz")
#print(name)
# NOTE: Returns a PaginatedResults upon success
def account_channels(self, domain, name, start=0, count=10, ttl=None):
return self.request(
(str(start), str(count), domain, name),
"https://${2}/api/v1/accounts/${3}/video-channels?start=${0}&count=${1}",
extractor=paginator
)
#com = get_comments("videos.lukesmith.xyz", "d1bfb082-b203-43dc-9676-63d28fe65db5")
#print(json.dumps(com, indent=2))
# NOTE: Returns a PaginatedResults upon success
def account_videos(self, domain, name, start=0, count=10, ttl=None):
return self.request(
(str(start), str(count), domain, name),
"https://${2}/api/v1/accounts/${3}/videos?start=${0}&count=${1}",
extractor=paginator
)
#vid = video("diode.zone", "c4f0d71b-bd8b-4641-87b0-6d9edd4fa9ce")
#print(json.dumps(vid, indent=2))
# Fetch information about multiple accounts, returned as a MultipleResults
# NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple.
def accounts(self, accounts, ttl=None):
results = MultipleResults()
for account in accounts:
results.insert(self.account(account.domain, account.name))
return results
_, results = search("diode.zone", "test")
print(json.dumps(results, indent=2))
# Fetch latest videos from multiple accounts, returned as MultipleResults
# NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple.
def accounts_videos(self, accounts, limit=None, sort=True, ttl=None):
api_limit = 10 if limit == None else limit
results = MultipleResults()
for account in accounts:
results.insert_paginated(self.account_videos(account.domain, account.name, count=api_limit))
if limit or sort:
# We also sort when limit is set, because otherwise limit will discard useful information
results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
if limit: results.successes = results.successes[0:limit]
return results
#video_channels = account_video_channels("peer.tube", "mouse@peertube.dsmouse.net")
#print(json.dumps(video_channels, indent=2))
def channel(self, domain, name, ttl=None):
return self.request(
(domain, name),
"https://${0}/api/v1/video-channels/${1}"
)
# NOTE: Returns a PaginatedResults upon success
def channel_videos(self, domain, name, start=0, count=10, ttl=None):
return self.request(
(str(start), str(count), domain, name),
"https://${2}/api/v1/video-channels/${3}/videos?start=${0}&count=${1}",
extractor=paginator
)
# NOTE: Returns a PaginatedResults upon success
def channel_playlists(self, domain, name, start=0, count=10, ttl=None):
return self.request(
(str(start), str(count), domain, name),
"https://${2}/api/v1/video-channels/${3}/video-playlists?start=${0}&count=${1}",
extractor=paginator
)
# List of detailed info about local channel subscriptions
# Fetch information about multiple channels, returned as a MultipleResults
# NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple.
def channels(self, channels, ttl=None):
results = MultipleResults()
for channel in channels:
results.insert(self.channel(channel.domain, channel.name))
return results
# Fetch latest videos from multiple channels, returned as MultipleResults
# NOTE: This new API method enforces usage of Account class as channel. DO NOT USE WITH (account, domain) tuple.
def channels_videos(self, channels, limit=None, sort=True, ttl=None):
api_limit = 10 if limit == None else limit
results = MultipleResults()
for channel in channels:
results.insert_paginated(self.channel_videos(channel.domain, channel.name, count=api_limit))
if limit or sort:
# We also sort when limit is set, because otherwise limit will discard useful information
results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
if limit: results.successes = results.successes[0:limit]
return results
# Extra information about video, not contained directly in API result
# a is video result
class VideoInfo:
def __init__(self, api, a, args):
# If the video is being built from a failed request, return that request instead
if isinstance(a, FailedRequest):
print("[ERROR] A video request failed, yet you called VideoInfo about it. You should probably not make useless requests.")
return FailedRequest("A previous video request failed, not attempting to fetch extra information about it.")
quality = args.get("quality")
self.resolutions = []
self.video = None
self.files = a["files"]
if len(self.files) == 0:
self.files = ((a["streamingPlaylists"])[0])["files"]
self.default_res = None
for entry in self.files:
resolution = (entry["resolution"])["id"]
self.resolutions.append(entry["resolution"])
# chose the default quality
if resolution != 0 and quality == None:
if self.default_res == None:
self.default_res = resolution
self.video = entry["fileUrl"]
elif abs(720 - resolution) < abs(720 - self.default_res):
self.default_res = resolution
self.video = entry["fileUrl"]
if str(resolution) == str(quality):
self.video = entry["fileUrl"]
if quality == None:
self.quality = self.default_res
else:
self.quality = quality
self.no_quality_selected = not self.video
# --- IDENTIFIERS HELPERS ---
# This class Account can be either an actual user account or a channel
# TODO: Find another name to express that... Maybe Subscription? Maybe Identifier?
class Account:
def __init__(self, name, domain):
self.name = name
self.domain = domain
# Maybe useful?
def __str__(self):
return self.name + "@" + self.domain
# For debug prints
def __repr__(self):
return self.name + "@" + self.domain
# Builds an Account from one of the following syntaxes, additionally stripping extra whitespace and ignoring `#` as comments:
# - id@server
# - @id@server
# - http(s)://server/c/id
# - http(s)://server/a/id
# returns an Account instance
def parse(identifier):
identifier = identifier.split('#')[0].strip()
# Comment line is returned as empty string
if identifier == '': return None
requested_identifier = identifier
if identifier.startswith('http'):
identifier = identifier[4:]
# HTTPS?
if identifier.startswith('s'): identifier = identifier[1:]
# Remove ://
identifier = identifier[3:]
parts = identifier.split('/')
if len(parts) < 3:
print("[ERROR] Misformed URL for identifier, not enough components: %s" % requested_identifier)
return None
if parts[1] == 'a' or parts[1] == 'c':
# Account or channel found, take the next part
return Account(parts[2], parts[0])
print("[ERROR] Identifier not understood: %s" % requested_identifier)
return None
# Not an HTTP URL, we assume user@server or @user@server address
return unsafe_account_parser(identifier)
# Takes an account in the [@]user@server form and makes sure it makes sense
# Returns an Account in that case, None otherwise
# This function does not further sanitize the string (eg. strip whitespace or remove comments), please use
# Account.parse() for that, which also supports more account formats
def unsafe_account_parser(identifier):
requested_identifier = identifier
# Remove first @, if any
if identifier.startswith('@'): identifier = identifier[1:]
parts = identifier.split('@')
if len(parts) != 2:
print("[ERROR] Wrong identifier, because we are expecting a single '@': %s" % requested_identifier)
return None
if len(parts[0]) == 0 or len(parts[1]) == 0:
print("[ERROR] Wrong identifier, because one part is empty before/after '@': %s" % requested_identifier)
return None
return Account(parts[0], parts[1])
# --- END IDENTIFIERS HELPERS ---

BIN
static/no_avatar.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.1 KiB

BIN
static/peertube.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 746 B

View File

@ -1,4 +1,4 @@
{% extends "accounts/base.html" %}
{% extends "account/base.html" %}
{% block title %}{{ domain }}{% endblock %}

View File

@ -4,7 +4,7 @@
<table>
<tr>
<td>
<img src="https://{{ account.host }}{{ account.avatar.path }}" height="100"/>
<img src="{% if account.avatar %}https://{{ domain }}{{ account.avatar.path }}{% else %}/no_avatar.png{% endif %}" height="100"/>
</td>
<td>
<b>{{ account.displayName }}</b> {{ account.name }}

View File

@ -1,18 +1,18 @@
{% extends "accounts/base.html" %}
{% extends "account/base.html" %}
{% block title %}{{ domain }}{% endblock %}
{% block content %}
{{ video_channels.total }} Channels
{{ video_channels|length }} Channels
<br>
<br>
<hr>
<div id="wrap">
{% for channel in video_channels.data %}
{% for channel in video_channels %}
<div class="result-wrapper">
<img src="https://{{ domain }}{{ channel.avatar.path }}" height="75"/>
<img src="{% if channel.avatar %}https://{{ domain }}{{ channel.avatar.path }}{% else %}/static/no_avatar.png{% endif %}" height="75"/>
<div class="result-info">
<a href="/{{ domain }}/video-channels/{{ channel.name }}@{{ channel.host }}">

View File

@ -0,0 +1,15 @@
{% extends "account/base.html" %}
{% block title %}{{ domain }}{% endblock %}
{% block content %}
{{ videos|length }} Videos
<br>
<br>
<div id="wrap">
{{ macros.videos(videos) }}
</div>
{% endblock %}

View File

@ -1,30 +0,0 @@
{% extends "accounts/base.html" %}
{% block title %}{{ domain }}{% endblock %}
{% block content %}
{{ videos.total }} Videos
<br>
<br>
<div id="wrap">
{% for video in videos.data %}
<div class="result-wrapper">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">
<img src="https://{{ domain }}{{ video.thumbnailPath }}" height="150"/>
</a>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>
{{ video.views }} Views
<br>
<b>{{ video.channel.displayName }}</b>
</a>
</div>
</div>
{% endfor %}
</div>
{% endblock %}

View File

@ -1,9 +1,11 @@
{%- import "macros.html" as macros %}
<!doctype html>
<html>
<head>
<link rel="stylesheet" href="/static/style.css">
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="search" type="application/opensearchdescription+xml" href="/opensearch.xml" title="SimpleerTube"/>
{% block head %}
<title>{% block full_title %}{% block title %}{% endblock %} - SimpleerTube{% endblock %}</title>
{% endblock %}
@ -11,7 +13,13 @@
<body>
<center>
<h2>{{ instance_name }}</h2>
{% if instance %}{# instance/channel/account or video.html #}
<h2>{{ instance }}</h2>
<a href="/">Home</a>
<b> | </b>
<a href="/{{ domain }}">Instance Home</a>
<form action="/{{ domain }}/search" method="POST">
<input size="45" style="max-width: 100%" type="text" name="query" id="query" placeholder="Search" value="{{ search_term }}"/>
<button type="submit">Search</button>
@ -26,41 +34,34 @@
<a href="/{{ domain }}/videos/most-liked">Most Liked</a>
<b> | </b>
<a href="/{{ domain }}/videos/recently-added">Recently Added</a>
{% else %}{# search/index.html #}
<h2>Simpleertube</h2>
<h5>A simple frontend for PeerTube</h5>
<form action="/search" method="POST">
<input size="45" type="text" name="query" id="query" placeholder="Search" value="{{ query }}" style="max-width: 100%" />
<button type="submit">Search</button>
</form>
{% endif %}
<br>
<br>
{% block head_content %}{% endblock %}
{% if pagination_url %}
{% if pages_total > 1 %}
{% if page > 1 %}
<a href="{{ pagination_url }}{{ page - 1 }}">Previous</a>
<b> | </b>
{% endif %}
Page {{ page }} of {{ pages_total }}
{% if page < pages_total %}
<b> | </b>
<a href="{{ pagination_url }}{{ page + 1}}">Next</a>
{% endif %}
{% endif %}
{% endif %}
{{ macros.paginate(pagination) }}
<br>
{% block content %}{% endblock %}
{% if pagination_url %}
{% if pages_total > 1 %}
{% if page > 1 %}
<a href="{{ pagination_url }}{{ page - 1 }}">Previous</a>
<b> | </b>
{% endif %}
Page {{ page }} of {{ pages_total }}
{% if page < pages_total %}
<b> | </b>
<a href="{{ pagination_url }}{{ page + 1}}">Next</a>
{% endif %}
{% endif %}
{% endif %}
{{ macros.paginate(pagination) }}
{% if failures %}
<footer>
<h2>While loading this page, some errors were encountered:</h2>
<ul>{% for failure in failures %}
<li>{{ failure | safe }}</li>
{% endfor %}</ul>
</footer>
{% endif %}
</center>
</body>
</html>

View File

@ -1,4 +1,4 @@
{% extends "video_channels/base.html" %}
{% extends "channel/base.html" %}
{% block title %}{{ domain }}{% endblock %}

View File

@ -0,0 +1,32 @@
{% extends "base.html" %}
{% block head_content %}
<table>
<tr>
<td>
<img src="{% if video_channel.avatar %}https://{{ domain }}{{ video_channel.avatar.path }}{% else %}/static/no_avatar.png{% endif %}" height="100"/>
</td>
<td>
<b>{{ video_channel.displayName }}</b> {{ video_channel.name }}
<br>
{{ video_channel.followersCount }} Followers
<br>
Created By
<a href="/{{ video_channel.ownerAccount.host }}/accounts/{{ video_channel.ownerAccount.name }}@{{ video_channel.ownerAccount.host }}"> {{ video_channel.ownerAccount.name }}</a>
</td>
<tr>
</table>
<br>
<br>
<a href="/{{ video_channel.ownerAccount.host }}/video-channels/{{ name }}/videos">Videos</a>
<b> | </b>
<a href="/{{ video_channel.ownerAccount.host }}/video-channels/{{ name }}/video-playlists">Video Playlists</a>
<b> | </b>
<a href="/{{ video_channel.ownerAccount.host }}/video-channels/{{ name }}/about">About</a>
<br>
<br>
{% endblock %}

View File

@ -1,16 +1,16 @@
{% extends "video_channels/base.html" %}
{% extends "channel/base.html" %}
{% block title %}{{ domain }}{% endblock %}
{% block content %}
{{ video_playlists.total }} Playlists
{{ video_playlists|length }} Playlists
<br>
<br>
<hr>
<div id="wrap">
{% for playlist in video_playlists.data %}
{% for playlist in video_playlists %}
<div class="result-wrapper">
<img src="https://{{ domain }}{{ playlist.thumbnailPath }}" height="150"/>

View File

@ -0,0 +1,17 @@
{% extends "channel/base.html" %}
{% block title %}{{ domain }}{% endblock %}
{% block content %}
{{ videos|length }} Videos
<br>
<br>
<hr>
<div id="wrap">
{{ macros.videos(videos) }}
</div>
<hr>
{% endblock %}

10
templates/error.html Normal file
View File

@ -0,0 +1,10 @@
{% extends "base.html" %}
{% block title %}ERROR: {% endblock %}
{% block content %}
<h1>Error {{ error_number }}</h1>
<ul>{% for error in error_reasons %}
<li>{{ error | safe }}</li>
{% endfor %}</ul>
{% endblock %}

View File

@ -1,19 +1,17 @@
<!doctype html>
<html>
<head>
<title>SimpleerTube - Search</title>
<link rel="stylesheet" href="/static/style.css">
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
</head>
<body>
<center>
<h2>SimpleerTube</h2>
<h5>A simple frontend for PeerTube</h5>
{% extends "base.html" %}
<form action="/search" method="POST">
<input size="45" style="max-width: 100%" type="text" name="query" id="query" placeholder="SepiaSearch"/>
<button type="submit">Search</button>
</form>
</center>
</body>
</html>
{% block full_title %}Simpleertube{% endblock %}
{% block content %}
{% if videos|length > 0 %}
<hr>
<h2>{{ videos|length }} latest videos from local subscriptions</h2>
<p>{% for sub in subscriptions %}{% if not loop.first %}, {% endif %}<a href="/{{ sub.host }}{% if sub.ownerAccount %}/video-channels{% else %}/accounts{% endif %}/{{ sub.name }}">{{ sub.displayName }} (@{{ sub.name }}@{{ sub.host }})</a>{% endfor %}</p>
<hr>
<div id="wrap">
{{ macros.videos(videos) }}
</div>{% endif %}
{% endblock content %}

View File

@ -1,30 +1,12 @@
{% extends "base.html" %}
{% block full_title %}{{ instance_name }} - Local{% endblock %}
{% block full_title %}{{ instance }} - Local{% endblock %}
{% block content %}
<hr>
<div id="wrap">
{% for video in videos.data %}
<div class="result-wrapper">
<img src="https://{{ domain }}{{ video.thumbnailPath }}" height="150"/>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>
{{ video.views }} Views
<br>
<a href="/{{ domain }}/accounts/{{ video.channel.name }}@{{ video.channel.host }}">
<b>{{ video.channel.displayName }}</b>
</a>
<br>
<a href="/{{ domain }}/accounts/{{ video.account.name }}@{{ video.account.host }}">
{{ video.account.name }}@{{ video.account.host }}
</a>
</div>
</div>
{% endfor %}
{{ macros.videos(videos, domain=domain) }}
</div>
<hr>

View File

@ -1,30 +1,12 @@
{% extends "base.html" %}
{% block full_title %}{{ instance_name }} - Most Liked{% endblock %}
{% block full_title %}{{ instance }} - Most Liked{% endblock %}
{% block content %}
<hr>
<div id="wrap">
{% for video in videos.data %}
<div class="result-wrapper">
<img src="https://{{ domain }}{{ video.thumbnailPath }}" height="150"/>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>
{{ video.views }} Views
<br>
<a href="/{{ domain }}/accounts/{{ video.channel.name }}@{{ video.channel.host }}">
<b>{{ video.channel.displayName }}</b>
</a>
<br>
<a href="/{{ domain }}/accounts/{{ video.account.name }}@{{ video.account.host }}">
{{ video.account.name }}@{{ video.account.host }}
</a>
</div>
</div>
{% endfor %}
{{ macros.videos(videos, domain=domain) }}
</div>
<hr>

View File

@ -1,31 +1,12 @@
{% extends "base.html" %}
{% block full_title %}{{ instance_name }} - Recently Added{% endblock %}
{% block full_title %}{{ instance }} - Recently Added{% endblock %}
{% block content %}
<hr>
<div id="wrap">
{% for video in videos.data %}
<div class="result-wrapper">
<img src="https://{{ domain }}{{ video.thumbnailPath }}" height="150"/>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>
{{ video.views }} Views
<br>
<a href="/{{ domain }}/accounts/{{ video.channel.name }}@{{ video.channel.host }}">
<b>{{ video.channel.displayName }}</b>
</a>
<br>
<a href="/{{ domain }}/accounts/{{ video.account.name }}@{{ video.account.host }}">
{{ video.account.name }}@{{ video.account.host }}
</a>
</div>
</div>
{% endfor %}
{{ macros.videos(videos, domain=domain) }}
</div>
<hr>

View File

@ -0,0 +1,14 @@
{% extends "base.html" %}
{% block title %}{{ query }} - {{ domain }}{% endblock %}
{% block content %}
<p>{{ results|length }} results</p>
<hr>
<div id="wrap">
{{ macros.videos(results, domain=domain) }}
</div>
<hr>
{% endblock %}

View File

@ -1,30 +1,12 @@
{% extends "base.html" %}
{% block full_title %}{{ instance_name }} - Trending{% endblock %}
{% block full_title %}{{ instance }} - Trending{% endblock %}
{% block content %}
<hr>
<div id="wrap">
{% for video in videos.data %}
<div class="result-wrapper">
<img src="https://{{ domain }}{{ video.thumbnailPath }}" height="150"/>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>
{{ video.views }} Views
<br>
<a href="/{{ domain }}/accounts/{{ video.channel.name }}@{{ video.channel.host }}">
<b>{{ video.channel.displayName }}</b>
</a>
<br>
<a href="/{{ domain }}/accounts/{{ video.account.name }}@{{ video.account.host }}">
{{ video.account.name }}@{{ video.account.host }}
</a>
</div>
</div>
{% endfor %}
{{ macros.videos(videos, domain=domain) }}
</div>
<hr>

45
templates/macros.html Normal file
View File

@ -0,0 +1,45 @@
{% macro paginate(pagination) %}
{% if pagination %}
{% if pagination.pages > 1 %}
{% if pagination.current > 1 %}
<a href="{{ pagination.url }}{{ pagination.current - 1 }}">Previous</a>
<b> | </b>
{% endif %}
Page {{ pagination.current }} of {{ pagination.pages }}
{% if pagination.current < pagination.pages %}
<b> | </b>
<a href="{{ pagination.url }}{{ pagination.current + 1}}">Next</a>
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}
{# Extract the createdAt entry of the object and try to display it in a human-friendly manner #}
{%- macro date(object) -%}
<time datetime="{{ object.createdAt }}">{{ object.createdAt|truncate(10, true, "") }}</time>
{%- endmacro -%}
{% macro videos(videos, domain=None) %}
{% for video in videos %}
{% if domain == None %}{% set domain = video.account.host %}{% endif %}
<div class="result-wrapper">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">
<img src="{% if video.thumbnailPath.startswith("http") %}{{ video.thumbnailPath }}{% else %}https://{{ domain }}{{ video.thumbnailPath }}{% endif %}" height="150"/>
</a>
<div class="result-info">
<a href="/{{ video.account.host }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>{# For video lists, we don't have access to the VideoInfo struct (TODO: why not?), so this is just a quickhack based on peertube date format to extract relevant string out of it for human consumption #}
{{ date(video) }} - {{ video.views }} Views
<br>{% if video.channel %}
<a href="/{{ video.channel.host }}/video-channels/{{ video.channel.name }}">
<b>{{ video.channel.displayName }}</b>
</a>{% endif %}
<br>
<a href="/{{ video.account.host }}/accounts/{{ video.account.name }}">
{{ video.account.name }}@{{ video.account.host }}
</a>
</div>
</div>
{% endfor %}
{% endmacro %}

12
templates/search.html Normal file
View File

@ -0,0 +1,12 @@
{% extends "base.html" %}
{% block full_title %}{{ query }} - Simpleertube search results{% endblock %}
{% block content %}
{{ results|length }} Results
<hr>
<div id="wrap">
{{ macros.videos(results) }}
</div>
<hr>
{% endblock %}

View File

@ -1,34 +0,0 @@
{% extends "base.html" %}
{% block title %}{{ search_term }} - {{ domain }}{% endblock %}
{% block content %}
<p>{{ results.total }} results</p>
<hr>
<div id="wrap">
{% for result in results.data %}
<div class="result-wrapper">
<a href="/{{ domain }}/videos/watch/{{ result.uuid }}">
<img src="https://{{ domain }}{{ result.thumbnailPath }}" height="150"/>
</a>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ result.uuid }}">{{ result.name }}</a>
<br>
{{ result.views }} Views
<br>
<a href="/{{ domain }}/accounts/{{ result.channel.name }}@{{ result.channel.host }}">
<b>{{ result.channel.displayName }}</b>
</a>
<br>
<a href="/{{ domain }}/accounts/{{ result.account.name }}@{{ result.account.host }}">
{{ result.account.name }}@{{ result.account.host }}
</a>
</div>
</div>
{% endfor %}
</div>
<hr>
{% endblock %}

View File

@ -1,70 +0,0 @@
<!doctype html>
<html>
<head>
<title>SimpleerTube - Search Results</title>
<link rel="stylesheet" href="/static/style.css">
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
</head>
<body>
<center>
<h2>SimpleerTube</h2>
<form action="/search" method="POST">
<input size="45" type="text" name="query" id="query" placeholder="Search" value="{{ query }}"/>
<button type="submit">Search</button>
</form>
<br>
<br>
{% if pages_total > 1 %}
{% if page > 1 %}
<a href="/search/{{ query }}/{{ page - 1 }}">Previous</a>
<b> | </b>
{% endif %}
Page {{ page }} of {{ pages_total }}
{% if page < pages_total %}
<b> | </b>
<a href="/search/{{ query }}/{{ page + 1}}">Next</a>
{% endif %}
{% endif %}
<br>
<br>
{{ results.total }} Results
<hr>
<div id="wrap">
{% for result in results.data %}
<div class="result-wrapper">
<a href="/{{ result.channel.host }}/videos/watch/{{ result.uuid }}">
<img src="{{ result.thumbnailUrl }}" height="150">
</a>
<div class="result-info">
<a href="/{{ result.channel.host }}/videos/watch/{{ result.uuid }}">{{ result.name }}</a>
<br>
{% if result.views == 1%}
1 View
{% else %}
{{ result.views }} Views
{% endif %}
</div>
</div>
{% endfor %}
</div>
<hr>
{% if pages_total > 1 %}
{% if page > 1 %}
<a href="/search/{{ query }}/{{ page - 1 }}">Previous</a>
<b> | </b>
{% endif %}
Page {{ page }} of {{ pages_total }}
{% if page < pages_total %}
<b> | </b>
<a href="/search/{{ query }}/{{ page + 1}}">Next</a>
{% endif %}
{% endif %}
</center>
</body>
</html>

View File

@ -22,13 +22,14 @@ By:
<b>Resolutions:</b>
{% else %}
<video height="300" style="max-width: 100%" controls>
<source src="{{ video.video }}">
<source src="{{ video_info.video }}">{% for track in captions %}
<track kind="subtitles" srclang="{{ track.language.id }}" label="{{ track.language.label }}" src="/{{ domain }}/videos/watch/{{ video.uuid }}/{{ track.language.id }}.vtt">{% endfor %}
</video>
<br>
<b>Resolutions:</b>
{% endif %}
{% for resolution in video.resolutions %}
{% for resolution in video_info.resolutions %}
{% if resolution.id == quality %}
{% if resolution.label == "0p" %}
[<a href="?quality={{ resolution.id }}">Audio Only</a>]
@ -55,6 +56,10 @@ Views: <b>{{ video.views }}</b> Likes: <b>{{ video.likes }}</b> Dislikes: <b>{{
<br>
<table>
<tr>
<td><b>Date</b></td>
<td>{{ macros.date(video) }}</td>
</tr>
<tr>
<td><b>Category</b></td>
<td>{{ video.category.label }}</td>
@ -83,11 +88,14 @@ Views: <b>{{ video.views }}</b> Likes: <b>{{ video.likes }}</b> Dislikes: <b>{{
<br>
<br>
<br>
{% if video.commentsEnabled %}
{{ comments.total }} Comments
{% if comments.exception %}
<h2>Failed to load comments:</h2>
<p>{{ comments.exception }}</p>
{% elif video.commentsEnabled %}
{{ comments|length }} Comments
<br>
<br>
{% for comment in comments.data %}
{% for comment in comments %}
{% if not comment.isDeleted %}
<a href="/{{ domain }}/accounts/{{ comment.account.name }}@{{ comment.account.host }}">

View File

@ -1,32 +0,0 @@
{% extends "base.html" %}
{% block head_content %}
<table>
<tr>
<td>
<img src="https://{{ video_channel.host }}{{ video_channel.avatar.path }}" height="100"/>
</td>
<td>
<b>{{ video_channel.displayName }}</b> {{ video_channel.name }}
<br>
{{ video_channel.followersCount }} Followers
<br>
Created By
<a href="/{{ domain }}/accounts/{{ video_channel.ownerAccount.name }}@{{ video_channel.ownerAccount.host }}"> {{ video_channel.ownerAccount.name }}</a>
</td>
<tr>
</table>
<br>
<br>
<a href="/{{ domain }}/video-channels/{{ name }}/videos">Videos</a>
<b> | </b>
<a href="/{{ domain }}/video-channels/{{ name }}/video-playlists">Video Playlists</a>
<b> | </b>
<a href="/{{ domain }}/video-channels/{{ name }}/about">About</a>
<br>
<br>
{% endblock %}

View File

@ -1,30 +0,0 @@
{% extends "video_channels/base.html" %}
{% block title %}{{ domain }}{% endblock %}
{% block content %}
{{ videos.total }} Videos
<br>
<br>
<hr>
<div id="wrap">
{% for video in videos.data %}
<div class="result-wrapper">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">
<img src="https://{{ domain }}{{ video.thumbnailPath }}" height="150"/>
</a>
<div class="result-info">
<a href="/{{ domain }}/videos/watch/{{ video.uuid }}">{{ video.name }}</a>
<br>
{{ video.views }} Views
</a>
</div>
</div>
{% endfor %}
</div>
<hr>
{% endblock %}

186
utils.py Normal file
View File

@ -0,0 +1,186 @@
from quart import render_template
from datetime import datetime
from dateutil import parser as dateutil
#import peertube
from peertube import PaginatedResults, FailedRequest, MultipleResults, Account
import html2text
import sys
# --- TEMPLATING HELPERS ---
# Feed me a template name, a peertube.Api.method() result,
# and a lamba to build the template context from result, on success
# If passed result is in fact an iterable (list/tuple) of results,
# they will all be checked for success
# NOTE: Non-critical failures should be passed as failures context variable
async def render(template, result, context):
if hasattr(result, '__iter__'):
failures = []
for res in result:
if isinstance(res, FailedRequest):
failures.append(res.format_html())
if failures:
# One or more failures occurred
return await render_template(
"error.html",
error_number = "500",
error_reasons = failures,
), 500
elif isinstance(result, FailedRequest):
return await render_template(
"error.html",
error_number = "500",
error_reasons = [result.format_html()],
), 500
context = context(result)
try:
return await render_template(
template,
**context,
)
except Exception as e:
return await render_error(FailedRequest(e), context)
# Feed me a single peertube.Api that failed (FailedRequest class), i will render an error
# Also you can add the original context as a dictionary
async def render_error(result, context = {}):
if not isinstance(result, FailedRequest):
print("[ERROR] Called render_error with a non-failed result:\n%s" % result)
return
return await render_template(
"error.html",
error_number = "500",
error_reasons = [result.format_html()],
**context
), 500
# Feed me an error code and a reason, i will render an error
async def render_custom_error(reason, code=500):
return await render_template(
"error.html",
error_number = str(code),
error_reasons = [reason],
), code
# Feed me a peertube.Api result and i will output plaintext, or render an error
async def render_plaintext(result):
if isinstance(result, FailedRequest):
return await render_template(
"error.html",
error_number = "500",
error_reasons = [result.format_html()],
), 500
return result
# --- END TEMPLATING UTILS ---
# --- COMMENT UTILS ---
h2t = html2text.HTML2Text()
h2t.ignore_links = True
# NOTE: Python makes it really hard to accept both boolean and stringy `enabled` values
# This function MUST be called with successful results, don't pass it anything that hasn't been fail-proofed
def comments(api, domain, id, enabled=True, start=0, count=10):
# Skip comments querying/formatting when API previously returned comments are disabled
if enabled != True and str(enabled).lower() != "true": return ""
comments = api.video_comments(domain, id, start=start, count=count)
if isinstance(comments, FailedRequest):
return comments
# Successful request, get comment count from PaginatedResults
if not isinstance(comments, PaginatedResults):
return FailedRequest("WTF. comments() util was called with something that is not PaginatedResults")
# TODO: do some cleanup
#total_comments = comments.total
#comments = comments.data
# Strip the HTML from the comments and convert them to plain text
new_comments = []
for comment in comments.data:
text = h2t.handle(comment["text"]).strip().strip("\n")
comment["text"] = text
new_comments.append(comment)
comments.data = new_comments
#print(total_comments)
#print(len(new_comments))
#return total_comments, new_comments
return comments
# --- END COMMENT UTILS ---
# --- SUBSCRIPTION UTILS ---
# Manage Subscriptions to remote channels and accounts (see INDEX ROUTE in main.py)
class Subscriptions:
def __init__(self, folder, api, cache, ttl=3600):
self.api = api
self.cache = cache
self.folder = folder
self.ttl = ttl
# Returns an entry from cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name
def cached(self, key, ttl=None):
if ttl == None: ttl = self.ttl
caller = sys._getframe(1).f_code.co_name
key_name = "subscriptions-" + caller + "-" + key
res = self.cache.get(key_name, ttl=ttl)
if res == None:
print("[CACHE] Entry not found for %s: %s" % (caller, key))
return None
print("[CACHE] Found entry for %s: %s" % (caller, key))
return res
# Save an entry into cache, automatically prefixing "peertube-FUNC-" where FUNC is the caller method name
def save(self, key, value):
caller = sys._getframe(1).f_code.co_name
key_name = "subscriptions-" + caller + "-" + key
print("[CACHE] Saving entry for %s: %s" % (caller, key))
self.cache.set(key_name, value)
# Load subscriptions from a file called `kind`.list (60s cache)
def load(self, kind):
print("[INFO] Refreshing subscriptions %s from %s/%s.list" % (kind, self.folder, kind))
try:
with open(self.folder + '/' + kind + '.list', 'r') as f:
subscriptions = map(Account.parse, f.read().splitlines())
except Exception as e:
print("[INFO] No `%s.list` file to load for local subscriptions in %s, or the file isn't readable" % (kind, self.folder))
subscriptions = []
# Remove comment entries and empty lines
results = list(filter(lambda entry: isinstance(entry, Account), subscriptions))
print("[DEBUG] %s subscriptions: %s" % (kind, results))
return results
# Get the info about local subscriptions for accounts and channels
def info(self, ttl=None):
results = MultipleResults().merge_with(self.api.accounts(self.accounts(), ttl=ttl)) \
.merge_with(self.api.channels(self.channels(), ttl=ttl))
return results
# Get the latest `limit` videos from accounts and channels subscriptions combined. Returns a list of successes and failures
# NOTE: duplicates are not handled, why would you add both an account and the corresponding channel?
def videos(self, page=None, sort=True, lang=None, ttl=None):
results = MultipleResults().merge_with(self.api.accounts_videos(self.accounts(), ttl=ttl, sort=False)) \
.merge_with(self.api.channels_videos(self.channels(), ttl=ttl, sort=False))
if lang != None:
results.successes = list(filter(lambda vid: vid["language"]["id"] == lang or vid["language"]["id"] == None, results.successes))
if page or sort:
results.successes.sort(key = lambda vid: dateutil.isoparse(vid["createdAt"]), reverse=True)
if page: results.successes = results.successes[15*(page-1):15*page]
return results
# List of locally-subscribed accounts (accounts.list)
# Enforce 60s refresh by default
def accounts(self, ttl=60):
cached = self.cached("accounts", ttl=ttl)
if cached == None:
# Refresh local accounts subscriptions
res = self.load("accounts")
self.save("accounts", res)
return res
return cached
# List of locally-subscribed channels (channels.list)
# Enforce 60s refresh by default
def channels(self, ttl=60):
cached = self.cached("channels", ttl=ttl)
if cached == None:
# Refresh local channels subscriptions
res = self.load("channels")
self.save("channels", res)
return res
return cached