# slog/slog.py
#
# 444 lines
# 15 KiB
# Python

import datetime
import json
import os
import shlex
import shutil
import subprocess
import sys
import uuid
# release number printed by the help text
VERSION = 0.2
# command line of the pager that displays a post
PAGER_CMD = "~chiptune/bin/glow -p"
# target directory of Utils.pub2gemini
GEMINI_DIR = os.environ['HOME'] + "/public_gemini"
# directory holding the current user's slog files
SLOG_DIR = os.environ['HOME'] + "/.slog"
class Utils:
    """Namespace of helpers shared by the slog commands.

    Every helper is a static method and is called as ``Utils.name(...)``.
    """

    @staticmethod
    def timestamp():
        """UTC timestamp

        Returns the current UTC time as ``str()`` of a naive datetime
        (``YYYY-MM-DD HH:MM:SS[.ffffff]``), without a UTC offset suffix.
        """
        # datetime.utcnow() is deprecated; take an aware UTC time and drop the
        # tzinfo so the resulting string keeps the offset-free format
        now = datetime.datetime.now(datetime.timezone.utc)
        return str(now.replace(tzinfo=None))

    @staticmethod
    def list_active_users():
        """returns a list of all users having a (populated) slog index

        Entries of /home/ whose name contains a dot are skipped. Users whose
        index is missing, unreadable, not valid JSON or not a non-empty list
        are left out instead of aborting the scan.
        """
        active_users = []
        try:
            homedirs = os.listdir("/home/")
        except OSError:
            return active_users
        for homedir in homedirs:
            if "." in homedir:
                continue
            try:
                with open(f"/home/{homedir}/.slog/index", "r", encoding="utf-8") as indexfile:
                    index_data = json.load(indexfile)
            except (OSError, ValueError):
                # OSError: missing/unreadable file or a non-directory entry;
                # ValueError: undecodable or malformed JSON
                continue
            if isinstance(index_data, list) and index_data:
                active_users.append(homedir)
        return active_users

    @staticmethod
    def random_id():
        """return a random string (first 10 hex digits of a uuid4)"""
        return uuid.uuid4().hex[:10]

    @staticmethod
    def open_editor(path: str):
        """launch $EDITOR for path if set, else launch nano

        Waits for the user to press enter before the editor is started.
        """
        editor = os.environ.get("EDITOR", "")
        # the path is quoted so that spaces or shell metacharacters in it
        # cannot be interpreted by the shell
        quoted_path = shlex.quote(path)
        if editor.strip():
            input(f"{editor} will be launched to edit the temporary file. press enter to continue")
            # $EDITOR is left unquoted on purpose: it may carry its own arguments
            os.system(f"{editor} {quoted_path}")
        else:
            # an unset or empty $EDITOR falls back to nano
            input("nano will be launched to edit the temporary file. Ctrl+X will save and exit. press enter to continue")
            os.system(f"nano -t {quoted_path}")

    @staticmethod
    def latest_post_by_user(user: str):
        """returns object for latest post in a user's index

        Returns None when the index is empty. Errors from opening or parsing
        /home/<user>/.slog/index are propagated to the caller.
        """
        with open(f"/home/{user}/.slog/index", "r", encoding="utf-8") as indexfile:
            index_data = json.load(indexfile)
        if not index_data:
            return None
        return index_data[-1]

    @staticmethod
    def menu(items):
        """(fzf must be installed) takes a list as input and returns the index selected by the user from that list

        The items are shown in reverse order. The program exits with status 0
        when there is nothing to choose from, when fzf cannot be started or
        returns a non-zero status, or when its answer is not one of the items.
        """
        if not items:
            print("nothing to select from")
            sys.exit(0)
        menu_items_string = '\n'.join(items[::-1])
        try:
            # the candidates are fed to fzf on stdin, so no temporary file and
            # no shell are needed
            result = subprocess.run(
                ["fzf"],
                input=menu_items_string.encode("utf-8"),
                stdout=subprocess.PIPE,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError):
            print("an error occurred")
            sys.exit(0)
        # drop only the line terminator: an item may itself end in whitespace
        selected_line = result.stdout.decode("utf-8").rstrip("\r\n")
        try:
            return items.index(selected_line)
        except ValueError:
            print("an error occurred")
            sys.exit(0)

    @staticmethod
    def select_post_by_user(user: str):
        """(depends on Utils.menu()) menu to select a user's post and return its index within the user's list of posts, along with that list"""
        # load post index of the user
        with open(f"/home/{user}/.slog/index", "r", encoding="utf-8") as indexfile:
            index_data = json.load(indexfile)
        # list of all titles; the leading number keeps every line unique
        titles = [f"{i+1} - {post['id']} - {post['title']}" for i, post in enumerate(index_data)]
        # menu
        selected_title = Utils.menu(titles)
        return (selected_title, index_data)

    @staticmethod
    def pub2gemini():
        """publish slog index to public_gemini

        Replaces GEMINI_DIR/slog with a copy of SLOG_DIR/posts in which every
        file gets a ``.gmi`` suffix, then rewrites GEMINI_DIR/slog_index.gmi
        with one link line per post, last index entry first.
        """
        with open(f"{SLOG_DIR}/index", 'r', encoding="utf-8") as indexfile:
            index = json.load(indexfile)
        published_dir = f"{GEMINI_DIR}/slog"
        # start from a clean copy so that deleted posts disappear as well
        shutil.rmtree(published_dir, ignore_errors=True)
        shutil.copytree(f"{SLOG_DIR}/posts", published_dir)
        for file in os.listdir(published_dir):
            os.rename(f"{published_dir}/{file}", f"{published_dir}/{file}.gmi")
        slog_gmi_index = [
            f"=> slog/{post['id']}.gmi {post['timestamp']} {post['title']}\n"
            for post in reversed(index)
        ]
        # mode 'w' truncates an existing index file, no separate removal needed
        with open(f"{GEMINI_DIR}/slog_index.gmi", 'w', encoding="utf-8") as slog_index:
            slog_index.writelines(slog_gmi_index)
