bubble/subspace.py

960 lines
38 KiB
Python
Raw Normal View History

import datetime
import io
import time
import urllib.parse as urlparse
import zipfile
from model import Search, Segment, User, Subspace, Post
from utils import *
def make_subspace(session):
if not session.user:
return 60, "Login required"
if session.user.role == User.LIMITED:
return 61, "Not authorized"
if not session.bubble.user_subspaces and session.user.role == User.BASIC:
return 61, "Not authorized"
db = session.db
req = session.req
if is_empty_query(req):
return 10, "Enter name for new subspace: " + session.NAME_HINT
name = clean_query(req)
if not is_valid_name(name):
return 10, "That is an invalid subspace name. " + session.NAME_HINT
try:
db.create_subspace(name, session.user.id)
return 30, f'/s/{name}/admin'
except:
return 10, "That subspace already exists. " + session.NAME_HINT
def make_subspaces_page(session):
user = session.user
req = session.req
db = session.db
LABELS = {
'active': 'Activity',
'alpha': 'Summary',
'chrono': 'Chronological Summary',
'name': 'Index'
}
LOCK = ' 🔒'
ACTIVE_AGE = 60 * 24 * 3600
ARCHIVAL_MONTHS = 12
ARCHIVAL_AGE = ARCHIVAL_MONTHS * 30 * 24 * 3600
arg = clean_query(req)
if arg == 'mode':
page = "Choose view mode for the subspace index:\n\n"
for key, label in LABELS.items():
page += f"=> /s/?{key} {label}\n"
return page
page = '# Subspaces\n'
if user:
page += session.dashboard_link()
if user.role != User.LIMITED and (
session.bubble.user_subspaces or user.role == User.ADMIN):
page += f'=> /new-subspace 🌒 New subspace\n'
2023-11-12 10:41:03 +00:00
page += '=> ?mode Change view mode\n'
page += '=> / Back to front page\n\n'
# View mode is determined by the query string.
if arg in LABELS:
view_mode = arg
else:
view_mode = 'name'
subs = db.get_subspaces(owner=0)
locked_subs = db.get_subspaces(locked=True)
locked_ids = set([sub.id for sub in locked_subs])
def sub_link(sub):
lock_icon = LOCK if sub.id in locked_ids else ''
return f'=> /{sub.title()} {sub.title()}{lock_icon}\n'
def sub_stats(sub):
stats = []
if sub.num_posts:
kind = 'issue' if sub.flags & Subspace.ISSUE_TRACKER else 'post'
stats.append(f"{sub.num_posts} {kind}{plural_s(sub.num_posts)}")
if sub.num_cmts:
stats.append(f"{sub.num_cmts} comment{plural_s(sub.num_cmts)}")
if sub.num_people > 1:
stats.append(f"{sub.num_people} people")
return ' · '.join(stats) + "\n"
def sub_latest_post(sub):
latest = db.get_post(id=sub.latest_post_id) if sub.latest_post_id else None
if latest:
title = latest.quoted_title()
age = latest.age(tz=session.tz)
return f"{title} by {latest.poster_avatar} {latest.poster_name} · {age}\n"
return ''
if len(subs) == 0:
page += 'No subspaces.\n'
elif view_mode == 'name':
page += f'## {LABELS[view_mode]}\n\n'
for sub in subs:
page += sub_link(sub)
elif view_mode == 'alpha' or view_mode == 'chrono':
# Basic alphabetical index.
page += f'## {LABELS[view_mode]}\n'
if view_mode == 'chrono':
2023-11-12 10:41:03 +00:00
page += '\nSubspaces with recent activity are listed first.\n'
subs = sorted(subs, key=lambda s: s.ts_active, reverse=True)
for sub in subs:
2023-11-12 10:41:03 +00:00
page += '\n' + sub_link(sub)
if sub.num_posts > 0:
kind = 'issue' if sub.flags & Subspace.ISSUE_TRACKER else 'post'
page += f"{sub.num_posts} {kind}{plural_s(sub.num_posts)} · {ago_text(sub.ts_active, tz=session.tz)}\n"
else:
page += 'Empty\n'
else:
# Divide into Active, Dormant, Empty, Pending Archival.
active = []
dormant = []
empty = []
pending = []
now_time = time.time()
for sub in sorted(subs, key=lambda s: s.ts_active, reverse=True):
since_active = now_time - sub.ts_active
if since_active > ARCHIVAL_AGE:
pending.append(sub)
elif not sub.latest_post_id:
empty.append(sub)
elif since_active > ACTIVE_AGE:
dormant.append(sub)
else:
active.append(sub)
if len(active):
page += f'## {len(active)} Active\n'
for sub in active:
page += '\n' + sub_link(sub)
page += sub_stats(sub)
page += sub_latest_post(sub)
if len(dormant):
page += f'\n## {len(dormant)} Inactive 😴\n'
for sub in dormant:
page += '\n' + sub_link(sub)
page += sub_stats(sub)
page += sub_latest_post(sub)
if len(empty):
page += f'\n## {len(empty)} Empty\n\n'
for sub in empty:
page += sub_link(sub)
if len(pending):
page += f'\n## {len(pending)} Pending Archival 🪦\n'
page += f'No activity in the last {ARCHIVAL_MONTHS} months:\n'
for sub in pending:
page += '\n' + sub_link(sub)
page += sub_stats(sub)
return page
2023-05-10 10:01:08 +00:00
def subspace_admin_actions(session, action):
if not session.is_user_mod:
2023-05-10 12:20:32 +00:00
return 61, "Moderator rights required"
2023-05-10 10:01:08 +00:00
user = session.user
2023-05-10 10:01:08 +00:00
req = session.req
db = session.db
subspace = session.context
admin_link = f'/{subspace.title()}/admin'
is_subspace_deletable = subspace.owner == 0 and (user.role == User.ADMIN or
db.is_empty_subspace(subspace))
2023-05-10 10:01:08 +00:00
page = f'# {subspace.title()}: Administration\n'
if user.role == User.ADMIN:
m = re.search(r'/repo/(new|delete|clone-url|view-url|idlabel)?(/([0-9a-zA-Z]{10}))?$', req.path)
if m:
if m[1] == 'new':
db.create_repository(subspace)
return 30, f'{admin_link}/repo/'
elif m[1] == 'delete' and m[3]:
if not db.verify_token(user, m[3]):
return 61, 'Not authorized'
if is_empty_query(req):
return 10, 'Really delete repository and commit history? (DELETE to confirm)'
if req.query == 'DELETE':
db.destroy_repository(db.get_repository(subspace=subspace))
return 30, f'{admin_link}/repo/'
elif m[1] == 'clone-url':
if req.query is None:
return 10, 'HTTPS URL where to clone repository:'
db.update_repository(db.get_repository(subspace=subspace),
clone_url=clean_query(req))
return 30, f'{admin_link}/repo/'
elif m[1] == 'view-url':
if req.query is None:
return 10, 'Gemini URL for viewing commits:'
db.update_repository(db.get_repository(subspace=subspace),
view_url=clean_query(req))
return 30, f'{admin_link}/repo/'
elif m[1] == 'idlabel':
if req.query is None:
return 10, 'Label for marking issue IDs in commit messages: (For example, "IssueID")'
db.update_repository(db.get_repository(subspace=subspace),
idlabel=clean_query(req))
return 30, f'{admin_link}/repo/'
page += f'=> {admin_link} Go back\n'
page += '\n## Git Repository\n'
repo = db.get_repository(subspace=subspace)
if not repo:
page += '=> {admin_link}/repo/new 🛢️ New repository\n'
return page
page += f'\n=> {admin_link}/repo/clone-url Clone HTTPS URL: {repo.clone_url if repo.clone_url else "(not set)"}\n'
page += f'=> {admin_link}/repo/idlabel Issue ID label: {repo.idlabel if repo.idlabel else ""}\n'
page += f'\n=> {admin_link}/repo/view-url Commit view Gemini URL: {repo.view_url if repo.view_url else "(not set)"}\n'
# Status information.
page += '\n### Status\n'
if repo.ts_fetch is None:
if not repo.clone_url:
page += 'Not configured.\n'
else:
page += 'Repository will be fetched soon.\n'
else:
n = db.count_commits(repo)
page += f'{n} commits in history. Repository was last fetched on {datetime.datetime.fromtimestamp(repo.ts_fetch, UTC).strftime("%Y-%m-%d at %H:%M:%S %Z")}.\n'
page += f'\n=> {admin_link}/repo/delete/{db.get_token(user)} ❌ Delete repository\n'
return page
2023-05-10 10:01:08 +00:00
if action == 'info':
if req.query == None:
return 10, f"Description for {session.context.title()}:"
db.update_subspace(session.context, info=clean_description(clean_query(req)), actor_id=user.id)
2023-05-10 10:01:08 +00:00
return 30, admin_link
if action == 'url':
if req.query == None:
return 10, f"Featured link for {session.context.title()}: (URL and label)"
try:
link = form_link(parse_link_segment_query(req))
except:
link = ''
db.update_subspace(session.context, url=link, actor_id=user.id)
2023-05-10 10:01:08 +00:00
return 30, admin_link
2023-05-10 12:20:32 +00:00
if action == 'omit-all':
if session.context.flags & Subspace.HIDE_OMIT_SETTING_FLAG:
return 61, 'Not authorized'
2023-05-10 12:20:32 +00:00
db.update_subspace(session.context, flags=session.context.flags ^ Subspace.OMIT_FROM_ALL_FLAG)
return 30, admin_link
# Actions that require link verification.
m = re.search(r'/(delete|tracker|rename|add-mod|remove-mod)/([0-9a-zA-Z]{10})$', req.path)
2023-05-10 12:20:32 +00:00
if m:
token = m[2]
if not db.verify_token(session.user, token):
2023-05-10 10:01:08 +00:00
return 61, 'Not authorized'
2023-05-10 12:20:32 +00:00
if m[1] == 'add-mod':
if is_empty_query(req):
return 10, 'Enter user to add as moderator:'
adding = db.get_user(name=clean_query(req))
if not adding:
return 51, 'Not found'
db.modify_mods(session.context, actor=session.user, add=adding)
return 30, admin_link
if m[1] == 'remove-mod':
if is_empty_query(req):
return 10, 'Enter user to remove as moderator:'
removing = db.get_user(name=clean_query(req))
if not removing:
return 51, 'Not found'
db.modify_mods(session.context, actor=session.user, remove=removing)
return 30, admin_link
if m[1] == 'rename':
prompt = f'Enter new name for {session.context.name}? (Warning: Links to subspace will break!)'
if is_empty_query(req):
return 10, prompt
new_name = clean_query(req)
if not is_valid_name(new_name):
return 10, prompt
try:
db.update_subspace(session.context, name=new_name, actor_id=user.id)
except:
return 10, prompt
return 30, f'/s/{new_name}/admin'
if m[1] == 'delete' and is_subspace_deletable:
2023-05-10 12:20:32 +00:00
if is_empty_query(req):
return 10, f'Really delete {session.context.title()}? (Enter DELETE to confirm.)'
if req.query == 'DELETE':
db.destroy_subspace(subspace)
return 30, '/dashboard'
2023-05-10 12:20:32 +00:00
return 30, admin_link
elif m[1] == 'tracker' and is_subspace_deletable:
2023-05-10 12:20:32 +00:00
new_flags = session.context.flags ^ Subspace.ISSUE_TRACKER
if new_flags & Subspace.ISSUE_TRACKER:
# Issues shouldn't be listed in All Posts.
new_flags = new_flags | Subspace.OMIT_FROM_ALL_FLAG
db.update_subspace(session.context, flags=new_flags)
return 30, admin_link
2023-05-10 10:01:08 +00:00
page += session.context.subspace_link()
2023-05-10 10:01:08 +00:00
if not session.context.flags & (Subspace.ISSUE_TRACKER | Subspace.HIDE_OMIT_SETTING_FLAG):
page += f'\n=> {admin_link}/omit-all {session.CHECKS[session.context.flags & Subspace.OMIT_FROM_ALL_FLAG]} Omit from All Posts\n'
page += '\n## About\n'
2023-05-10 10:01:08 +00:00
page += '\n### Description\n'
page += (session.context.info if session.context.info else '(no description)') + '\n'
page += f'=> {admin_link}/info ✏️ Edit\n'
2023-05-10 10:01:08 +00:00
page += '\n### Featured Link\n'
page += (f'=> {session.context.url}' if session.context.url else '(no featured link)') + '\n'
page += f'=> {admin_link}/url ✏️ Edit\n'
2023-05-10 10:01:08 +00:00
page += '\n## Moderators\n\n'
2023-05-10 10:01:08 +00:00
mods = db.get_mods(session.context)
for mod in mods:
page += f'=> /u/{mod.name} {mod.avatar} {mod.name}\n'
page += f'=> {admin_link}/add-mod/{session.get_token()} Add moderator\n'
if len(mods) > (0 if user.role == User.ADMIN else 1):
page += f'=> {admin_link}/remove-mod/{session.get_token()} Remove moderator\n'
2023-05-10 10:01:08 +00:00
if is_subspace_deletable:
2023-05-10 12:20:32 +00:00
page += '\n## Issue Tracking\n'
page += f'\n=> {admin_link}/tracker/{db.get_token(session.user)} {session.CHECKS[nonzero(session.context.flags & Subspace.ISSUE_TRACKER)]} Subspace is an issue tracker\n'
page += 'Posts in an issue tracker are designated issue IDs and have an Open/Closed status. Issues may refer to Git repository commits via hash, and commit messages can refer to issues by ID. Non-issue posts are not allowed in an issue tracker subspace.\n'
if session.user.role == User.ADMIN:
page += f'\n=> {admin_link}/repo/ ⚙️ Git repository settings\n'
2023-05-10 12:20:32 +00:00
2023-05-10 10:01:08 +00:00
page += '\n## Actions\n'
2023-05-10 12:20:32 +00:00
page += '\n=> /export/' + session.context.title() + '.gpub 📤 Export data archive\n'
2023-05-10 10:01:08 +00:00
page += f'Download a ZIP archive containing all posts and comments in {session.context.title()}. The archive has Gempub metadata so it can also be viewed in a Gempub reader.\n'
page += f'\n=> {admin_link}/rename/{session.get_token()} Rename subspace\n'
page += 'Links pointing to the subspace will break when the name is changed.\n'
if is_subspace_deletable:
page += f'\n=> {admin_link}/delete/{session.get_token()} ⚠️ Delete subspace {session.context.title()}\n'
if not db.is_empty_subspace(subspace):
page += 'All posts and comments in the subspace will be deleted. Exporting a backup beforehand is recommended.\n'
else:
page += 'There are no posts in the subspace.\n'
2023-05-10 10:01:08 +00:00
return page
def split_terms(text):
import shlex
2023-05-21 11:54:27 +00:00
return list(filter(lambda t: len(t) >= 2, map(str.strip,
shlex.split(text.replace("'", "\\'")))))
def make_search_page(session):
req = session.req
db = session.db
user = session.user
LIMIT = 30
m = re.match(r'(/([us])/([\w%-]+))?/search(/(\d+))?', req.path)
if not m:
return 59, 'Bad request'
if m[2] or m[3]:
ident = urlparse.unquote(m[3])
scope = db.get_subspace(name=ident)
if m[2] == 'u' and not scope.owner:
return 51, 'Not found'
if not scope:
return 51, 'Not found'
else:
scope = None
page_index = max(0, int(m[5])) if m[5] else 0
if req.query is None:
return 10, f'Search {"in " + scope.title() if scope else session.bubble.site_name}:'
search_url = ('/' if not scope else f'/{scope.title()}/') + 'search'
terms = split_terms(clean_query(req))
if scope:
page = f'# Search in {scope.title()}\n'
else:
page = '# Search\n'
page += f'=> {search_url} 🔍 New search\n'
if scope:
page += f'=> /{scope.title()} Back to {scope.title()}\n'
else:
page += '=> / 🌒 Back to front page\n'
if terms:
page += '\n## ' + ' '.join(terms) + '\n'
# Perform the search.
search = Search(db)
count = search.run(terms, scope, limit=LIMIT, page_index=page_index)
# TODO: Just counting the matches without returning anything might be
# a useful addition in `model.Search`.
#page += f'Found {count} match{plural_s(count, "es")}.\n'
if page_index > 0:
page += f'\n=> {search_url}/{page_index - 1}?{req.query} Previous page\n'
if count == 0:
page += 'Found nothing matching the search terms.\n' if page_index == 0 else \
'No more results.\n'
for result in search.results:
page += '\n'
#ts = result[0]
obj = result[1]
if isinstance(obj, User):
page += f'=> /u/{obj.name} {obj.avatar} u/{obj.name}\n'
if obj.info:
page += f'{clean_title(strip_links(obj.info))[:300].strip()}\n'
elif isinstance(obj, Subspace):
page += f'=> /s/{obj.name} s/{obj.name}\n'
if obj.info:
page += f'{clean_title(strip_links(obj.info))[:300].strip()}\n'
elif isinstance(obj, Post):
ctx = ("u/" if obj.sub_owner else "s/") + obj.sub_name
kind = "Comment" if obj.parent else f"Issue #{obj.issueid}" if obj.issueid else "Post"
title = f' "{shorten_text(obj.title, 30)}"' if obj.title else ''
scope_desc = f"in {ctx} " if not scope and not obj.sub_owner else ""
2023-06-23 15:11:28 +00:00
page += f'=> /{ctx}/{obj.issueid if obj.issueid else obj.id} {kind}{title} {scope_desc}by {obj.poster_avatar} {obj.poster_name} on {obj.ymd_date(tz=session.tz)} {" · " if obj.tags else ""}{obj.tags}\n'
SEGTYPES = ['content', 'URL', 'image', 'attachment', 'poll option']
if result[2] != Segment.TEXT:
page += f'(matching {SEGTYPES[result[2]]}) '
page += obj.summary.replace('\n', ' ').replace('=>', ' ').strip() + '\n'
if count >= LIMIT:
page += f'\n=> {search_url}/{page_index + 1}?{req.query} Next page\nPage {page_index + 1}\n'
return page
2023-05-09 18:33:07 +00:00
def listed_items(items):
if len(items) == 0:
return ''
if len(items) == 1:
return items[0]
return ', '.join(items[0:-1]) + ' and ' + items[-1]
def make_timestamp(ts, fmt="%Y-%m-%d at %H:%M"):
return datetime.datetime.fromtimestamp(ts, UTC).strftime(fmt)
class GempubArchive:
class Entry:
def __init__(self, post, label, page, file=None):
self.ts = post.ts_created
self.dt = datetime.datetime.fromtimestamp(self.ts, UTC)
self.post_id = post.id
self.issueid = post.issueid
self.title = post.title
self.subspace_id = post.subspace
self.user_id = post.user
self.label = label
self.page = page
self.file = file
self.tags = post.tags
self.num_cmts = post.num_cmts
self.num_likes = post.num_likes
self.referenced_from_posts = []
def ymd(self):
return self.dt.strftime('%Y-%m-%d')
def path(self):
if self.file:
pos = self.file.segment_url.rfind('/') + 1
return f'file{self.file.id}_{self.file.segment_url[pos:]}'
fn = re.sub(r'[^\w\d-]', '', self.title.replace(' ', '-')).lower().strip() # clean it up
if len(fn) > 0:
fn = '_' + fn
#if len(fn) == 0:
# fn = f'{self.dt.day}_post{self.post_id}.gmi'
return f'{self.dt.year:04d}-{self.dt.month:02d}/{self.post_id}{fn}.gmi'
def __init__(self, session, user=None, subspace=None, month_range=None):
self.session = session
self.db = session.db
self.ts_range = None
if month_range:
year, month = month_range
end_month = month + 1 if month < 12 else 1
end_year = year if month < 12 else year + 1
self.ts_range = (
datetime.datetime(year, month, 1, 0, 0, 0, tzinfo=UTC).timestamp(),
datetime.datetime(end_year, end_month, 1, 0, 0, 0, tzinfo=UTC).timestamp()
)
self.user = user
self.subspace = subspace
self.is_user = self.ts_range is None and subspace.owner != 0
assert self.is_user and self.user or not self.is_user and not self.user
assert self.ts_range or self.subspace is not None
# Modify settion so rendered pages appear to be not logged in.
session.is_archive = True
session.user = None
2023-05-13 11:55:30 +00:00
self.site_link = session.server_root()
if month_range:
archive_title = f'{datetime.datetime(year, month, 1).strftime("%B %Y")}'
archive_description = f'All posts and comments made on {session.bubble.site_name}. '
else:
archive_title = f'{"s/" if not self.is_user else ""}{subspace.name} on {session.bubble.site_name}'
archive_description = \
(f'All posts and comments made in the subspace {subspace.title()} on {session.bubble.site_name}. ' if not self.is_user else f'All posts and comments made by {user.name} on {session.bubble.site_name}. ')
self.metadata = {
'gpubVersion': '1.0.0',
'title': archive_title,
'description': archive_description,
'author': f'Bubble v{session.bubble.version}',
'publishDate': time.strftime('%Y-%m-%d'),
'index': 'index.gmi'
}
self.local_entries = [] # posts in the archive's subspace
self.foreign_entries = [] # posts in other subspaces
self.subspace_entries = {} # subspace name => list of entries
self.comment_entries = [] # posts where user has commented
self.file_entries = [] # files
self.entry_index = {} # indexed by post ID
self.file_index = {} # indexed by file ID
self.referenced_users = {} # info about posters
self.total_count = [0, 0]
self.subspace_count = {} # [posts, comments]
self.subspaces = {}
self.users = {}
if self.is_user:
self.users[self.user.id] = user
self.add_user_page(self.user)
def user_page(self, user):
src = f'# {user.avatar} {user.name}\n'
if user.info:
src += user.info + '\n'
if user.url:
src += f'=> {user.url}\n'
src += f'\n\n=> {self.site_link}/u/{user.name} {user.name} on {self.session.bubble.site_name}\n'
src += 'The account was created on ' + \
make_timestamp(user.ts_created, '%Y-%m-%d') + '.\n'
return src
def get_subspace(self, id):
if id not in self.subspaces:
self.subspaces[id] = self.db.get_subspace(id=id)
return self.subspaces[id]
def get_user(self, id):
if id not in self.users:
self.users[id] = self.db.get_user(id=id)
return self.users[id]
def add_user_page(self, user):
if not user.name in self.referenced_users:
self.referenced_users[user.name] = (user, self.user_page(user))
def add_post_entry(self, post, is_comment=False):
from feeds import make_post_page
self.add_user_page(self.get_user(post.user))
# Modify session according to the post's subspace.
self.session.context = self.get_subspace(post.subspace)
self.session.is_context_tracker = (self.session.context.flags & Subspace.ISSUE_TRACKER) != 0
is_local = (post.subspace == self.subspace.id) if self.subspace else False
if not self.ts_range:
where = self.session.context.title() if not is_local and (
not self.is_user or is_comment) else None
label_sub = ' · ' + where if where else ''
page = make_post_page(self.session, post)
if self.ts_range:
label = shorten_text(clean_title(strip_links(post.summary)), 150)
else:
label = (post.title if post.title else shorten_text(clean_title(strip_links(post.summary)), 100)) + label_sub
entry = GempubArchive.Entry(post, label, page)
# Check for referenced users.
for username in re.findall(r'=> /u/([\w-]+)\s', page):
ref = self.db.get_user(name=username)
if ref:
self.add_user_page(ref)
if is_comment:
self.comment_entries.append(entry)
elif is_local:
self.local_entries.append(entry)
else:
self.foreign_entries.append(entry)
skey = self.session.context.name
if skey in self.subspace_entries:
self.subspace_entries[skey].append(entry)
else:
self.subspace_entries[skey] = [entry]
if not post.id in self.entry_index:
if not is_comment:
self.add_count(post.subspace,
(1, self.db.count_posts(parent_id=post.id, draft=False)))
self.entry_index[post.id] = entry
def add_count(self, subspace_id, count):
self.total_count[0] += count[0]
self.total_count[1] += count[1]
if not subspace_id in self.subspace_count:
self.subspace_count[subspace_id] = [count[0], count[1]]
else:
self.subspace_count[subspace_id][0] += count[0]
self.subspace_count[subspace_id][1] += count[1]
def render_post_entries(self):
db = self.db
# Entries for the user/subspace posts.
if self.is_user:
posts = db.get_posts(user=self.user, comment=False, draft=False)
elif self.ts_range:
posts = db.get_posts(ts_range=self.ts_range, comment=False, draft=False,
sort_descending=False)
else:
posts = db.get_posts(subspace=self.subspace, comment=False, draft=False)
for post in posts:
self.add_post_entry(post)
if self.is_user:
# Make entries for posts where user has commented in.
# TODO: Add a proper database query for this.
commented_in = set()
for cmt in db.get_posts(user=self.user, comment=True, draft=False,
sort_descending=False):
commented_in.add(cmt.parent)
for post in [db.get_post(id=post_id) for post_id in commented_in]:
if post and post.user != self.user.id:
self.add_post_entry(post, is_comment=True)
def render_file_entries(self):
db = self.db
for file in db.get_user_files(self.user) if self.user \
else db.get_subspace_files(self.subspace) if self.subspace \
else db.get_time_files(self.ts_range):
post = db.get_post(id=file.segment_post)
filesize = len(file.data)
entry = GempubArchive.Entry(post,
file.segment_label + f' [{filesize / 1024:.1f} KB, {file.mimetype}]',
file.data,
file)
self.file_entries.append(entry)
self.file_index[file.id] = entry
def rewrite_internal_urls(self, entry: Entry):
src = entry.page
src_post_id = entry.post_id
user_pattern = re.compile(r'^=>\s*/u/([\w%-]+)\s')
if self.subspace:
post_pattern = re.compile(r'^=>\s*/([us])/' + self.subspace.name + r'/(\d+)\s')
else:
post_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(\d+)\s')
file_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(image|file)/(\d+)[^ ]*\s')
root_pattern = re.compile(r'^=>\s*/([^ ]*)\s')
rewritten = []
for line in src.split('\n'):
m = user_pattern.search(line)
if m:
line = f'=> ../../users/{urlparse.unquote(m[1])}.gmi ' + line[m.end():]
rewritten.append(line)
continue
m = post_pattern.search(line)
if m:
post_id = int(m[2])
if post_id in self.entry_index:
line = f'=> ../../posts/{self.entry_index[post_id].path()} ' + line[m.end():]
rewritten.append(line)
continue
m = file_pattern.search(line)
if m:
file_id = int(m[3])
if file_id in self.file_index:
entry = self.file_index[file_id]
line = f'=> ../../files/{entry.path()} ' + line[m.end():]
rewritten.append(line)
entry.referenced_from_posts.append(src_post_id)
continue
m = root_pattern.search(line)
if m:
2023-05-13 11:55:30 +00:00
line = f'=> {self.session.server_root()}/{m[1]} ' + line[m.end():]
rewritten.append(line)
continue
rewritten.append(line)
return '\n'.join(rewritten)
def compress(self):
# Create the ZIP archive.
buffer = io.BytesIO()
zip = zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=9)
def counter_text(count):
parts = []
if count[0]:
parts.append(f'{count[0]} post{plural_s(count[0])}')
if count[1]:
parts.append(f'{count[1]} comment{plural_s(count[1])}')
return ' and '.join(parts)
with zip.open('metadata.txt', 'w') as f:
for entry in self.metadata:
f.write(f"{entry}: {self.metadata[entry]}\n".encode('utf-8'))
with zip.open('title.gmi', 'w') as f:
f.write(f"""
# {self.user.name if self.is_user else self.subspace.name if self.subspace else self.metadata['title']}
## Gempub Archive
{self.metadata['description']}
Exported on {self.metadata['publishDate']}.
""".encode('utf-8'))
# Information about the user/subspace.
if self.is_user:
index_page = f'# {self.user.avatar} {self.user.name}\n\nTable of Contents:\n'
index_page += '\n=> title.gmi Title page\n'
profile_path = 'users/' + self.user.name + '.gmi'
index_page += f'=> {profile_path} {self.user.avatar} {self.user.name}\n'
elif self.subspace:
index_page = f'# s/{self.subspace.name}\n\nTable of Contents:\n'
index_page += '\n=> title.gmi Title page\n'
profile_path = self.subspace.name + '.gmi'
index_page += f'=> {profile_path} {self.subspace.name}\n'
with zip.open(profile_path, 'w') as f:
src = f'# {self.subspace.title()}\n'
if self.subspace.info:
src += self.subspace.info + '\n'
if self.subspace.url:
src += f'=> {self.subspace.url}\n'
src += '\nThe subspace was created on ' + \
make_timestamp(self.subspace.ts_created, '%Y-%m-%d') + '.\n'
f.write(src.encode('utf-8'))
else:
index_page = '# ' + self.metadata['title'] + '\n\nTable of Contents:\n\n'
if self.local_entries:
index_page += f'\n=> posts/index.gmi Posts in {self.subspace.title()}\n'
local_index_page = f'# Posts in {self.subspace.title()}\n\n'
for entry in self.local_entries:
entry_path = 'posts/' + entry.path()
local_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
with zip.open(entry_path, 'w') as content:
content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
with zip.open('posts/index.gmi', 'w') as content:
content.write(local_index_page.encode('utf-8'))
if self.ts_range:
sub_links = []
for sub_name in sorted(self.subspace_entries.keys(), key=str.lower):
first_entry = self.subspace_entries[sub_name][0]
sub = self.get_subspace(first_entry.subspace_id)
entry_path = f'{sub.title()[0]}_{sub.name}.gmi'
sub_links.append(f'=> {entry_path} {sub.title()}\n')
title_icon = ''
if sub.owner:
title_icon = f'{self.get_user(first_entry.user_id).avatar} '
sub_page = f'# {title_icon}{sub.title()}\n'
sub_page += f'{counter_text(self.subspace_count[sub.id])} in this subspace.\n'
for entry in self.subspace_entries[sub_name]:
entry_user = self.get_user(entry.user_id)
author = f'{entry_user.avatar} {entry_user.name}'
meta = []
top = None
if entry.issueid:
top = f'[#{entry.issueid}] {entry.title}'
meta.append(author)
if entry.tags:
top += f' · {entry.tags}'
elif not sub.owner:
meta.append(author)
meta.append(entry.dt.strftime('%Y-%m-%d %H:%M'))
if entry.num_cmts > 0:
meta.append(f'{entry.num_cmts} comment{plural_s(entry.num_cmts)}')
if entry.num_likes > 0:
meta.append(f'{entry.num_likes} like{plural_s(entry.num_likes)}')
if entry.tags and not entry.issueid:
meta.append(entry.tags)
link = f'=> posts/{entry.path()}'
if top:
sub_page += f'\n{link} {top}\n{entry.label}\n{" · ".join(meta)}\n'
else:
sub_page += f'\n{entry.label}\n{link} {" · ".join(meta)}\n'
# Write to the archive.
with zip.open('posts/' + entry.path(), 'w') as content:
content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
with zip.open(entry_path, 'w') as content:
content.write(sub_page.encode('utf-8'))
prev_type = None
for link in sorted(sub_links, key=str.lower):
if prev_type and prev_type != link[3]:
index_page += '\n'
index_page += link
prev_type = link[3] # u or s
index_page += '\n'
elif self.foreign_entries:
index_page += f'=> other/index.gmi Posts in Other Subspaces\n'
foreign_index_page = '# Posts in Other Subspaces\n'
last_sub = None
for entry in sorted(self.foreign_entries,
key=lambda e: self.get_subspace(e.subspace_id).name.lower()):
entry_sub = self.get_subspace(entry.subspace_id)
if entry_sub != last_sub:
foreign_index_page += f'\n## {entry_sub.name}\n'
last_sub = entry_sub
entry_path = 'other/' + entry.path()
foreign_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
with zip.open(entry_path, 'w') as content:
content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
with zip.open('other/index.gmi', 'w') as content:
content.write(foreign_index_page.encode('utf-8'))
if self.comment_entries:
index_page += f'=> comments/index.gmi Commented Posts\n'
comment_index_page = '# Commented Posts\n'
for entry in self.comment_entries:
entry_path = 'comments/' + entry.path()
comment_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
with zip.open(entry_path, 'w') as content:
content.write(self.rewrite_internal_urls(entry).encode('utf-8'))
with zip.open('comments/index.gmi', 'w') as content:
content.write(comment_index_page.encode('utf-8'))
if self.file_entries:
index_page += '=> files/index.gmi File attachments\n'
file_index_page = '# File Attachments\n'
for entry in self.file_entries:
entry_path = 'files/' + entry.path()
file_index_page += f'\n=> {entry.path()} {entry.ymd()} {entry.label}\n'
# List of posts that link to this file.
for ref in entry.referenced_from_posts:
ref_entry = self.entry_index[ref]
file_index_page += f'=> ../posts/{ref_entry.path()} Referenced in: "{ref_entry.label}"\n'
with zip.open(entry_path, 'w') as content:
content.write(entry.page)
with zip.open('files/index.gmi', 'w') as content:
content.write(file_index_page.encode('utf-8'))
index_page += '=> users/index.gmi Users\n'
users_index_page = '# Users\n\nPosts and comments in this archive reference these users:\n\n'
# Sort users case insensitively.
for ref, (user, profile_text) in \
sorted(self.referenced_users.items(), key=lambda u: u[0].lower()):
users_index_page += f'=> {ref}.gmi {user.avatar} {ref}\n'
with zip.open('users/' + ref + '.gmi', 'w') as f:
f.write(profile_text.encode('utf-8'))
with zip.open('users/index.gmi', 'w') as f:
f.write(users_index_page.encode('utf-8'))
index_page += f'\n=> about/bubble.gmi 💬 About Bubble\n'
with zip.open('about/bubble.gmi', 'w') as f:
f.write(self.session.ABOUT.encode('utf-8'))
with zip.open('index.gmi', 'w') as f:
f.write(index_page.encode('utf-8'))
zip.close()
return buffer.getvalue()
def export_gempub_archive(session):
req = session.req
db = session.db
user = session.user
if not user:
return 60, 'Login required'
# Determine subspace to export.
m = re.search(r'/export/(s/|month/)?([\w%-]+)\.gpub$', req.path)
if not m or not m[2]:
return 59, 'Bad request'
name = urlparse.unquote(m[2])
if m[1] == 'month/':
month_range = map(int, m[2].split('-'))
subspace = None
else:
month_range = None
subspace = db.get_subspace(name=name)
is_user = m[1] is None
# Check access rights. At the moment, exporting is only possible via user
# settings and subspace admin pages, so the user must have moderation
# rights in the exported subspace.
if month_range:
if not user:
# Have to be logged in.
return 61, 'Not authorized'
elif is_user:
if subspace.owner != user.id:
return 61, 'Not authorized'
else:
if user.id not in map(lambda u: u.id, db.get_mods(subspace)):
return 61, 'Not authorized'
archive = GempubArchive(session, user if is_user else None, subspace, month_range)
archive.render_post_entries()
archive.render_file_entries()
data = archive.compress()
return 20, 'application/gpub+zip', data