# bubble/subspace.py
#
# 960 lines
# 38 KiB
# Python

import datetime
import io
import time
import urllib.parse as urlparse
import zipfile
from model import Search, Segment, User, Subspace, Post
from utils import *
def make_subspace(session):
    """Handle a request to create a new subspace.

    Returns a `(status, meta)` tuple:

    * `60` when nobody is logged in,
    * `61` when the user's role does not allow creating subspaces,
    * `10` (input prompt) when no name was given, the name is invalid, or
      the subspace could not be created,
    * `30` (redirect) to the new subspace's admin page on success.
    """
    user = session.user
    if not user:
        return 60, "Login required"
    if user.role == User.LIMITED:
        return 61, "Not authorized"
    if not session.bubble.user_subspaces and user.role == User.BASIC:
        return 61, "Not authorized"

    req = session.req
    if is_empty_query(req):
        return 10, "Enter name for new subspace: " + session.NAME_HINT

    name = clean_query(req)
    if not is_valid_name(name):
        return 10, "That is an invalid subspace name. " + session.NAME_HINT

    try:
        session.db.create_subspace(name, user.id)
    except Exception:
        # Any creation failure is reported to the user as a name collision.
        # `Exception` (rather than a bare `except`) lets KeyboardInterrupt
        # and SystemExit propagate.
        return 10, "That subspace already exists. " + session.NAME_HINT
    return 30, f'/s/{name}/admin'
def make_subspaces_page(session):
    """Render the index page of subspaces fetched with `owner=0`.

    The query string selects what is shown: `mode` produces a view-mode
    chooser, a key of `LABELS` selects that view, and anything else falls
    back to the plain `name` index. Returns the page source as a string.
    """
    user = session.user
    req = session.req
    db = session.db

    LABELS = {
        'active': 'Activity',
        'alpha': 'Summary',
        'chrono': 'Chronological Summary',
        'name': 'Index'
    }
    LOCK = ' 🔒'
    ACTIVE_AGE = 60 * 24 * 3600
    ARCHIVAL_MONTHS = 12
    ARCHIVAL_AGE = ARCHIVAL_MONTHS * 30 * 24 * 3600

    arg = clean_query(req)
    if arg == 'mode':
        return "Choose view mode for the subspace index:\n\n" + ''.join(
            f"=> /s/?{key} {label}\n" for key, label in LABELS.items())

    # The page is collected as fragments and joined once at the end.
    out = ['# Subspaces\n']
    if user:
        out.append(session.dashboard_link())
        if user.role != User.LIMITED and (
                session.bubble.user_subspaces or user.role == User.ADMIN):
            out.append('=> /new-subspace 🌒 New subspace\n')
    out.append('=> ?mode Change view mode\n')
    out.append('=> / Back to front page\n\n')

    # View mode is determined by the query string.
    view_mode = arg if arg in LABELS else 'name'

    subs = db.get_subspaces(owner=0)
    locked_ids = {locked.id for locked in db.get_subspaces(locked=True)}

    def post_kind(sub):
        # Posts of an issue tracker subspace are called issues.
        return 'issue' if sub.flags & Subspace.ISSUE_TRACKER else 'post'

    def sub_link(sub):
        name = sub.title()
        suffix = LOCK if sub.id in locked_ids else ''
        return f'=> /{name} {name}{suffix}\n'

    def sub_stats(sub):
        # One line of counters; just a newline when there is nothing to show.
        fields = []
        if sub.num_posts:
            fields.append(f"{sub.num_posts} {post_kind(sub)}{plural_s(sub.num_posts)}")
        if sub.num_cmts:
            fields.append(f"{sub.num_cmts} comment{plural_s(sub.num_cmts)}")
        if sub.num_people > 1:
            fields.append(f"{sub.num_people} people")
        return ' · '.join(fields) + "\n"

    def sub_latest_post(sub):
        if not sub.latest_post_id:
            return ''
        latest = db.get_post(id=sub.latest_post_id)
        if not latest:
            return ''
        title = latest.quoted_title()
        age = latest.age(tz=session.tz)
        return f"{title} by {latest.poster_avatar} {latest.poster_name} · {age}\n"

    def by_activity(items):
        return sorted(items, key=lambda s: s.ts_active, reverse=True)

    if len(subs) == 0:
        out.append('No subspaces.\n')
    elif view_mode == 'name':
        out.append(f'## {LABELS[view_mode]}\n\n')
        out.extend(sub_link(sub) for sub in subs)
    elif view_mode in ('alpha', 'chrono'):
        # Summary listing, in the database's order unless 'chrono' re-sorts it.
        out.append(f'## {LABELS[view_mode]}\n')
        if view_mode == 'chrono':
            out.append('\nSubspaces with recent activity are listed first.\n')
            subs = by_activity(subs)
        for sub in subs:
            out.append('\n' + sub_link(sub))
            if sub.num_posts > 0:
                out.append(
                    f"{sub.num_posts} {post_kind(sub)}{plural_s(sub.num_posts)}"
                    f" · {ago_text(sub.ts_active, tz=session.tz)}\n")
            else:
                out.append('Empty\n')
    else:
        # Divide into Active, Dormant, Empty, Pending Archival.
        buckets = {'active': [], 'dormant': [], 'empty': [], 'pending': []}
        now_time = time.time()
        for sub in by_activity(subs):
            idle = now_time - sub.ts_active
            if idle > ARCHIVAL_AGE:
                bucket = 'pending'
            elif not sub.latest_post_id:
                bucket = 'empty'
            elif idle > ACTIVE_AGE:
                bucket = 'dormant'
            else:
                bucket = 'active'
            buckets[bucket].append(sub)

        active = buckets['active']
        if active:
            out.append(f'## {len(active)} Active\n')
            for sub in active:
                out.append('\n' + sub_link(sub))
                out.append(sub_stats(sub))
                out.append(sub_latest_post(sub))

        dormant = buckets['dormant']
        if dormant:
            out.append(f'\n## {len(dormant)} Inactive 😴\n')
            for sub in dormant:
                out.append('\n' + sub_link(sub))
                out.append(sub_stats(sub))
                out.append(sub_latest_post(sub))

        empty = buckets['empty']
        if empty:
            out.append(f'\n## {len(empty)} Empty\n\n')
            out.extend(sub_link(sub) for sub in empty)

        pending = buckets['pending']
        if pending:
            out.append(f'\n## {len(pending)} Pending Archival 🪦\n')
            out.append(f'No activity in the last {ARCHIVAL_MONTHS} months:\n')
            for sub in pending:
                out.append('\n' + sub_link(sub))
                out.append(sub_stats(sub))

    return ''.join(out)
