bubble/worker.py

252 lines
8.9 KiB
Python

import datetime
import json
import os
import re
import threading
import time
import subprocess
from model import Database, Notification, User
from utils import plural_s
# Convenience alias for building filesystem paths (used by RepoFetcher below).
pjoin = os.path.join
# Matches an hour range such as "8-17" or "22-6" (the range may cross midnight).
HOUR_RANGE = re.compile(r'(\d+)-(\d+)')


def is_hour_in_range(hour, hour_range):
    """Return True if `hour` (an int, 0-23) falls within `hour_range`.

    `hour_range` is a string like "8-17". Both endpoints are inclusive.
    When begin >= end (e.g. "22-6"), the range is taken to cross midnight.
    Any malformed or missing range (None, empty, unparseable) yields False.
    """
    try:
        m = HOUR_RANGE.match(hour_range)
        begin, end = int(m[1]), int(m[2])
    # Narrow exceptions instead of a bare `except:`, which would also
    # swallow KeyboardInterrupt/SystemExit and hide real bugs:
    # - TypeError: hour_range is None (or m is None -> m[1] fails)
    # - AttributeError: defensive, in case match() result is misused
    # - ValueError: int() on malformed digits (shouldn't occur post-match)
    except (TypeError, AttributeError, ValueError):
        return False
    if begin < end:
        return begin <= hour <= end
    # Range crosses midnight (or begin == end, which covers every hour).
    return hour >= begin or hour <= end
class Emailer (threading.Thread):
    """Background thread that periodically emails users a digest of their
    unsent notifications.

    Disabled entirely when `email.cmd` is not configured. Emails are sent by
    piping the message to an external command (or printed when the command is
    the literal string 'stdout').
    """

    def __init__(self, capsule, hostname, port, cfg):
        """Initialize from server config.

        capsule  -- provides shutdown_event() used to pace/stop the loop
        hostname -- public hostname used to build gemini:// links
        port     -- server port; omitted from links when it is the default 1965
        cfg      -- mapping with .get(); holds site and email settings
        """
        super().__init__()
        self.capsule = capsule
        self.cfg = cfg
        self.hostname = hostname
        self.server_link = f'gemini://{hostname}'
        if port != 1965:
            self.server_link += f':{port}'
        # TODO: These are also in 50_bubble.py, should be DRY.
        self.site_icon = cfg.get('icon', '💬')
        self.site_name = cfg.get('name', 'Bubble')
        self.site_info = cfg.get('info', "Bulletin Boards for Gemini")
        # Seconds between notification sweeps (default: five minutes).
        self.email_interval = int(cfg.get("email.interval", 60 * 5))
        # External mail command; empty string disables the emailer thread.
        self.email_cmd = cfg.get("email.cmd", "")
        self.email_from = cfg.get('email.from', 'nobody@example.com')
        self.email_footer = f'\n---\n{self.site_name}: {self.site_info}\n'

    def send_notifications(self, db):
        """Collect users' unsent notifications, mark them sent, and email one
        digest message per user.

        Closes `db` before invoking the (potentially slow) external mail
        command so the connection is not held open during delivery.
        """
        cur = db.conn.cursor()
        # Find users with unsent notifications.
        # NOTE(review): the last condition compares against column
        # `email_inter` — presumably a per-user minimum interval in minutes;
        # verify against the schema.
        cur.execute("""
            SELECT
                u.id, name, email, email_range, notif
            FROM users u
            JOIN notifs n ON u.id=n.dst AND n.is_sent=FALSE
            WHERE
                email!='' AND
                notif!=0 AND
                TIMESTAMPDIFF(MINUTE, ts_email, CURRENT_TIMESTAMP())>email_inter
        """)
        pending_notifs = []
        cur_hour = datetime.datetime.now(datetime.timezone.utc).hour
        for (id, name, email, email_range, enabled_types) in cur:
            # `email_range` is the user's do-not-disturb hours: skip users
            # whose excluded range covers the current UTC hour.
            if is_hour_in_range(cur_hour, email_range):
                continue
            pending_notifs.append(User(id, name, None, None, None, None, None, None,
                                       None, enabled_types, email, None,
                                       email_range, None, None, None, None,
                                       None, None, None))
        messages = {}
        footer = \
            f'\nView notifications in your Dashboard:\n=> {self.server_link}/dashboard\n' + \
            f'\nChange notification settings:\n=> {self.server_link}/settings/notif\n' + \
            self.email_footer
        for user in pending_notifs:
            notifs = db.get_notifications(user, only_unsent=True)
            count = 0
            body = ''

            def personal_first(n):
                # Higher-priority (more personal) notification types sort
                # first; unknown types get a low default priority.
                prio = Notification.PRIORITY[n.type] if n.type in Notification.PRIORITY else 10
                return (-prio, n.ts)

            for notif in sorted(notifs, key=personal_first):
                count += 1
                _, label = notif.entry(show_age=False)
                body += label + '\n\n'
            if count:
                subject = f'{user.name}: {count} new notification{plural_s(count)}'
                messages[user.email] = (subject, body + footer)
        # Mark everything as sent.
        if len(pending_notifs):
            user_ids = [u.id for u in pending_notifs]
            # Use bound parameters rather than interpolating the ids into the
            # SQL text (ids come from the DB, but parameterizing is safer).
            marks = ','.join(['?'] * len(user_ids))
            cur.execute(f"""
                UPDATE notifs
                SET is_sent=TRUE
                WHERE dst IN ({marks})
            """, tuple(user_ids))
            cur.execute(
                f"UPDATE users SET ts_email=CURRENT_TIMESTAMP() WHERE id IN ({marks})",
                tuple(user_ids))
            db.commit()
        cur = None
        # Release the connection before the slow delivery loop below.
        # NOTE(review): run() closes this db again in its `finally` — assumes
        # Database.close() tolerates a second call; confirm.
        db.close()
        for email in messages:
            subject, body = messages[email]
            try:
                msg = f'From: {self.site_name} <{self.email_from}>\n' + \
                      f'To: {email}\n' + \
                      f'Subject: {subject}\n\n' + \
                      body
                args = [self.email_cmd, '-i', email]
                if self.email_cmd == 'stdout':
                    # Debug mode: print instead of invoking a mailer.
                    print(args, msg)
                else:
                    subprocess.check_output(args, input=msg, encoding='utf-8')
            except Exception as x:
                print('Emailer error:', x)

    def run(self):
        """Main loop: sleep `email_interval` seconds between sweeps until the
        capsule's shutdown event is set."""
        if not self.email_cmd:
            # Emailer is disabled.
            return
        print(" Emailer is running")
        while not self.capsule.shutdown_event().wait(self.email_interval):
            db = Database(self.cfg)
            try:
                self.send_notifications(db)
            except Exception:
                # BUG FIX: was traceback.print_last(), which reads
                # sys.last_traceback — only set by the interactive
                # interpreter, so it would fail here. print_exc() reports
                # the exception actually being handled.
                import traceback
                traceback.print_exc()
            finally:
                db.close()