class Display:
    """Namespace of the read-only views that slog prints to the terminal.

    Every view is a static method and is called as ``Display.name()``.
    """

    @staticmethod
    def _read_ids():
        """Return the lines of $HOME/.slog/read (post ids), or [] if the file is missing."""
        try:
            with open(f"{os.environ['HOME']}/.slog/read", "r") as f:
                return [line.strip() for line in f]
        except FileNotFoundError:
            return []

    @staticmethod
    def _printable(value):
        """Return str(value) without non-printable characters.

        Titles and timestamps come from other users' index files; stripping
        control characters keeps them from injecting terminal escape codes.
        """
        return "".join(ch for ch in str(value) if ch.isprintable())

    @staticmethod
    def _parse_timestamp(value):
        """Parse an ISO timestamp string into a naive datetime (UTC when an offset is given)."""
        parsed = datetime.datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            # naive and aware datetimes cannot be compared while sorting
            parsed = parsed.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        return parsed

    @staticmethod
    def helptext():
        """print the version and the list of available commands"""
        print(f"slog v{VERSION}")
        print("""
blog (locally) on the CLI like a boss
available commands:
help - print this help text
[timeline] - latest posts from all users
post - create a new post
read <user> - read posts by a user
subscribe <user> - subscribe to a user
unsubscribe <user> - unsubscribe from a user
subs - latest posts from users you are subscribed to
subs show - list of users you are subscribed to
del - delete a post
edit - edit a post
gmi (experimental) - publish your slog to gemini
source at https://tildegit.org/chiptune/slog
""")

    @staticmethod
    def timeline():
        """show latest posts from all users in a chronological order

        Prints one line per active user, newest post first. Titles of posts
        that are neither recorded as read nor written by the current user are
        highlighted. Users whose index is unreadable or malformed are skipped.
        """
        # get list of active users
        active_users = Utils.list_active_users()
        # longest username, used to pad the user column (0 when nobody is active)
        longest_length = max((len(name) for name in active_users), default=0)
        # the "read" file does not change while the timeline is built: load it once
        read = Display._read_ids()
        current_user = os.environ.get('USER')
        # latest post of every user, with parsed timestamps for sorting
        latest_posts = []
        for user in active_users:
            try:
                latest_post = Utils.latest_post_by_user(user)
                if latest_post is None:
                    continue
                latest_posts.append(
                    {
                        "user": user,
                        "title": Display._printable(latest_post['title']),
                        "timestamp": Display._parse_timestamp(latest_post['timestamp']),
                        "id": latest_post['id'],
                    }
                )
            except (OSError, LookupError, TypeError, ValueError):
                # one broken index must not take the whole timeline down
                continue
        # sorting by timestamp in decreasing order
        latest_posts.sort(key=lambda k: k['timestamp'], reverse=True)
        # formatting the rows
        rows = []
        for post in latest_posts:
            user = post["user"]
            name_column = f"\x1b[33m{user.ljust(longest_length)}\033[00m"
            time_column = f"\x1b[37m{str(post['timestamp']).split('.')[0]}\033[00m"
            if post['id'] in read or user == current_user:
                rows.append(f"{name_column} {time_column} {post['title']}")
            else:
                rows.append(f"{name_column} {time_column} \033[96m{post['title']}\033[00m")
        if rows:
            print("User; Timestamp; Title")
        for row in rows:
            print(row)

    @staticmethod
    def subscribed_feed():
        """show latest posts from subscriptions

        Prints the latest post of every subscribed user unless it is recorded
        as read. Exits with status 0 when there are no subscriptions.
        """
        read = Display._read_ids()
        try:
            with open(f"{os.environ['HOME']}/.slog/subscribed", "r") as f:
                # blank lines are not user names
                subscriptions = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            subscriptions = []
        if not subscriptions:
            print("you haven't subscribed to any users")
            sys.exit(0)
        feed = []
        # getting longest length of username for formatting
        longest_length = max(len(name) for name in subscriptions)
        # populating the feed
        for user in subscriptions:
            try:
                latest_post = Utils.latest_post_by_user(user)
            except (OSError, LookupError, TypeError, ValueError):
                # the user is gone, or the index is unreadable or malformed
                feed.append(f"{user} doesn't have a readable slog")
                continue
            if latest_post is None:
                feed.append(f"{user} doesn't have any posts")
                continue
            try:
                if latest_post['id'] in read:
                    continue
                stamp = Display._printable(latest_post['timestamp']).split('.')[0]
                title = Display._printable(latest_post['title'])
            except (LookupError, TypeError):
                # entry without the expected keys
                continue
            feed.append(f"\x1b[33m{user.ljust(longest_length)}\033[00m \x1b[37m{stamp}\033[00m \033[96m{title}\033[00m")
        # print the feed
        if feed:
            print("User; Timestamp; Title")
        for item in feed:
            print(item)