def _subspace_repo_admin(session, subspace, admin_link, page, m):
    # Handle the admin-only Git repository actions and render the repository settings page.
    user = session.user
    req = session.req
    db = session.db
    repo_link = f'{admin_link}/repo/'
    action, token = m[1], m[3]

    # Settings edited through a plain input prompt: action => (field, prompt).
    prompted_fields = {
        'clone-url': ('clone_url', 'HTTPS URL where to clone repository:'),
        'view-url': ('view_url', 'Gemini URL for viewing commits:'),
        'idlabel': ('idlabel',
                    'Label for marking issue IDs in commit messages: (For example, "IssueID")'),
    }

    if action == 'new':
        db.create_repository(subspace)
        return 30, repo_link
    if action == 'delete' and token:
        if not db.verify_token(user, token):
            return 61, 'Not authorized'
        if is_empty_query(req):
            return 10, 'Really delete repository and commit history? (DELETE to confirm)'
        if req.query == 'DELETE':
            db.destroy_repository(db.get_repository(subspace=subspace))
        return 30, repo_link
    if action in prompted_fields:
        field, prompt = prompted_fields[action]
        if req.query is None:
            return 10, prompt
        db.update_repository(db.get_repository(subspace=subspace),
                             **{field: clean_query(req)})
        return 30, repo_link

    # No action (or a delete link without a token): show the settings page.
    page += f'=> {admin_link} Go back\n'
    page += '\n## Git Repository\n'
    repo = db.get_repository(subspace=subspace)
    if not repo:
        # This literal lacked its f-prefix, so the link target used to be
        # emitted verbatim as "{admin_link}/repo/new".
        page += f'=> {admin_link}/repo/new 🛢️ New repository\n'
        return page

    page += f'\n=> {admin_link}/repo/clone-url Clone HTTPS URL: {repo.clone_url if repo.clone_url else "(not set)"}\n'
    page += f'=> {admin_link}/repo/idlabel Issue ID label: {repo.idlabel if repo.idlabel else ""}\n'
    page += f'\n=> {admin_link}/repo/view-url Commit view Gemini URL: {repo.view_url if repo.view_url else "(not set)"}\n'

    # Status information.
    page += '\n### Status\n'
    if repo.ts_fetch is None:
        if not repo.clone_url:
            page += 'Not configured.\n'
        else:
            page += 'Repository will be fetched soon.\n'
    else:
        n = db.count_commits(repo)
        fetched = datetime.datetime.fromtimestamp(repo.ts_fetch, UTC)
        page += f'{n} commits in history. Repository was last fetched on {fetched.strftime("%Y-%m-%d at %H:%M:%S %Z")}.\n'
    page += f'\n=> {admin_link}/repo/delete/{db.get_token(user)} ❌ Delete repository\n'
    return page


