444 lines
15 KiB
Python
444 lines
15 KiB
Python
import datetime
import json
import os
import shlex
import shutil
import subprocess
import sys
import uuid
|
|
|
|
# Version number printed by the `help` command.
VERSION = 0.2

# Command line used to page a post. It is run through a shell, which is what
# expands the leading `~chiptune` to that user's home directory.
PAGER_CMD = "~chiptune/bin/glow -p"

# Home directory of the invoking user, resolved once. expanduser() uses $HOME
# when it is set and otherwise falls back to the password database, so a
# missing $HOME no longer raises KeyError while the module is being loaded.
_HOME = os.path.expanduser("~")

# for Utils.pub2gemini
GEMINI_DIR = f"{_HOME}/public_gemini"
SLOG_DIR = f"{_HOME}/.slog"
|
|
|
|
class Utils:
    """Stateless helpers shared by the slog commands.

    Every method is a static method, so it can be called on the class
    (``Utils.menu(...)``) as well as on an instance.
    """

    @staticmethod
    def timestamp():
        """Return the current UTC time as a string.

        The value is a naive datetime rendered with ``str()`` (for example
        ``2024-01-31 12:34:56.789012``). ``datetime.utcnow()`` is deprecated
        since Python 3.12, so an aware "now" is taken and its tzinfo dropped,
        which yields the same text.
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        return str(now.replace(tzinfo=None))

    @staticmethod
    def list_active_users():
        """Return the names under /home/ that have a populated slog index.

        Entries containing a dot are ignored. A home directory whose index is
        missing, unreadable, not valid JSON or not a non-empty list is skipped
        instead of aborting the whole scan. Returns an empty list when /home/
        itself cannot be listed.
        """
        try:
            homedirs = os.listdir("/home/")
        except OSError:
            return []

        active_users = []
        for homedir in homedirs:
            if "." in homedir:
                continue

            try:
                with open(f"/home/{homedir}/.slog/index", "r", encoding="utf-8") as indexfile:
                    index_data = json.load(indexfile)
            except (OSError, ValueError):
                # OSError: missing/unreadable file; ValueError: broken JSON or encoding
                continue

            if isinstance(index_data, list) and index_data:
                active_users.append(homedir)

        return active_users

    @staticmethod
    def random_id():
        """Return a random 10-character hexadecimal string."""
        return uuid.uuid4().hex[:10]

    @staticmethod
    def open_editor(path: str):
        """Open ``path`` in $EDITOR, or in nano when $EDITOR is unset or empty.

        The user is asked to press enter before the editor starts. $EDITOR is
        still interpreted by the shell (so it may carry arguments), but the
        path is quoted so that it always reaches the editor as one argument.
        """
        editor = os.environ.get("EDITOR")
        if editor:
            input(f"{editor} will be launched to edit the temporary file. press enter to continue")
            command = editor
        else:
            input("nano will be launched to edit the temporary file. Ctrl+X will save and exit. press enter to continue")
            command = "nano -t"

        subprocess.run(f"{command} {shlex.quote(path)}", shell=True)

    @staticmethod
    def latest_post_by_user(user: str):
        """Return the last entry of ``user``'s index, or None if the index is empty.

        Raises:
            OSError: the index file is missing or cannot be read.
            ValueError: the index file is not valid JSON.
        """
        with open(f"/home/{user}/.slog/index", "r", encoding="utf-8") as indexfile:
            index_data = json.load(indexfile)

        if not index_data:
            return None

        return index_data[-1]

    @staticmethod
    def menu(items):
        """Let the user pick one of ``items`` with fzf and return its index.

        fzf must be installed. The items are fed to fzf in reverse order on
        its standard input. If fzf cannot be started or exits with a non-zero
        status (which includes the user cancelling), "an error occurred" is
        printed and the program exits with status 0.
        """
        menu_items_string = "\n".join(items[::-1])

        try:
            result = subprocess.run(
                ["fzf"],
                input=menu_items_string.encode("utf-8"),
                stdout=subprocess.PIPE,
                check=True,
            )
        except (subprocess.CalledProcessError, OSError):
            print("an error occurred")
            sys.exit(0)

        # Only the newline added by fzf is removed: stripping all whitespace
        # would make the lookup fail for items that end in whitespace.
        selected_line = result.stdout.decode("utf-8").rstrip("\n")

        return items.index(selected_line)

    @staticmethod
    def select_post_by_user(user: str):
        """Show a menu of ``user``'s posts (depends on Utils.menu()).

        Returns:
            A tuple ``(position, index_data)``: the position of the chosen post
            within the user's list of posts, and that list.
        """
        # load post index of the user
        with open(f"/home/{user}/.slog/index", "r", encoding="utf-8") as indexfile:
            index_data = json.load(indexfile)

        # one menu line per post: running number, id and title
        titles = [f"{i+1} - {post['id']} - {post['title']}" for i, post in enumerate(index_data)]

        selected_title = Utils.menu(titles)

        return (selected_title, index_data)

    @staticmethod
    def pub2gemini():
        """Publish the slog to GEMINI_DIR.

        The posts directory is copied to ``GEMINI_DIR/slog`` (replacing any
        previous copy), every copied entry gets a ``.gmi`` suffix, and
        ``GEMINI_DIR/slog_index.gmi`` is rewritten with one link line per
        post, newest first. GEMINI_DIR is created when it does not exist.
        """
        with open(f"{SLOG_DIR}/index", "r", encoding="utf-8") as indexfile:
            index = json.load(indexfile)

        published_dir = f"{GEMINI_DIR}/slog"

        os.makedirs(GEMINI_DIR, exist_ok=True)
        # start from a clean copy so deleted posts disappear from the capsule
        shutil.rmtree(published_dir, ignore_errors=True)
        shutil.copytree(f"{SLOG_DIR}/posts", published_dir)

        for name in os.listdir(published_dir):
            os.rename(f"{published_dir}/{name}", f"{published_dir}/{name}.gmi")

        slog_gmi_index = [
            f"=> slog/{post['id']}.gmi {post['timestamp']} {post['title']}\n"
            for post in reversed(index)
        ]

        with open(f"{GEMINI_DIR}/slog_index.gmi", "w", encoding="utf-8") as slog_index:
            slog_index.writelines(slog_gmi_index)
|
|
|
|
class Display:
    """Read-only views: the help text, the global timeline and the subscription feed."""

    @staticmethod
    def _format_entry(user, width, timestamp, title, unread):
        """Return one feed line: padded user name, timestamp and title.

        ``user`` is left-justified to ``width`` columns. When ``unread`` is
        true the title is wrapped in its own colour escape sequence.
        """
        shown_title = f"\033[96m{title}\033[00m" if unread else title
        return f"\x1b[33m{user.ljust(width)}\033[00m \x1b[37m{timestamp}\033[00m {shown_title}"

    @staticmethod
    def _load_read_ids():
        """Return the set of post ids listed in the "read" file (empty if the file is missing)."""
        try:
            with open(f"{SLOG_DIR}/read", "r") as f:
                return {line.strip() for line in f}
        except FileNotFoundError:
            return set()

    @staticmethod
    def helptext():
        """Print the version line followed by the list of available commands."""
        print(f"slog v{VERSION}")
        help_lines = (
            "",
            "blog (locally) on the CLI like a boss",
            "",
            "available commands:",
            "help - print this help text",
            "",
            "[timeline] - latest posts from all users",
            "",
            "post - create a new post",
            "read <user> - read posts by a user",
            "",
            "subscribe <user> - subscribe to a user",
            "unsubscribe <user> - unsubscribe from a user",
            "subs - latest posts from users you are subscribed to",
            "subs show - list of users you are subscribed to",
            "",
            "del - delete a post",
            "edit - edit a post",
            "",
            "gmi (experimental) - publish your slog to gemini",
            "",
            "source at https://tildegit.org/chiptune/slog",
            "",
        )
        print("\n".join(help_lines))

    @staticmethod
    def timeline():
        """Print the latest post of every active user, newest first.

        Posts whose id is in the "read" file, and the current user's own
        post, are printed with a plain title; all others get a highlighted
        title. Users whose latest entry cannot be loaded or lacks a usable
        title/timestamp/id are left out. Nothing is printed when there is no
        post to show.
        """
        active_users = Utils.list_active_users()

        # loaded once up front; it does not change while the timeline is built
        read = Display._load_read_ids()
        current_user = os.environ.get("USER")

        entries = []
        for user in active_users:
            try:
                latest_post = Utils.latest_post_by_user(user)
            except (OSError, ValueError):
                # index vanished or became unreadable since the user scan
                continue

            if latest_post is None:
                continue

            try:
                timestamp = datetime.datetime.fromisoformat(latest_post["timestamp"])
                if timestamp.tzinfo is not None:
                    # naive and aware datetimes cannot be compared while sorting
                    timestamp = timestamp.astimezone(datetime.timezone.utc).replace(tzinfo=None)
                entries.append(
                    {
                        "user": user,
                        "title": latest_post["title"],
                        "timestamp": timestamp,
                        "id": latest_post["id"],
                    }
                )
            except (KeyError, TypeError, ValueError):
                # malformed entry in somebody's index: skip it
                continue

        if not entries:
            return

        # sorting by timestamp in decreasing order
        entries.sort(key=lambda entry: entry["timestamp"], reverse=True)

        # longest username, used to align the columns
        longest_length = max(len(name) for name in active_users)

        print("User; Timestamp; Title")
        for entry in entries:
            already_seen = entry["id"] in read or entry["user"] == current_user
            print(
                Display._format_entry(
                    entry["user"],
                    longest_length,
                    str(entry["timestamp"]).split(".")[0],
                    entry["title"],
                    unread=not already_seen,
                )
            )

    @staticmethod
    def subscribed_feed():
        """Print the unread latest post of every user in the subscriptions file.

        Exits with status 0 after a message when there are no subscriptions.
        A subscribed user whose index is empty, missing or unreadable is
        listed as having no posts. Latest posts that were already read are
        not shown.
        """
        read = Display._load_read_ids()

        try:
            with open(f"{SLOG_DIR}/subscribed", "r") as f:
                # blank lines would otherwise be treated as a user named ""
                subscriptions = [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            subscriptions = []

        if not subscriptions:
            print("you haven't subscribed to any users")
            sys.exit(0)

        # longest username, used to align the columns
        longest_length = max(len(name) for name in subscriptions)

        feed = []
        for user in subscriptions:
            try:
                latest_post = Utils.latest_post_by_user(user)
            except (OSError, ValueError):
                latest_post = None

            if latest_post is None:
                feed.append(f"{user} doesn't have any posts")
                continue

            try:
                if latest_post["id"] in read:
                    continue
                feed.append(
                    Display._format_entry(
                        user,
                        longest_length,
                        latest_post["timestamp"].split(".")[0],
                        latest_post["title"],
                        unread=True,
                    )
                )
            except (KeyError, TypeError, AttributeError):
                # malformed entry in the subscribed user's index: skip it
                continue

        if feed:
            print("User; Timestamp; Title")

        for item in feed:
            print(item)
|
|
|
|
|
|
class Actions:
    """Commands that change the current user's slog or its bookkeeping files."""

    @staticmethod
    def _load_index():
        """Return the current user's post index (the parsed JSON of SLOG_DIR/index)."""
        with open(f"{SLOG_DIR}/index", "r", encoding="utf-8") as indexfile:
            return json.load(indexfile)

    @staticmethod
    def _save_index(index_data):
        """Write ``index_data`` as JSON to SLOG_DIR/index."""
        with open(f"{SLOG_DIR}/index", "w", encoding="utf-8") as indexfile:
            json.dump(index_data, indexfile)

    @staticmethod
    def _read_lines(name):
        """Return the stripped, non-empty lines of SLOG_DIR/<name> ([] if the file is missing)."""
        try:
            with open(f"{SLOG_DIR}/{name}", "r") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    @staticmethod
    def _select_own_post():
        """Let the user pick one of their own posts.

        The index is loaded from SLOG_DIR, the same file that is written back
        afterwards, rather than from a path derived from $USER. Exits with
        status 0 after a message when there is nothing to pick.

        Returns:
            A tuple ``(position, index_data)``.
        """
        index_data = Actions._load_index()
        if not index_data:
            print("you don't have any posts")
            sys.exit(0)

        titles = [f"{i+1} - {post['id']} - {post['title']}" for i, post in enumerate(index_data)]
        return Utils.menu(titles), index_data

    @staticmethod
    def post():
        """Create a new post: ask for a title, open the editor, add the post to the index."""
        # Surrounding whitespace is dropped, as edit() does for new titles.
        title = input("TITLE: ").strip()

        post_id = Utils.random_id()
        post_path = f"{SLOG_DIR}/posts/{post_id}"

        # create the (empty) post file before the editor is started
        with open(post_path, "a", encoding="utf-8"):
            pass

        # write post in $EDITOR or nano
        Utils.open_editor(post_path)

        # entry object for user's post index
        entry = {"id": post_id, "timestamp": Utils.timestamp(), "title": title}

        index_data = Actions._load_index()
        index_data.append(entry)
        Actions._save_index(index_data)

        print("posted!")

    @staticmethod
    def read(user: str):
        """Pick one of ``user``'s posts, show it in the pager and record it as read.

        Prints a message and returns when the user does not exist, has no
        readable slog index, or has no posts. Posts of the current user are
        never recorded in the "read" file.
        """
        if user not in os.listdir("/home/"):
            print(f"there's no such user")
            return

        try:
            if Utils.latest_post_by_user(user) is None:
                print(f"{user} doesn't have any posts")
                return
        except (OSError, ValueError):
            print(f"{user} doesn't have a readable slog")
            return

        # selecting post to view
        selected_title, index_data = Utils.select_post_by_user(user)
        post_id = index_data[selected_title]['id']

        # The id comes from another user's index file, so the path is quoted
        # before it reaches the shell. The shell is still used because
        # PAGER_CMD relies on tilde expansion.
        post_path = f"/home/{user}/.slog/posts/{post_id}"
        subprocess.run(f"{PAGER_CMD} {shlex.quote(post_path)}", shell=True)

        # mark post ID as read, ignoring if viewing own post
        if user == os.environ.get('USER'):
            return

        if post_id not in Actions._read_lines("read"):
            with open(f"{SLOG_DIR}/read", 'a') as read_record:
                read_record.write(f"{post_id}\n")

    @staticmethod
    def edit():
        """Edit the content of one of the current user's posts and, optionally, its title."""
        selected_title, index_data = Actions._select_own_post()

        # editing post content
        Utils.open_editor(f"{SLOG_DIR}/posts/{index_data[selected_title]['id']}")

        # editing post title
        new_title = input("NEW TITLE (leave blank for unchanged): ").strip()
        if new_title:
            index_data[selected_title]['title'] = new_title

        # updating index file
        Actions._save_index(index_data)

        print("saved")

    @staticmethod
    def delete():
        """Delete one of the current user's posts: its file and its index entry.

        The index entry is removed even if the post file is already gone.
        """
        # selecting post to delete
        selected_title, index_data = Actions._select_own_post()

        # removing associated file
        try:
            os.remove(f"{SLOG_DIR}/posts/{index_data[selected_title]['id']}")
        except FileNotFoundError:
            pass

        # updating index file
        del index_data[selected_title]
        Actions._save_index(index_data)

        print("deleted")

    @staticmethod
    def subscribe(user: str):
        """Append ``user`` to the subscriptions file.

        Exits with status 0 after a message when the user does not exist, is
        the current user, has no readable slog index, or is already
        subscribed.
        """
        # exit if username doesn't exist
        if user not in os.listdir("/home/"):
            print("there is no such user")
            sys.exit(0)

        # exit if trying to subscribe to oneself
        if user == os.environ.get('USER'):
            print("you can't subscribe to yourself")
            sys.exit(0)

        # the result is not needed; this only checks that the index can be loaded
        try:
            Utils.latest_post_by_user(user)
        except (OSError, ValueError):
            print(f"{user} doesn't have a readable slog")
            sys.exit(0)

        # check if already subscribed
        if user in Actions._read_lines("subscribed"):
            print("already subscribed")
            sys.exit(0)

        # append username to subscriptions file
        with open(f"{SLOG_DIR}/subscribed", "a") as sub_file:
            sub_file.write(f"{user}\n")

        print(f"subscribed to {user}")

    @staticmethod
    def unsubscribe(user: str):
        """Remove ``user`` from the subscriptions file.

        Only a line equal to ``user`` is removed; a subscription whose name
        merely contains ``user`` is left alone. Exits with status 0 after a
        message when ``user`` is the current user or is not subscribed.
        """
        # exit if running on own username
        if user == os.environ.get('USER'):
            print("you can't unsubscribe to yourself")
            sys.exit(0)

        subs = Actions._read_lines("subscribed")

        # exit if user not in subscriptions file
        if user not in subs:
            print(f"you are not subscribed to {user}")
            sys.exit(0)

        subs.remove(user)

        # update subscriptions file
        with open(f"{SLOG_DIR}/subscribed", "w") as sub_file:
            sub_file.writelines(f"{item}\n" for item in subs)

        print(f"unsubscribed from {user}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# set up .slog directory if doesn't exist
|
|
if ".slog" not in os.listdir(os.environ["HOME"]):
|
|
print(".slog dir not found, creating files...")
|
|
os.mkdir(f"{os.environ['HOME']}/.slog/")
|
|
os.mkdir(f"{os.environ['HOME']}/.slog/posts")
|
|
os.system(f"touch {os.environ['HOME']}/.slog/read")
|
|
os.system(f"touch {os.environ['HOME']}/.slog/subscribed")
|
|
os.system(f"chmod go-rx {os.environ['HOME']}/.slog/subscribed {os.environ['HOME']}/.slog/read")
|
|
with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
|
|
indexfile.write("[]")
|
|
|
|
# process modes
|
|
try:
|
|
match sys.argv[1]:
|
|
case "post":
|
|
Actions.post()
|
|
case "help":
|
|
Display.helptext()
|
|
case "timeline":
|
|
Display.timeline()
|
|
case "read":
|
|
Actions.read(sys.argv[-1])
|
|
case "edit":
|
|
Actions.edit()
|
|
case "del":
|
|
Actions.delete()
|
|
case "subs":
|
|
if sys.argv[-1] == "show":
|
|
print("\x1b[33musers you are subscribed to:\033[00m")
|
|
os.system(f"cat {os.environ['HOME']}/.slog/subscribed")
|
|
sys.exit(0)
|
|
Display.subscribed_feed()
|
|
case "subscribe":
|
|
Actions.subscribe(sys.argv[-1])
|
|
case "unsubscribe":
|
|
Actions.unsubscribe(sys.argv[-1])
|
|
case "gmi":
|
|
Utils.pub2gemini()
|
|
case _:
|
|
print("no such command")
|
|
|
|
except IndexError:
|
|
Display.timeline()
|