class Actions:
    """Namespace of the slog commands that act on posts and subscriptions.

    Every command is a static method and is called as ``Actions.name(...)``.
    """

    @staticmethod
    def _save_index(index_data):
        """Write index_data as JSON to $HOME/.slog/index."""
        with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
            indexfile.write(json.dumps(index_data))

    @staticmethod
    def _load_subscriptions():
        """Return the user names in $HOME/.slog/subscribed ([] if the file is missing)."""
        try:
            with open(f"{os.environ['HOME']}/.slog/subscribed", "r") as sub_file:
                # blank lines are not user names
                return [line.strip() for line in sub_file if line.strip()]
        except FileNotFoundError:
            return []

    @staticmethod
    def post():
        """create a new post

        Asks for a title, opens an empty post file in the editor and appends
        an entry (id, timestamp, title) to the user's index.
        """
        # title input and random filename
        title = input("TITLE: ")
        post_id = Utils.random_id()
        post_path = f"{os.environ['HOME']}/.slog/posts/{post_id}"
        # create the empty post file before the editor is started
        with open(post_path, "a", encoding="utf-8"):
            pass
        # write post in $EDITOR or nano
        Utils.open_editor(post_path)
        # entry object for user's post index
        entry = {"id": post_id, "timestamp": Utils.timestamp(), "title": title}
        # load post index
        with open(f"{os.environ['HOME']}/.slog/index", "r", encoding="utf-8") as indexfile:
            index_data = json.load(indexfile)
        # append entry object to post index and save it
        index_data.append(entry)
        Actions._save_index(index_data)
        print("posted!")

    @staticmethod
    def read(user: str):
        """select a post from all of a user's posts and read it

        The post is shown with PAGER_CMD. Its id is then appended to
        $HOME/.slog/read, unless it is already there or the post is the
        current user's own.
        """
        if user not in os.listdir("/home/"):
            print("there's no such user")
            return
        # selecting post to view
        try:
            selected_title, index_data = Utils.select_post_by_user(user)
        except (OSError, LookupError, TypeError, ValueError):
            # no index, or an index that cannot be read or parsed
            print(f"{user} doesn't have a readable slog")
            return
        post_id = str(index_data[selected_title]['id'])
        # the id comes from another user's index file: quote the path so it
        # cannot inject shell commands. PAGER_CMD stays unquoted because it
        # carries its own arguments and a ~user prefix for the shell to expand.
        post_path = shlex.quote(f"/home/{user}/.slog/posts/{post_id}")
        os.system(f"{PAGER_CMD} {post_path}")
        # mark post ID as read, ignoring if viewing own post
        if user == os.environ.get('USER'):
            return
        read_path = f"{os.environ['HOME']}/.slog/read"
        try:
            with open(read_path, 'r') as f:
                read_list = [line.strip() for line in f]
        except FileNotFoundError:
            read_list = []
        if post_id not in read_list:
            with open(read_path, 'a') as read_record:
                read_record.write(f"{post_id}\n")

    @staticmethod
    def edit():
        """edit a post

        Opens the selected post in the editor, then optionally replaces its
        title in the index.
        """
        # NOTE(review): the index is read from /home/$USER but written below
        # $HOME; this assumes both name the same directory - confirm
        selected_title, index_data = Utils.select_post_by_user(os.environ['USER'])
        # editing post content
        Utils.open_editor(f"{os.environ['HOME']}/.slog/posts/{index_data[selected_title]['id']}")
        # editing post title
        new_title = input("NEW TITLE (leave blank for unchanged): ").strip()
        if new_title:
            index_data[selected_title]['title'] = new_title
        # updating index file
        Actions._save_index(index_data)
        print("saved")

    @staticmethod
    def delete():
        """delete a post

        Removes the selected post file and its entry in the index.
        """
        # selecting post to delete
        selected_title, index_data = Utils.select_post_by_user(os.environ['USER'])
        # removing associated file
        try:
            os.remove(f"{os.environ['HOME']}/.slog/posts/{index_data[selected_title]['id']}")
        except FileNotFoundError:
            # the file is already gone; the index entry is still removed
            pass
        # updating index file
        del index_data[selected_title]
        Actions._save_index(index_data)

    @staticmethod
    def subscribe(user: str):
        """add a username to the subscriptions file

        Exits with status 0 and a message when the user does not exist, is
        the current user, has no readable slog index or is already subscribed.
        """
        # exit if username doesn't exist
        if user not in os.listdir("/home/"):
            print("there is no such user")
            sys.exit(0)
        # exit if trying to subscribe to oneself
        if user == os.environ.get('USER'):
            print("you can't subscribe to yourself")
            sys.exit(0)
        # exit if the user has no slog index that can be read
        try:
            Utils.latest_post_by_user(user)
        except (OSError, LookupError, TypeError, ValueError):
            print(f"{user} doesn't have a readable slog")
            sys.exit(0)
        # check if already subscribed
        if user in Actions._load_subscriptions():
            print("already subscribed")
            sys.exit(0)
        # append username to subscriptions file
        with open(f"{os.environ['HOME']}/.slog/subscribed", "a") as sub_file:
            sub_file.write(f"{user}\n")
        print(f"subscribed to {user}")

    @staticmethod
    def unsubscribe(user: str):
        """remove subscription from subscriptions file

        Exits with status 0 and a message when called with the current user's
        own name or when the user is not subscribed.
        """
        # exit if running on own username
        if user == os.environ.get('USER'):
            print("you can't unsubscribe to yourself")
            sys.exit(0)
        subs = Actions._load_subscriptions()
        # names are compared for equality: a substring test would remove
        # "bobby" when asked to unsubscribe from "bob"
        if user not in subs:
            print(f"you are not subscribed to {user}")
            sys.exit(0)
        subs.remove(user)
        # update subscriptions file
        with open(f"{os.environ['HOME']}/.slog/subscribed", "w") as sub_file:
            sub_file.writelines(item + '\n' for item in subs)
        print(f"unsubscribed from {user}")