def subspace_admin_actions(session, action):
    """Perform a subspace administration action or render the admin page.

    `session.context` is the subspace being administered. Requires
    `session.is_user_mod`; otherwise `(61, ...)` is returned.

    Returns either a `(status, meta)` tuple (10 = input prompt, 30 =
    redirect, 51 = user not found, 61 = not authorized) or the admin page
    source as a string.

    Actions carrying a token in the path (delete, tracker, rename, add-mod,
    remove-mod) are only executed after `db.verify_token` accepts the token.
    """
    if not session.is_user_mod:
        return 61, "Moderator rights required"

    user = session.user
    req = session.req
    db = session.db
    subspace = session.context
    admin_link = f'/{subspace.title()}/admin'
    is_subspace_deletable = subspace.owner == 0 and (
        user.role == User.ADMIN or db.is_empty_subspace(subspace))

    page = f'# {subspace.title()}: Administration\n'

    # Git repository settings are reserved for administrators.
    if user.role == User.ADMIN:
        m = re.search(r'/repo/(new|delete|clone-url|view-url|idlabel)?(/([0-9a-zA-Z]{10}))?$', req.path)
        if m:
            return _subspace_repo_admin(session, subspace, admin_link, page, m)

    if action == 'info':
        if req.query is None:
            return 10, f"Description for {subspace.title()}:"
        db.update_subspace(subspace, info=clean_description(clean_query(req)), actor_id=user.id)
        return 30, admin_link

    if action == 'url':
        if req.query is None:
            return 10, f"Featured link for {subspace.title()}: (URL and label)"
        try:
            link = form_link(parse_link_segment_query(req))
        except Exception:
            # An unparseable link clears the featured link.
            link = ''
        db.update_subspace(subspace, url=link, actor_id=user.id)
        return 30, admin_link

    if action == 'omit-all':
        if subspace.flags & Subspace.HIDE_OMIT_SETTING_FLAG:
            return 61, 'Not authorized'
        db.update_subspace(subspace, flags=subspace.flags ^ Subspace.OMIT_FROM_ALL_FLAG)
        return 30, admin_link

    # Actions that require link verification.
    m = re.search(r'/(delete|tracker|rename|add-mod|remove-mod)/([0-9a-zA-Z]{10})$', req.path)
    if m:
        verified_action, token = m[1], m[2]
        if not db.verify_token(user, token):
            return 61, 'Not authorized'

        if verified_action == 'add-mod':
            if is_empty_query(req):
                return 10, 'Enter user to add as moderator:'
            adding = db.get_user(name=clean_query(req))
            if not adding:
                return 51, 'Not found'
            db.modify_mods(subspace, actor=user, add=adding)
            return 30, admin_link

        if verified_action == 'remove-mod':
            if is_empty_query(req):
                return 10, 'Enter user to remove as moderator:'
            removing = db.get_user(name=clean_query(req))
            if not removing:
                return 51, 'Not found'
            db.modify_mods(subspace, actor=user, remove=removing)
            return 30, admin_link

        if verified_action == 'rename':
            prompt = f'Enter new name for {subspace.name}? (Warning: Links to subspace will break!)'
            if is_empty_query(req):
                return 10, prompt
            new_name = clean_query(req)
            if not is_valid_name(new_name):
                return 10, prompt
            try:
                db.update_subspace(subspace, name=new_name, actor_id=user.id)
            except Exception:
                # Renaming failed; ask for another name.
                return 10, prompt
            return 30, f'/s/{new_name}/admin'

        if verified_action == 'delete' and is_subspace_deletable:
            if is_empty_query(req):
                return 10, f'Really delete {subspace.title()}? (Enter DELETE to confirm.)'
            if req.query == 'DELETE':
                db.destroy_subspace(subspace)
                return 30, '/dashboard'
            return 30, admin_link

        if verified_action == 'tracker' and is_subspace_deletable:
            new_flags = subspace.flags ^ Subspace.ISSUE_TRACKER
            if new_flags & Subspace.ISSUE_TRACKER:
                # Issues shouldn't be listed in All Posts.
                new_flags = new_flags | Subspace.OMIT_FROM_ALL_FLAG
            db.update_subspace(subspace, flags=new_flags)
            return 30, admin_link

    # No action was carried out: render the administration page.
    page += subspace.subspace_link()

    if not subspace.flags & (Subspace.ISSUE_TRACKER | Subspace.HIDE_OMIT_SETTING_FLAG):
        # `nonzero` maps the flag bit to a checkbox index, as is done for the
        # issue tracker checkbox below.
        page += f'\n=> {admin_link}/omit-all {session.CHECKS[nonzero(subspace.flags & Subspace.OMIT_FROM_ALL_FLAG)]} Omit from All Posts\n'

    page += '\n## About\n'
    page += '\n### Description\n'
    page += (subspace.info if subspace.info else '(no description)') + '\n'
    page += f'=> {admin_link}/info ✏️ Edit\n'
    page += '\n### Featured Link\n'
    page += (f'=> {subspace.url}' if subspace.url else '(no featured link)') + '\n'
    page += f'=> {admin_link}/url ✏️ Edit\n'

    page += '\n## Moderators\n\n'
    mods = db.get_mods(subspace)
    for mod in mods:
        page += f'=> /u/{mod.name} {mod.avatar} {mod.name}\n'
    page += f'=> {admin_link}/add-mod/{session.get_token()} Add moderator\n'
    # The removal link needs more than one moderator, except for administrators.
    if len(mods) > (0 if user.role == User.ADMIN else 1):
        page += f'=> {admin_link}/remove-mod/{session.get_token()} Remove moderator\n'

    if is_subspace_deletable:
        page += '\n## Issue Tracking\n'
        page += f'\n=> {admin_link}/tracker/{db.get_token(user)} {session.CHECKS[nonzero(subspace.flags & Subspace.ISSUE_TRACKER)]} Subspace is an issue tracker\n'
        page += 'Posts in an issue tracker are designated issue IDs and have an Open/Closed status. Issues may refer to Git repository commits via hash, and commit messages can refer to issues by ID. Non-issue posts are not allowed in an issue tracker subspace.\n'
        if user.role == User.ADMIN:
            page += f'\n=> {admin_link}/repo/ ⚙️ Git repository settings\n'

    page += '\n## Actions\n'
    page += '\n=> /export/' + subspace.title() + '.gpub 📤 Export data archive\n'
    page += f'Download a ZIP archive containing all posts and comments in {subspace.title()}. The archive has Gempub metadata so it can also be viewed in a Gempub reader.\n'
    page += f'\n=> {admin_link}/rename/{session.get_token()} Rename subspace\n'
    page += 'Links pointing to the subspace will break when the name is changed.\n'
    if is_subspace_deletable:
        page += f'\n=> {admin_link}/delete/{session.get_token()} ⚠️ Delete subspace {subspace.title()}\n'
        if not db.is_empty_subspace(subspace):
            page += 'All posts and comments in the subspace will be deleted. Exporting a backup beforehand is recommended.\n'
        else:
            page += 'There are no posts in the subspace.\n'

    return page
