# bubble/worker.py

import io
import json
import os
import re
import threading
import time
import subprocess

from model import Database, Notification, Repository
from utils import plural_s

pjoin = os.path.join


class Emailer (threading.Thread):
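    """Background thread that periodically emails users a digest of their
    unsent notifications using the external command configured via the
    `email.*` settings."""
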
def __init__(self, capsule, hostname, port, cfg):
super().__init__()
self.capsule = capsule
self.cfg = cfg
self.hostname = hostname
self.server_link = f'gemini://{hostname}'
if port != 1965:
self.server_link += f':{port}'
# TODO: These are also in 50_bubble.py, should be DRY.
self.site_icon = cfg.get('icon', '💬')
self.site_name = cfg.get('name', 'Bubble')
self.site_info = cfg.get('info', "Bulletin Boards for Gemini")
self.email_interval = int(cfg.get("email.interval", 60 * 15))
self.email_cmd = cfg.get("email.cmd", "")
self.email_from = cfg.get('email.from', 'nobody@example.com')
self.email_footer = f'\n---\n{self.site_name}\n{self.site_info}\n'

    def send_notifications(self, db):
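        """Collect unsent notifications for every user who has an email address
        and notifications enabled, compose one digest message per user, mark the
        notifications as sent, and hand each message to the configured command."""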
cur = db.conn.cursor()
# Find users with unsent notifications.
cur.execute("""
SELECT u.id, name, email, notif
FROM users u
JOIN notifs n ON u.id=n.dst AND n.is_sent=FALSE
WHERE email!='' AND notif!=0
""")
pending_notifs = []
for (id, name, email, enabled_types) in cur:
pending_notifs.append((id, name, email, enabled_types))
messages = {}
footer = \
f'\nView notifications in your Dashboard:\n=> {self.server_link}/dashboard\n' + \
f'\nChange notification settings:\n=> {self.server_link}/settings\n' + \
self.email_footer
for id, name, email, enabled_types in pending_notifs:
cur.execute("""
SELECT
notifs.id, type, dst, src, post,
UNIX_TIMESTAMP(ts), su.name, p.title
FROM notifs
JOIN users su ON src=su.id
JOIN posts p ON post=p.id
WHERE dst=? AND is_sent=FALSE AND type & ?
ORDER BY ts
""", (id, enabled_types))
count = 0
body = f'Hello {name}!\n\n'
body += f'There has been activity on {self.site_name}:\n\n'
for (notif_id, type, dst, src, post, ts, src_name, post_title) in cur:
notif = Notification(notif_id, type, dst, src, post, False, ts,
src_name, post_title)
count += 1
_, label = notif.entry(show_age=False)
body += '* ' + label + '\n'
if count:
subject = f'[{self.site_name}] {name}: {count} new notification{plural_s(count)}'
messages[email] = (subject, body + footer)
# Mark everything as sent.
if len(pending_notifs):
cur.execute(f"""
UPDATE notifs
SET is_sent=TRUE
WHERE dst IN ({('?,' * len(pending_notifs))[:-1]})
""", list(map(lambda p: p[0], pending_notifs)))
db.commit()
cur = None
db.close()
for email in messages:
subject, body = messages[email]
try:
msg = f'From: {self.email_from}\n' + \
f'To: {email}\n' + \
f'Subject: {subject}\n\n' + \
body
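                # Pipe the composed message to the external command on stdin;
                # the '-i' flag and the recipient argument assume a sendmail-style interface.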
args = [self.email_cmd, '-i', email]
#print(args, msg)
subprocess.check_output(args, input=msg, encoding='utf-8')
except Exception as x:
print('Emailer error:', x)

    def run(self):
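        """Send pending notification emails every `email.interval` seconds;
        do nothing if no email command is configured."""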
if not self.email_cmd:
            # Emailer is disabled.
return
while not self.capsule.shutdown_event().wait(self.email_interval):
db = Database(self.cfg)
try:
self.send_notifications(db)
finally:
db.close()


class RepoFetcher (threading.Thread):
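    """Background thread that keeps bare clones of the repositories in the
    database up to date and records their commits and issue references."""
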
    INTERVAL = 60 * 20  # Minimum number of seconds between fetches of a repository.

    class Git:
"""Helper for running git in a specific directory."""

        def __init__(self, cmd, path):
self.cmd = cmd
self.path = path

        def run(self, args, as_bytes=False, without_path=False):
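            """Run git with `args` and return its output (decoded unless
            `as_bytes`); unless `without_path` is set, the command operates
            on `self.path` via `git -C`."""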
result = subprocess.check_output(
([self.cmd] if without_path else
[self.cmd, '-C', self.path]) + args
)
if as_bytes: return result
return result.decode('utf-8').rstrip()

        def log(self, count=None, skip=0):
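            """Return the commit history as a list of dicts by formatting
            `git log --all` output with sentinel markers and massaging it
            into JSON."""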
try:
count_arg = [f'-n{count}'] if count else []
out = self.run([
'log',
'--all'] +
count_arg +
[f'--skip={skip}',
"--pretty=format:{^@^fullHash^@^:^@^%H^@^,^@^message^@^:^@^%s^@^,^@^body^@^:^@^%b^@^,^@^commitDate^@^:^@^%ai^@^},^@&@^"
])
out = out.replace('^@&@^\n', '').replace(',^@&@^', '') \
.replace('\n', '\\n').replace('\t', ' ').replace('"', '\\"').replace('^@^', '"') \
.replace('\\n#', '\\n') \
.replace('"body":"#', '"body":"')
out = '[' + out + ']'
#print(out)
return json.loads(out)
except Exception as x:
print('Error:', x)
return []

    def __init__(self, capsule, cfg):
super().__init__()
self.capsule = capsule
self.cfg = cfg
self.cache_dir = cfg.get("repo.cachedir", "")
self.git_cmd = cfg.get("repo.git", "/usr/bin/git")

    def fetch_pending(self, db, repo):
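        """Clone or fetch `repo` if its fetch interval has elapsed, then record
        new commits and any issue references found in their messages."""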
        now = time.time()
        if repo.ts_fetch is not None and \
                now - repo.ts_fetch < RepoFetcher.INTERVAL:
            return
if not repo.clone_url:
return
# It's time to fetch now.
cache_path = pjoin(self.cache_dir, str(repo.id))
os.makedirs(cache_path, exist_ok=True)
git = RepoFetcher.Git(self.git_cmd, cache_path)
if not os.path.exists(pjoin(cache_path, 'config')):
git.run(['clone', '--bare', repo.clone_url, cache_path], without_path=True)
num_commits = None
else:
git.run(['fetch'])
            num_commits = 100  # Only commits since the last fetch (INTERVAL minutes ago) are needed, so this is probably enough.
# Update the fetch timestamp.
cur = db.conn.cursor()
cur.execute("UPDATE repos SET ts_fetch=CURRENT_TIMESTAMP() WHERE id=?", (repo.id,))
db.commit()
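        # Commit messages reference issues as "<idlabel> #<number>".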
issue_pattern = re.compile(r'\b' + repo.idlabel + r'\s*#(\d+)\b')
# Read the history to find out about commits.
for commit in git.log(num_commits):
hash = commit['fullHash']
date = commit['commitDate']
message = commit['message']
body = commit['body']
issuerefs = map(int, issue_pattern.findall(message + '\n' + body))
cur.execute("INSERT IGNORE INTO commits (repo, hash, msg, ts) VALUES (?, ?, ?, ?)",
(repo.id, hash, message, date))
for issue in issuerefs:
cur.execute("INSERT IGNORE INTO issuerefs (repo, commit, issue) VALUES (?, ?, ?)",
(repo.id, hash, issue))
db.commit()

    def run(self):
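        """Poll the repositories in the database until the capsule shuts down,
        checking every few seconds whether any of them is due for a fetch;
        do nothing if no cache directory is configured."""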
if not self.cache_dir:
# Fetcher disabled.
return
print("RepoFetcher is running")
while not self.capsule.shutdown_event().wait(5.0):
db = Database(self.cfg)
try:
for repo in db.get_repositories():
try:
self.fetch_pending(db, repo)
except subprocess.CalledProcessError:
print('Error when fetching repository:', repo.clone_url)
finally:
db.close()