if __name__ == "__main__":
# set up .slog directory if doesn't exist
if ".slog" not in os.listdir(os.environ["HOME"]):
print(".slog dir not found, creating files...")
os.mkdir(f"{os.environ['HOME']}/.slog/")
os.mkdir(f"{os.environ['HOME']}/.slog/posts")
os.system(f"touch {os.environ['HOME']}/.slog/read")
os.system(f"touch {os.environ['HOME']}/.slog/subscribed")
os.system(f"chmod go-rx {os.environ['HOME']}/.slog/subscribed {os.environ['HOME']}/.slog/read")
with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
indexfile.write("[]")
# process modes
try:
match sys.argv[1]:
case "post":
Actions.post()
case "help":
Display.helptext()
case "timeline":
Display.timeline()
case "read":
Actions.read(sys.argv[-1])
case "edit":
Actions.edit()
case "del":
Actions.delete()
case "subs":
if sys.argv[-1] == "show":
print("\x1b[33musers you are subscribed to:\033[00m")
os.system(f"cat {os.environ['HOME']}/.slog/subscribed")
sys.exit(0)
Display.subscribed_feed()
case "subscribe":
Actions.subscribe(sys.argv[-1])
case "unsubscribe":
Actions.unsubscribe(sys.argv[-1])
case "gmi":
Utils.pub2gemini()
case _:
print("no such command")
except IndexError:
Display.timeline()