def split_terms(text):
    """Split a search query into terms of at least two characters.

    Double-quoted phrases are kept together as one term. Apostrophes are
    escaped first so that they are treated as ordinary characters instead of
    opening a single-quoted string.

    When the query cannot be parsed shell-style (an unbalanced double quote
    or a trailing backslash), the double quotes are dropped and the text is
    split on whitespace instead of raising `ValueError`.
    """
    import shlex
    try:
        words = shlex.split(text.replace("'", "\\'"))
    except ValueError:
        words = text.replace('"', ' ').split()
    return [word for word in map(str.strip, words) if len(word) >= 2]
def make_search_page(session):
    """Render search results, optionally scoped to one user or subspace.

    The request path has the form `[/u/NAME|/s/NAME]/search[/PAGE]` and the
    query string holds the search terms.

    Returns a `(status, meta)` tuple for a bad path (59), an unknown scope
    (51) or a missing query (10, input prompt); otherwise the page source.
    """
    req = session.req
    db = session.db
    user = session.user

    LIMIT = 30
    # Names of the segment types, indexed by the segment type of a match.
    SEGTYPES = ['content', 'URL', 'image', 'attachment', 'poll option']

    m = re.match(r'(/([us])/([\w%-]+))?/search(/(\d+))?', req.path)
    if not m:
        return 59, 'Bad request'

    if m[2] or m[3]:
        ident = urlparse.unquote(m[3])
        scope = db.get_subspace(name=ident)
        # Check for a missing scope before looking at its owner; the other
        # order raised AttributeError for unknown names.
        if not scope:
            return 51, 'Not found'
        if m[2] == 'u' and not scope.owner:
            return 51, 'Not found'
    else:
        scope = None
    page_index = max(0, int(m[5])) if m[5] else 0

    if req.query is None:
        return 10, f'Search {"in " + scope.title() if scope else session.bubble.site_name}:'

    search_url = ('/' if not scope else f'/{scope.title()}/') + 'search'
    terms = split_terms(clean_query(req))

    if scope:
        page = f'# Search in {scope.title()}\n'
    else:
        page = '# Search\n'
    page += f'=> {search_url} 🔍 New search\n'
    if scope:
        page += f'=> /{scope.title()} Back to {scope.title()}\n'
    else:
        page += '=> / 🌒 Back to front page\n'

    if not terms:
        return page

    page += '\n## ' + ' '.join(terms) + '\n'

    # Perform the search.
    search = Search(db)
    count = search.run(terms, scope, limit=LIMIT, page_index=page_index)

    # TODO: Just counting the matches without returning anything might be
    #       a useful addition in `model.Search`.

    if page_index > 0:
        page += f'\n=> {search_url}/{page_index - 1}?{req.query} Previous page\n'

    if count == 0:
        page += 'Found nothing matching the search terms.\n' if page_index == 0 else \
            'No more results.\n'

    for result in search.results:
        page += '\n'
        obj = result[1]

        if isinstance(obj, User):
            page += f'=> /u/{obj.name} {obj.avatar} u/{obj.name}\n'
            if obj.info:
                page += f'{clean_title(strip_links(obj.info))[:300].strip()}\n'

        elif isinstance(obj, Subspace):
            page += f'=> /s/{obj.name} s/{obj.name}\n'
            if obj.info:
                page += f'{clean_title(strip_links(obj.info))[:300].strip()}\n'

        elif isinstance(obj, Post):
            ctx = ("u/" if obj.sub_owner else "s/") + obj.sub_name
            kind = "Comment" if obj.parent else f"Issue #{obj.issueid}" if obj.issueid else "Post"
            title = f' "{shorten_text(obj.title, 30)}"' if obj.title else ''
            scope_desc = f"in {ctx} " if not scope and not obj.sub_owner else ""
            page += f'=> /{ctx}/{obj.issueid if obj.issueid else obj.id} {kind}{title} {scope_desc}by {obj.poster_avatar} {obj.poster_name} on {obj.ymd_date(tz=session.tz)} {" · " if obj.tags else ""}{obj.tags}\n'
            if result[2] != Segment.TEXT:
                page += f'(matching {SEGTYPES[result[2]]}) '
            page += obj.summary.replace('\n', ' ').replace('=>', ' ').strip() + '\n'

    if count >= LIMIT:
        page += f'\n=> {search_url}/{page_index + 1}?{req.query} Next page\nPage {page_index + 1}\n'

    return page
def listed_items(items):
    """Join strings into an English list: `a`, `a and b`, `a, b and c`.

    `items` may be any iterable of strings (list, tuple, generator, ...).
    An empty iterable gives an empty string.
    """
    items = list(items)
    if not items:
        return ''
    if len(items) == 1:
        return items[0]
    return ', '.join(items[:-1]) + ' and ' + items[-1]