class RepoFetcher (threading.Thread):
    """Background thread that keeps bare mirrors of configured repositories
    up to date and records their commits (and issue references found in
    commit messages) in the database.

    Disabled entirely when `repo.cachedir` is not configured.
    """

    # Minimum number of seconds between fetches of the same repository.
    INTERVAL = 60 * 20

    class Git:
        """Helper for running git in a specific directory."""

        def __init__(self, cmd, path):
            self.cmd = cmd    # path to the git executable
            self.path = path  # working directory passed via `git -C`

        def run(self, args, as_bytes=False, without_path=False):
            """Run git with `args`; return stdout as a stripped str (or raw
            bytes when as_bytes). `without_path` skips the `-C <path>` prefix,
            e.g. for the initial clone. Raises CalledProcessError on failure.
            """
            result = subprocess.check_output(
                ([self.cmd] if without_path else
                 [self.cmd, '-C', self.path]) + args
            )
            if as_bytes:
                return result
            return result.decode('utf-8').rstrip()

        def log(self, count=None, skip=0):
            """Return up to `count` commits (all branches, skipping `skip`)
            as a list of dicts with fullHash/message/body/commitDate keys.

            Builds JSON textually: the pretty format emits `^@^` markers that
            are turned into quotes after the real quotes/backslashes/newlines
            have been escaped. Fragile by design — on any failure the error
            is printed and an empty list is returned.
            """
            try:
                count_arg = [f'-n{count}'] if count else []
                out = self.run([
                    'log',
                    '--all'] +
                    count_arg +
                    [f'--skip={skip}',
                     "--pretty=format:{^@^fullHash^@^:^@^%H^@^,^@^message^@^:^@^%s^@^,^@^body^@^:^@^%b^@^,^@^commitDate^@^:^@^%ai^@^},^@&@^"
                     ])
                # Drop record separators, escape JSON-special characters,
                # then convert the ^@^ markers into the actual quotes.
                out = out.replace('^@&@^\n', '').replace(',^@&@^', '') \
                    .replace('\\', '\\\\').replace('\n', '\\n') \
                    .replace('\t', ' ').replace('"', '\\"').replace('^@^', '"') \
                    .replace('\\n#', '\\n') \
                    .replace('"body":"#', '"body":"')
                out = '[' + out + ']'
                return json.loads(out)
            except Exception as x:
                print('RepoFetcher:', x)
                return []

    def __init__(self, capsule, cfg):
        """capsule provides shutdown_event(); cfg is a mapping with .get()."""
        super().__init__()
        self.capsule = capsule
        self.cfg = cfg
        # Directory for the bare repository mirrors; empty disables fetching.
        self.cache_dir = cfg.get("repo.cachedir", "")
        self.git_cmd = cfg.get("repo.git", "/usr/bin/git")

    def fetch_pending(self, db, repo):
        """Fetch `repo` if it has a clone URL and its last fetch was more than
        INTERVAL seconds ago, then ingest new commits and issue references.
        """
        if repo.ts_fetch is not None and \
                time.time() - repo.ts_fetch < RepoFetcher.INTERVAL:
            return
        if not repo.clone_url:
            return
        # It's time to fetch now.
        cache_path = pjoin(self.cache_dir, str(repo.id))
        os.makedirs(cache_path, exist_ok=True)
        git = RepoFetcher.Git(self.git_cmd, cache_path)
        if not os.path.exists(pjoin(cache_path, 'config')):
            # No bare repo yet ('config' missing): clone, then allow future
            # fetches to update all branches.
            git.run(['clone', '--bare', repo.clone_url, cache_path], without_path=True)
            git.run(['config', 'remote.origin.fetch', 'refs/heads/*:refs/heads/*'])  # enable fetch
            num_commits = None  # first run: ingest the full history
        else:
            git.run(['fetch', '--prune'])
            num_commits = 100  # Since the last `INTERVAL` mins, so probably enough.
        # Update the fetch timestamp.
        cur = db.conn.cursor()
        cur.execute("UPDATE repos SET ts_fetch=CURRENT_TIMESTAMP() WHERE id=?", (repo.id,))
        db.commit()
        # BUG FIX: escape the label so regex metacharacters in it (e.g. '+',
        # '.') cannot break or alter the pattern; plain labels are unaffected.
        issue_pattern = re.compile(r'\b' + re.escape(repo.idlabel) + r'\s*#(\d+)\b')
        # Read the history to find out about commits.
        for commit in git.log(num_commits):
            hash = commit['fullHash']
            date = commit['commitDate']
            message = commit['message']
            body = commit['body']
            # Issue numbers referenced as e.g. "label #123" in subject/body.
            issuerefs = map(int, issue_pattern.findall(message + '\n' + body))
            cur.execute("INSERT IGNORE INTO commits (repo, hash, msg, ts) VALUES (?, ?, ?, ?)",
                        (repo.id, hash, message, date))
            for issue in issuerefs:
                cur.execute("INSERT IGNORE INTO issuerefs (repo, commit, issue) VALUES (?, ?, ?)",
                            (repo.id, hash, issue))
        db.commit()

    def run(self):
        """Main loop: poll every 5 seconds until shutdown; fetch_pending()
        itself enforces the per-repository INTERVAL."""
        if not self.cache_dir:
            # Fetcher disabled.
            return
        print(" RepoFetcher is running")
        while not self.capsule.shutdown_event().wait(5.0):
            db = Database(self.cfg)
            try:
                for repo in db.get_repositories():
                    try:
                        self.fetch_pending(db, repo)
                    except subprocess.CalledProcessError:
                        # git failed; report and move on to the next repo.
                        print('Error when fetching repository:', repo.clone_url)
            finally:
                db.close()