bubble/user.py

499 lines
20 KiB
Python

import re
import urllib.parse as urlparse
from model import Notification, Segment, User, Post, Subspace, \
FOLLOW_POST, FOLLOW_SUBSPACE, FOLLOW_USER, MUTE_POST, MUTE_SUBSPACE, MUTE_USER
from utils import plural, plural_s, clean_query, is_empty_query, shorten_text, \
strip_links, clean_title
def user_actions(session):
db = session.db
req = session.req
user = session.user
if req.path.startswith(session.path + 'like/') or \
req.path.startswith(session.path + 'unlike/'):
if not user:
return 60, 'Login required'
if not session.is_likes_enabled():
return 50, 'Likes are disabled'
if user.role == User.LIMITED:
return 61, 'Not authorized'
found = re.match(r'^(like|unlike)/(\d+)$', req.path[len(session.path):])
action = found.group(1)
post_id = int(found.group(2))
post = db.get_post(id=post_id)
if not post:
return 51, 'Not found'
db.modify_likes(session.user, post, add=(action == 'like'))
return 30, post.page_url()
elif req.path.startswith(session.path + 'lock/') or \
req.path.startswith(session.path + 'unlock/'):
if not user:
return 60, 'Login required'
if user.role == User.LIMITED:
return 61, 'Not authorized'
found = re.match(r'^(lock|unlock)/(\d+)$', req.path[len(session.path):])
action = found.group(1)
post_id = int(found.group(2))
post = db.get_post(id=post_id)
if not post:
return 51, 'Not found'
if not session.is_lockable(post):
return 60, 'Not authorized'
db.update_post(post, flags=post.flags ^ Post.LOCKED_FLAG)
return 30, post.page_url() + '/more'
elif req.path.startswith(session.path + 'react/') or \
req.path.startswith(session.path + 'unreact/'):
if not user:
return 60, 'Login required'
if not session.is_reactions_enabled():
return 50, 'Reactions are disabled'
if user.role == User.LIMITED:
return 61, 'Not authorized'
found = re.match(r'^(react|unreact)/(\d+)$', req.path[len(session.path):])
action = found[1]
post_id = int(found[2])
post = db.get_post(id=post_id)
if not post:
return 51, 'Not found'
if action == 'unreact':
db.modify_reactions(session.user, post, None)
else:
available_reactions = session.bubble.user_reactions.split()
if is_empty_query(req):
selected = db.get_user_reaction(post, session.user.id)
page = 'Choose a reaction to:\n'
page += session.gemini_feed_entry(post, session.context)
page += '\n'
idx = 1
for react in available_reactions:
if selected == react:
page += f'=> /unreact/{post.id} ❌ Reacted: {selected}\n'
else:
page += f'=> /react/{post.id}?{idx} {react}\n'
idx += 1
page += f'\n=> {post.page_url()} Back to post\n'
return page
else:
idx = int(clean_query(req))
selected = available_reactions[idx - 1]
db.modify_reactions(user, post, selected)
return 30, session.path + f'react/{post.id}'
elif req.path.startswith(session.path + 'remind/'):
if not user:
return 60, 'Login required'
found = re.match(r"^remind/(\d+)$", req.path[len(session.path):])
post_id = int(found.group(1))
post = db.get_post(id=post_id)
if not post:
return 51, 'Not found'
db.notify_reminder(session.user, post)
return 30, post.page_url()
elif req.path.startswith(session.path + 'report/'):
if not user:
return 60, 'Login required'
if user.role == User.LIMITED:
return 61, 'Not authorized'
found = re.match(r"^report/(\d+)(/([A-Za-z0-9]+))?$", req.path[len(session.path):])
post_id = int(found[1])
token = found[3]
post = db.get_post(id=post_id)
if not post:
return 51, 'Not found'
if token and db.verify_token(user, token):
db.notify_moderators(Notification.REPORT, session.user, post)
return 30, post.page_url()
subspace = db.get_subspace(id=post.subspace)
kind = 'comment' if post.parent \
else 'issue' if subspace.flags & Subspace.ISSUE_TRACKER \
else 'post'
page = f'# ⚠️\n\nYou are about to report the following {kind} to the moderators:\n'
page += session.gemini_feed_entry(post, session.context)
page += f'\n=> /report/{post.id}/{session.get_token()} ⚠️ Send the report\n'
page += f'=> {post.page_url()} Back to post\n'
return page
elif req.path.startswith(session.path + 'transmit/'):
if not user:
return 60, 'Login required'
if not session.is_antenna_enabled():
return 50, 'Not available'
if user.role == User.LIMITED:
return 61, 'Not authorized'
# Parse arguments.
found = re.match(r"^transmit/(\d+)/(post/(\d+)|feed/(([\w-]+)(/([\w-]+))?))", req.path[len(session.path):])
try:
antenna_index = int(found[1])
post_id = int(found[3]) if found[3] else None
username = found[5]
tag_filter = found[7]
if post_id != None:
kind = 'post'
post = db.get_post(id=post_id)
if post.user != user.id:
return 61, "Cannot submit other users' posts"
else:
kind = 'feed'
if username != user.name:
return 61, "Cannot submit other users' feeds"
# Confirmation page.
antenna_label = session.bubble.antenna_labels[antenna_index]
page = f'# 📡\n'
page += f'=> {session.bubble.antenna_abouts[antenna_index]} About {antenna_label}\n\n'
page += f'You are about to submit the following:\n'
if post_id != None:
page += session.gemini_feed_entry(post)
post_sub = db.get_subspace(id=post.subspace)
sub_path = f"u/{session.user.name}" if post_sub.owner == user.id \
else f"s/{post_sub.name}"
antenna_feed = f"gemini://{session.bubble.hostname}{session.path}{sub_path}/{post.id}/antenna"
else:
usub = db.get_subspace(name=username)
antenna_feed = f"{session.server_root()}{session.path}u/{username}{'/tag/' + tag_filter if tag_filter else ''}?feed"
page += f"=> {antenna_feed} {usub.info if usub.info else usub.name}{' [#' + tag_filter + ']' if tag_filter else ''}\n"
page += '\n' + session.bubble.antenna_links(kind, antenna_feed)[antenna_index]
if post_id:
page += f'=> {post.page_url()} Back to post\n'
else:
page += f'=> {session.path}u/{username} Back to u/{username}\n'
except Exception as x:
print(x)
return 51, 'Not found'
return page
elif req.path.startswith(session.path + 'thanks/'):
if not user:
return 60, 'Login required'
if user.role == User.LIMITED:
return 61, 'Not authorized'
found = re.match(r"^thanks/(\d+)$", req.path[len(session.path):])
post_id = int(found.group(1))
post = db.get_post(id=post_id)
if not post:
return 51, 'Not found'
db.notify_thanks(session.user, post)
page = '# 🙏\n\nYou thanked\n'
poster = db.get_user(id=post.user)
page += f'=> /u/{poster.name} {poster.avatar} {poster.name}\n'
page += 'for:\n'
page += session.gemini_feed_entry(post, session.context)
return page
elif req.path.startswith(session.path + 'vote/'):
try:
if not user:
return 60, 'Login required'
if user.role == User.LIMITED:
return 61, 'Not authorized'
m = re.search(r'/vote/(\d+)/([0-9a-zA-Z]{10})$', req.path)
if not m:
return 59, 'Bad request'
seg_id = int(m[1])
token = m[2]
if not db.verify_token(user, token):
return 61, 'Expired link'
segment = db.get_segment(id=seg_id)
if not segment or segment.type != Segment.POLL:
return 51, 'Not found'
db.modify_vote(user, segment)
return 30, db.get_post(id=segment.post).page_url()
except ValueError:
return 59, 'Bad request'
elif re.search(r'^(follow|unfollow|mute|unmute)/', req.path[len(session.path):]):
if not session.user:
return 60, 'Login required'
if user.role == User.LIMITED:
return 61, 'Not authorized'
found = re.match(r'(follow|unfollow|mute|unmute)/(post/(\d+)|([\w%-]+)|(u/[\w%-]+)|(s/[\w%-]+))$',
req.path[len(session.path):])
if not found:
return 59, 'Bad request'
action = found[1]
post_id = found[3]
u_name = urlparse.unquote(found[4]) if found[4] else None
u_sub = urlparse.unquote(found[5]) if found[5] else None
s_sub = urlparse.unquote(found[6]) if found[6] else None
is_follow = action in ('follow', 'unfollow')
if u_name != None:
target_type = FOLLOW_USER if is_follow else MUTE_USER
target_id = db.get_user(name=u_name).id
dst = f'/u/{u_name}'
elif post_id != None:
target_type = FOLLOW_POST if is_follow else MUTE_POST
target_id = int(post_id)
post = db.get_post(target_id)
dst = post.page_url() + '/more'
else:
target_type = FOLLOW_SUBSPACE if is_follow else MUTE_SUBSPACE
sub_name = (u_sub if u_sub != None else s_sub)[2:]
sub = db.get_subspace(name=sub_name)
target_id = sub.id
dst = '/' + sub.title()
if is_follow:
db.modify_follows(session.user, action == 'follow', target_type, target_id)
else:
db.modify_mutes(session.user, action == 'mute', target_type, target_id)
return 30, dst
elif req.path.startswith(session.path + 'notif/'):
if not session.user:
return 60, 'Login required'
found = re.match(r'^notif/((\d+)|(clear|history|feed))$', req.path[len(session.path):])
if not found:
return 59, 'Bad request'
notif_id = int(found[2]) if found[2] else None
action = found[3]
if action == 'clear':
db.get_notifications(session.user, clear=True)
db.update_user(session.user, active=True)
return 30, '/dashboard'
if action == 'history':
page = '# Notification History\n'
page += '=> /dashboard Back to Dashboard\n\n'
notifs = db.get_notifications(session.user, include_hidden=True, sort_desc=True)
cur_ymd = None
for notif in notifs:
ymd = notif.ymd_date(tz=session.tz)
link, label = notif.entry(show_age=False, with_time=True, tz=session.tz)
if cur_ymd != ymd:
cur_ymd = ymd
page += f'## {ymd}\n\n'
page += f'=> {link} {label}\n'
return page
if action == 'feed':
page = f'# @{session.user.name}\n'
page += '\n## Notifications\n'
notifs = db.get_notifications(session.user)
if not notifs:
page += '\nNo activity.\n'
for notif in notifs:
link, label = notif.entry(show_age=False)
page += f'=> {link} {notif.ymd_date(tz=session.tz)} {label}\n'
return page
notif = db.get_notification(session.user, notif_id, clear=True)
db.update_user(session.user, active=True)
if not notif:
return 30, '/dashboard'
if notif.comment:
cmt = db.get_post(id=notif.comment)
if not cmt:
return 51, 'Not found'
return 30, cmt.page_url()
if notif.post:
post = db.get_post(id=notif.post)
if not post:
return 51, 'Not found'
return 30, post.page_url()
if notif.subspace:
subs = db.get_subspace(id=notif.subspace)
if not subs:
return 51, 'Not found'
return 30, session.path + subs.title()
notif_src = db.get_user(id=notif.src)
if notif_src:
if notif.type == Notification.USER_FLAIR_CHANGED:
return 30, f'{session.path}settings/flair/{notif_src.name}'
return 30, f'{session.path}u/{notif_src.name}'
return 42, 'Invalid notification'
class Subgrouping:
"""Sorts notifications by related post ID and descending time."""
def __init__(self, user, notifs):
self.user = user
self.by_post = {}
ordered = sorted(notifs, key=lambda n: -n.ts)
for notif in ordered:
if not notif.post in self.by_post:
self.by_post[notif.post] = []
self.by_post[notif.post].append(notif)
def render(self):
src = []
top_head = False
prev_top_head = False
for post_id, notifs in self.by_post.items():
prev_top_head = top_head
top_head = post_id and len(notifs) > 1
# Print a common heading.
if top_head:
n = notifs[0]
if n.post_subname and n.post_subowner != self.user.id:
sub_text = f" in {'u/' if n.post_subowner else 's/'}{n.post_subname}"
else:
sub_text = ''
if src:
src.append('')
src.append(f'"{n.title_text()}"{sub_text}:')
elif prev_top_head:
# Sepearate from previous grouping.
src.append('')
# Print the notifications.
for notif in notifs:
src.append('=> %s %s' % notif.entry(with_title=not top_head))
return '\n'.join(src) + '\n'
def make_dashboard_page(session):
if not session.user:
return 60, "Login required"
user = session.user
db = session.db
page = f'# {session.user.name}: Dashboard\n'
if session.user.role == User.ADMIN:
page += f'=> /admin/ 🔧 {session.bubble.site_name} administration\n'
page += f'=> /settings ⚙️ Settings\n'
page += f'=> /u/{session.user.name} {session.user.avatar} u/{session.user.name}\n'
notifs = db.get_notifications(session.user)
n = len(notifs)
page += f'\n## {n} Notification{plural_s(n)}\n'
if n:
# Group in two: @user and Followed.
yours = []
moderated = []
followed = []
for notif in notifs:
if not notif.type in Notification.PRIORITY or \
notif.type == Notification.REACTION or \
Notification.PRIORITY[notif.type] >= 10:
yours.append(notif)
else:
if notif.post_subid in user.moderated_subspace_ids:
moderated.append(notif)
else:
followed.append(notif)
if yours:
page += f'\n### @{session.user.name}\n'
page += Subgrouping(user, yours).render()
if moderated:
page += f'\n### Moderated\n'
page += Subgrouping(user, moderated).render()
if followed:
page += f'\n### Followed\n'
page += Subgrouping(user, followed).render()
page += '\n=> /notif/clear 🧹 Clear all\n'
page += '=> /notif/feed Notifications feed\nSubscribe to this Gemini feed to view your unread notifications in a feed reader.\n'
page += '=> /notif/history 🕓 View history\n'
# Visiting the Dashboard will prevent emails from being sent for these notifications.
db.mark_notifications_sent(session.user)
drafts = db.get_posts(user=session.user, draft=True)
n = len(drafts)
page += f'\n## {n} Draft{plural_s(n)}\n'
if n:
for post in drafts:
if not post.sub_owner and post.parent == 0:
sub = f' · s/{post.sub_name}'
else:
sub = ''
page += f'=> /edit/{post.id} {post.ymd_date(tz=session.tz)} {post.title_text()}{sub}\n'
subs = db.get_subspaces(mod=session.user.id)
n = len(subs)
if n:
page += f'\n## {n} Subspace{plural_s(n)}\n'
for sub in subs:
page += sub.subspace_link()
page += '\n## Index\n'
page += f'=> /u/{user.name}/index/posts {plural(db.count_posts(user=user, ignore_omit_flags=True), "post")}\n'
page += f'=> /u/{user.name}/index/comments {plural(db.count_posts(user=user, is_comment=True), "comment")}\n'
return page
def make_user_index_page(session, mode):
db = session.db
user = session.c_user
page = ''
is_posts = not (mode == 'comments')
page += f'# {user.name}: {"Posts" if is_posts else "Comments"}\n'
page += '=> /dashboard Back to Dashboard\n'
if is_posts:
posts = db.get_posts(user=user, draft=False, comment=False, sort_by_subspace=True)
cur_sub = None
ymd = None
for post in posts:
# Headings.
if cur_sub != post.subspace:
cur_sub = post.subspace
ymd = None
page += f"\n## {post.sub_name}\n"
post_ymd = post.ymd_date(tz=session.tz)
if ymd != post_ymd[:7]:
ymd = post_ymd[:7]
page += f"\n### {ymd}\n"
# List entry linking to post
title = post.title if post.title else shorten_text(strip_links(clean_title(post.summary)), 120)
if post.issueid:
title = f'[#{post.issueid}] ' + title
SEP = " · "
meta = []
if post.num_cmts:
meta.append(f'💬 {post.num_cmts}')
if post.num_likes:
meta.append(f'👍 {post.num_likes}')
if post.tags:
meta.append(post.tags)
entry = f'=> {post.page_url()} {post_ymd} {title}{SEP + SEP.join(meta) if meta else ""}\n'
page += entry
else:
comments = db.get_posts(user=user, draft=False, comment=True,
sort_by_subspace=True, sort_by_post=True)
cur_parent = None
cur_sub = None
for cmt in comments:
# Headings.
if cur_sub != cmt.subspace:
cur_sub = cmt.subspace
ymd = None
page += f"\n## {cmt.sub_name}\n"
if cur_parent != cmt.parent:
cur_parent = cmt.parent
parent_post = db.get_post(id=cur_parent)
if parent_post:
parent_title = parent_post.title if parent_post.title else f'"{shorten_text(strip_links(clean_title(parent_post.summary)), 120)}"'
if parent_post.issueid:
parent_title = f'[#{parent_post.issueid}] ' + parent_title
else:
if not session.user or session.user.id != cmt.user:
continue # Don't show other people's comments on deleted posts.
parent_title = f"Deleted post (ID:{cur_parent})"
page += f"\n{parent_title}\n"
cmt_ymd = cmt.ymd_date(tz=session.tz)
title = shorten_text(strip_links(clean_title(cmt.summary)), 120)
entry = f'=> {cmt.page_url()} {cmt_ymd} "{title}"\n'
page += entry
return page