def make_timestamp(ts, fmt="%Y-%m-%d at %H:%M"):
    """Format the Unix timestamp `ts` with `fmt`, using `UTC` as the tzinfo."""
    moment = datetime.datetime.fromtimestamp(ts, UTC)
    return moment.strftime(fmt)
class GempubArchive:
    """Collects posts, files and user profiles and packs them into a ZIP
    archive carrying Gempub metadata.

    An archive covers exactly one of:

    * a user's posts (`user` given together with the user's own subspace),
    * a regular subspace (`subspace` with owner 0),
    * all posts of one calendar month (`month_range=(year, month)`).

    Typical use: `render_post_entries()`, `render_file_entries()`, then
    `compress()` to get the archive bytes.
    """

    class Entry:
        """One archived item: a rendered post page or an attached file."""

        def __init__(self, post, label, page, file=None):
            self.ts = post.ts_created
            self.dt = datetime.datetime.fromtimestamp(self.ts, UTC)
            self.post_id = post.id
            self.issueid = post.issueid
            self.title = post.title
            self.subspace_id = post.subspace
            self.user_id = post.user
            self.label = label
            self.page = page
            self.file = file
            self.tags = post.tags
            self.num_cmts = post.num_cmts
            self.num_likes = post.num_likes
            self.referenced_from_posts = []

        def ymd(self):
            """Return the creation date as `YYYY-MM-DD`."""
            return self.dt.strftime('%Y-%m-%d')

        def path(self):
            """Return the entry's file name relative to its archive directory."""
            if self.file:
                pos = self.file.segment_url.rfind('/') + 1
                return f'file{self.file.id}_{self.file.segment_url[pos:]}'
            # Untitled posts (empty or None title) are named by ID only.
            fn = re.sub(r'[^\w\d-]', '', (self.title or '').replace(' ', '-')).lower().strip()
            if fn:
                fn = '_' + fn
            return f'{self.dt.year:04d}-{self.dt.month:02d}/{self.post_id}{fn}.gmi'

    def __init__(self, session, user=None, subspace=None, month_range=None):
        """Prepare an empty archive.

        `month_range` is an iterable of `(year, month)`. Note that the
        session is modified: `is_archive` is set and `user` is cleared so
        that rendered pages look like they are viewed without logging in.
        """
        self.session = session
        self.db = session.db
        self.ts_range = None
        month_title = None
        if month_range:
            year, month = month_range
            end_month = month + 1 if month < 12 else 1
            end_year = year if month < 12 else year + 1
            self.ts_range = (
                datetime.datetime(year, month, 1, 0, 0, 0, tzinfo=UTC).timestamp(),
                datetime.datetime(end_year, end_month, 1, 0, 0, 0, tzinfo=UTC).timestamp()
            )
            month_title = f'{datetime.datetime(year, month, 1).strftime("%B %Y")}'
        self.user = user
        self.subspace = subspace
        # Guard against a missing subspace so that the assertions below
        # report the problem instead of an AttributeError.
        self.is_user = (self.ts_range is None and subspace is not None
                        and subspace.owner != 0)
        assert self.is_user and self.user or not self.is_user and not self.user
        assert self.ts_range or self.subspace is not None

        # Modify session so rendered pages appear to be not logged in.
        session.is_archive = True
        session.user = None

        self.site_link = session.server_root()

        if self.ts_range is not None:
            archive_title = month_title
            archive_description = f'All posts and comments made on {session.bubble.site_name}. '
        else:
            archive_title = f'{"s/" if not self.is_user else ""}{subspace.name} on {session.bubble.site_name}'
            if not self.is_user:
                archive_description = f'All posts and comments made in the subspace {subspace.title()} on {session.bubble.site_name}. '
            else:
                archive_description = f'All posts and comments made by {user.name} on {session.bubble.site_name}. '

        self.metadata = {
            'gpubVersion': '1.0.0',
            'title': archive_title,
            'description': archive_description,
            'author': f'Bubble v{session.bubble.version}',
            'publishDate': time.strftime('%Y-%m-%d'),
            'index': 'index.gmi'
        }

        self.local_entries = []     # posts in the archive's subspace
        self.foreign_entries = []   # posts in other subspaces
        self.subspace_entries = {}  # subspace name => list of entries
        self.comment_entries = []   # posts where user has commented
        self.file_entries = []      # files
        self.entry_index = {}       # indexed by post ID
        self.file_index = {}        # indexed by file ID
        self.referenced_users = {}  # user name => (user, profile page source)
        self.total_count = [0, 0]   # [posts, comments]
        self.subspace_count = {}    # subspace ID => [posts, comments]
        self.subspaces = {}         # cache of subspaces by ID
        self.users = {}             # cache of users by ID

        if self.is_user:
            self.users[self.user.id] = user
            self.add_user_page(self.user)

    @staticmethod
    def _write(archive, path, data):
        # Store `data` as `path` in the archive; text is encoded as UTF-8.
        if isinstance(data, str):
            data = data.encode('utf-8')
        with archive.open(path, 'w') as f:
            f.write(data)

    def user_page(self, user):
        """Return the profile page source for `user`."""
        src = f'# {user.avatar} {user.name}\n'
        if user.info:
            src += user.info + '\n'
        if user.url:
            src += f'=> {user.url}\n'
        src += f'\n\n=> {self.site_link}/u/{user.name} {user.name} on {self.session.bubble.site_name}\n'
        src += 'The account was created on ' + \
            make_timestamp(user.ts_created, '%Y-%m-%d') + '.\n'
        return src

    def get_subspace(self, id):
        """Return the subspace with the given ID, caching database lookups."""
        if id not in self.subspaces:
            self.subspaces[id] = self.db.get_subspace(id=id)
        return self.subspaces[id]

    def get_user(self, id):
        """Return the user with the given ID, caching database lookups."""
        if id not in self.users:
            self.users[id] = self.db.get_user(id=id)
        return self.users[id]

    def add_user_page(self, user):
        """Register `user`'s profile page unless it is already included."""
        if user.name not in self.referenced_users:
            self.referenced_users[user.name] = (user, self.user_page(user))

    def add_post_entry(self, post, is_comment=False):
        """Render `post` and file it under the matching entry list.

        `is_comment` marks a post that is included only because the
        archive's user has commented on it.
        """
        from feeds import make_post_page

        self.add_user_page(self.get_user(post.user))

        # Modify session according to the post's subspace.
        self.session.context = self.get_subspace(post.subspace)
        self.session.is_context_tracker = (self.session.context.flags & Subspace.ISSUE_TRACKER) != 0

        is_local = (post.subspace == self.subspace.id) if self.subspace else False
        page = make_post_page(self.session, post)

        if self.ts_range:
            label = shorten_text(clean_title(strip_links(post.summary)), 150)
        else:
            # Posts outside the archive's own subspace mention where they are.
            where = self.session.context.title() if not is_local and (
                not self.is_user or is_comment) else None
            label_sub = ' · ' + where if where else ''
            label = (post.title if post.title
                     else shorten_text(clean_title(strip_links(post.summary)), 100)) + label_sub

        entry = GempubArchive.Entry(post, label, page)

        # Check for referenced users.
        for username in re.findall(r'=> /u/([\w-]+)\s', page):
            ref = self.db.get_user(name=username)
            if ref:
                self.add_user_page(ref)

        if is_comment:
            self.comment_entries.append(entry)
        elif is_local:
            self.local_entries.append(entry)
        else:
            self.foreign_entries.append(entry)

        self.subspace_entries.setdefault(self.session.context.name, []).append(entry)

        if post.id not in self.entry_index:
            if not is_comment:
                self.add_count(post.subspace,
                               (1, self.db.count_posts(parent_id=post.id, draft=False)))
            self.entry_index[post.id] = entry

    def add_count(self, subspace_id, count):
        """Add `count` (`(posts, comments)`) to the total and per-subspace tallies."""
        self.total_count[0] += count[0]
        self.total_count[1] += count[1]
        tally = self.subspace_count.setdefault(subspace_id, [0, 0])
        tally[0] += count[0]
        tally[1] += count[1]

    def render_post_entries(self):
        """Create entries for every post that belongs in the archive."""
        db = self.db

        # Entries for the user/subspace posts.
        if self.is_user:
            posts = db.get_posts(user=self.user, comment=False, draft=False)
        elif self.ts_range:
            posts = db.get_posts(ts_range=self.ts_range, comment=False, draft=False,
                                 sort_descending=False)
        else:
            posts = db.get_posts(subspace=self.subspace, comment=False, draft=False)
        for post in posts:
            self.add_post_entry(post)

        if self.is_user:
            # Make entries for posts where user has commented in.
            # TODO: Add a proper database query for this.
            commented_in = set()
            for cmt in db.get_posts(user=self.user, comment=True, draft=False,
                                    sort_descending=False):
                commented_in.add(cmt.parent)
            for post in [db.get_post(id=post_id) for post_id in commented_in]:
                if post and post.user != self.user.id:
                    self.add_post_entry(post, is_comment=True)

    def render_file_entries(self):
        """Create entries for the files of the user, subspace or time range."""
        db = self.db
        if self.user:
            files = db.get_user_files(self.user)
        elif self.subspace:
            files = db.get_subspace_files(self.subspace)
        else:
            files = db.get_time_files(self.ts_range)
        for file in files:
            post = db.get_post(id=file.segment_post)
            filesize = len(file.data)
            entry = GempubArchive.Entry(
                post,
                file.segment_label + f' [{filesize / 1024:.1f} KB, {file.mimetype}]',
                file.data,
                file)
            self.file_entries.append(entry)
            self.file_index[file.id] = entry

    def rewrite_internal_urls(self, entry: Entry):
        """Return `entry.page` with site-internal links rewritten.

        Links to users, archived posts and archived files become relative
        paths inside the archive; any other link starting with `/` is
        prefixed with the server root. A rewritten file link also records
        the linking post in the file entry's `referenced_from_posts`.
        """
        src_post_id = entry.post_id
        user_pattern = re.compile(r'^=>\s*/u/([\w%-]+)\s')
        if self.subspace:
            # The name is escaped so it can only match literally.
            post_pattern = re.compile(
                r'^=>\s*/([us])/' + re.escape(self.subspace.name) + r'/(\d+)\s')
        else:
            post_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(\d+)\s')
        file_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(image|file)/(\d+)[^ ]*\s')
        root_pattern = re.compile(r'^=>\s*/([^ ]*)\s')

        rewritten = []
        for line in entry.page.split('\n'):
            m = user_pattern.search(line)
            if m:
                rewritten.append(
                    f'=> ../../users/{urlparse.unquote(m[1])}.gmi ' + line[m.end():])
                continue

            m = post_pattern.search(line)
            if m:
                post_id = int(m[2])
                if post_id in self.entry_index:
                    rewritten.append(
                        f'=> ../../posts/{self.entry_index[post_id].path()} ' + line[m.end():])
                    continue

            m = file_pattern.search(line)
            if m:
                file_id = int(m[3])
                if file_id in self.file_index:
                    file_entry = self.file_index[file_id]
                    rewritten.append(
                        f'=> ../../files/{file_entry.path()} ' + line[m.end():])
                    file_entry.referenced_from_posts.append(src_post_id)
                    continue

            # Posts and files that are not in the archive fall through to here.
            m = root_pattern.search(line)
            if m:
                rewritten.append(
                    f'=> {self.session.server_root()}/{m[1]} ' + line[m.end():])
                continue

            rewritten.append(line)
        return '\n'.join(rewritten)

    def compress(self):
        """Write all collected entries into a ZIP archive and return its bytes."""

        def counter_text(count):
            parts = []
            if count[0]:
                parts.append(f'{count[0]} post{plural_s(count[0])}')
            if count[1]:
                parts.append(f'{count[1]} comment{plural_s(count[1])}')
            return ' and '.join(parts)

        write = self._write
        buffer = io.BytesIO()
        # The context manager finalizes the archive and also closes it if
        # writing fails midway.
        with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED,
                             compresslevel=9) as archive:
            write(archive, 'metadata.txt',
                  ''.join(f"{key}: {value}\n" for key, value in self.metadata.items()))

            if self.is_user:
                heading = self.user.name
            elif self.subspace:
                heading = self.subspace.name
            else:
                heading = self.metadata['title']
            write(archive, 'title.gmi',
                  f"\n# {heading}\n"
                  "## Gempub Archive\n"
                  f"{self.metadata['description']}\n"
                  f"Exported on {self.metadata['publishDate']}.\n")

            # Information about the user/subspace.
            if self.is_user:
                index_page = f'# {self.user.avatar} {self.user.name}\n\nTable of Contents:\n'
                index_page += '\n=> title.gmi Title page\n'
                profile_path = 'users/' + self.user.name + '.gmi'
                index_page += f'=> {profile_path} {self.user.avatar} {self.user.name}\n'
            elif self.subspace:
                index_page = f'# s/{self.subspace.name}\n\nTable of Contents:\n'
                index_page += '\n=> title.gmi Title page\n'
                profile_path = self.subspace.name + '.gmi'
                index_page += f'=> {profile_path} {self.subspace.name}\n'
                src = f'# {self.subspace.title()}\n'
                if self.subspace.info:
                    src += self.subspace.info + '\n'
                if self.subspace.url:
                    src += f'=> {self.subspace.url}\n'
                src += '\nThe subspace was created on ' + \
                    make_timestamp(self.subspace.ts_created, '%Y-%m-%d') + '.\n'
                write(archive, profile_path, src)
            else:
                index_page = '# ' + self.metadata['title'] + '\n\nTable of Contents:\n\n'

            if self.local_entries:
                index_page += f'\n=> posts/index.gmi Posts in {self.subspace.title()}\n'
                local_index_page = f'# Posts in {self.subspace.title()}\n\n'
                for entry in self.local_entries:
                    local_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    write(archive, 'posts/' + entry.path(), self.rewrite_internal_urls(entry))
                write(archive, 'posts/index.gmi', local_index_page)

            if self.ts_range:
                # Monthly archive: one index page per subspace.
                sub_links = []
                for sub_name in sorted(self.subspace_entries.keys(), key=str.lower):
                    sub_entries = self.subspace_entries[sub_name]
                    first_entry = sub_entries[0]
                    sub = self.get_subspace(first_entry.subspace_id)
                    sub_path = f'{sub.title()[0]}_{sub.name}.gmi'
                    sub_links.append(f'=> {sub_path} {sub.title()}\n')

                    title_icon = ''
                    if sub.owner:
                        title_icon = f'{self.get_user(first_entry.user_id).avatar} '
                    sub_page = f'# {title_icon}{sub.title()}\n'
                    sub_page += f'{counter_text(self.subspace_count[sub.id])} in this subspace.\n'

                    for entry in sub_entries:
                        entry_user = self.get_user(entry.user_id)
                        author = f'{entry_user.avatar} {entry_user.name}'
                        meta = []
                        top = None
                        if entry.issueid:
                            top = f'[#{entry.issueid}] {entry.title}'
                            meta.append(author)
                            if entry.tags:
                                top += f' · {entry.tags}'
                        elif not sub.owner:
                            meta.append(author)
                        meta.append(entry.dt.strftime('%Y-%m-%d %H:%M'))
                        if entry.num_cmts > 0:
                            meta.append(f'{entry.num_cmts} comment{plural_s(entry.num_cmts)}')
                        if entry.num_likes > 0:
                            meta.append(f'{entry.num_likes} like{plural_s(entry.num_likes)}')
                        if entry.tags and not entry.issueid:
                            meta.append(entry.tags)

                        link = f'=> posts/{entry.path()}'
                        if top:
                            sub_page += f'\n{link} {top}\n{entry.label}\n{" · ".join(meta)}\n'
                        else:
                            sub_page += f'\n{entry.label}\n{link} {" · ".join(meta)}\n'

                        # Write to the archive.
                        write(archive, 'posts/' + entry.path(),
                              self.rewrite_internal_urls(entry))

                    write(archive, sub_path, sub_page)

                # Separate the groups of links by the first letter of the
                # link target (u or s).
                prev_type = None
                for link in sorted(sub_links, key=str.lower):
                    if prev_type and prev_type != link[3]:
                        index_page += '\n'
                    index_page += link
                    prev_type = link[3]  # u or s
                index_page += '\n'

            elif self.foreign_entries:
                index_page += '=> other/index.gmi Posts in Other Subspaces\n'
                foreign_index_page = '# Posts in Other Subspaces\n'
                last_sub = None
                for entry in sorted(self.foreign_entries,
                                    key=lambda e: self.get_subspace(e.subspace_id).name.lower()):
                    entry_sub = self.get_subspace(entry.subspace_id)
                    if entry_sub != last_sub:
                        foreign_index_page += f'\n## {entry_sub.name}\n'
                        last_sub = entry_sub
                    foreign_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    write(archive, 'other/' + entry.path(), self.rewrite_internal_urls(entry))
                write(archive, 'other/index.gmi', foreign_index_page)

            if self.comment_entries:
                index_page += '=> comments/index.gmi Commented Posts\n'
                comment_index_page = '# Commented Posts\n'
                for entry in self.comment_entries:
                    comment_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    write(archive, 'comments/' + entry.path(),
                          self.rewrite_internal_urls(entry))
                write(archive, 'comments/index.gmi', comment_index_page)

            if self.file_entries:
                index_page += '=> files/index.gmi File attachments\n'
                file_index_page = '# File Attachments\n'
                for entry in self.file_entries:
                    file_index_page += f'\n=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    # List of posts that link to this file.
                    for ref in entry.referenced_from_posts:
                        ref_entry = self.entry_index[ref]
                        file_index_page += f'=> ../posts/{ref_entry.path()} Referenced in: "{ref_entry.label}"\n'
                    # For file entries `page` holds the file's data.
                    write(archive, 'files/' + entry.path(), entry.page)
                write(archive, 'files/index.gmi', file_index_page)

            index_page += '=> users/index.gmi Users\n'
            users_index_page = '# Users\n\nPosts and comments in this archive reference these users:\n\n'
            # Sort users case insensitively.
            for ref, (user, profile_text) in \
                    sorted(self.referenced_users.items(), key=lambda u: u[0].lower()):
                users_index_page += f'=> {ref}.gmi {user.avatar} {ref}\n'
                write(archive, 'users/' + ref + '.gmi', profile_text)
            write(archive, 'users/index.gmi', users_index_page)

            index_page += '\n=> about/bubble.gmi 💬 About Bubble\n'
            write(archive, 'about/bubble.gmi', self.session.ABOUT)

            write(archive, 'index.gmi', index_page)

        return buffer.getvalue()
def export_gempub_archive(session):
    """Export a user's, a subspace's or one month's posts as a Gempub archive.

    The request path selects what to export:

    * `/export/NAME.gpub`: the subspace NAME, which must be owned by the
      logged-in user,
    * `/export/s/NAME.gpub`: the subspace NAME; the user must be one of its
      moderators,
    * `/export/month/YYYY-MM.gpub`: every post of that month; any logged-in
      user may do this.

    Returns `(20, 'application/gpub+zip', data)` on success, otherwise a
    `(status, meta)` tuple: 60 (login required), 59 (malformed path or
    month), 51 (unknown subspace) or 61 (not authorized).
    """
    req = session.req
    db = session.db
    user = session.user

    if not user:
        return 60, 'Login required'

    # Determine subspace to export.
    m = re.search(r'/export/(s/|month/)?([\w%-]+)\.gpub$', req.path)
    if not m or not m[2]:
        return 59, 'Bad request'

    is_user = m[1] is None
    if m[1] == 'month/':
        # Parse into a real (year, month) tuple and reject malformed or
        # out-of-range values here rather than failing inside the archive.
        try:
            year, month = map(int, m[2].split('-'))
        except ValueError:
            return 59, 'Bad request'
        # The archive also needs the first day of the following month, so
        # the last representable year is excluded.
        if not (datetime.MINYEAR <= year < datetime.MAXYEAR and 1 <= month <= 12):
            return 59, 'Bad request'
        month_range = (year, month)
        subspace = None
    else:
        month_range = None
        subspace = db.get_subspace(name=urlparse.unquote(m[2]))
        if not subspace:
            return 51, 'Not found'

        # Check access rights. At the moment, exporting is only possible via
        # user settings and subspace admin pages, so the user must have
        # moderation rights in the exported subspace.
        if is_user:
            if subspace.owner != user.id:
                return 61, 'Not authorized'
        elif user.id not in (mod.id for mod in db.get_mods(subspace)):
            return 61, 'Not authorized'

    archive = GempubArchive(session, user if is_user else None, subspace, month_range)
    archive.render_post_entries()
    archive.render_file_entries()
    data = archive.compress()

    return 20, 'application/gpub+zip', data