slog/slog.py

351 lines
12 KiB
Python

import os
import sys
import subprocess
import uuid
import datetime
import json
VERSION = 0.1
class Utils:
def timestamp():
"""UTC timestamp"""
return str(datetime.datetime.utcnow())
def list_active_users():
"""returns a list of all users having a slog index"""
active_users = []
for homedir in os.listdir("/home/"):
if "." in homedir:
continue
try:
if "index" in os.listdir(f"/home/{homedir}/.slog/"):
active_users.append(homedir)
except (FileNotFoundError, PermissionError):
pass
return active_users
def random_id():
return uuid.uuid4().hex[:10]
def open_editor(path: str):
"""launches $EDITOR for path if set, else launch nano"""
try:
input(f"{os.environ['EDITOR']} will be launched to edit the temporary file. press enter to continue")
os.system(f"{os.environ['EDITOR']} {path}")
except KeyError:
input(f"nano will be launched to edit the temporary file. Ctrl+X will save and exit. press enter to continue")
os.system(f"nano -t {path}")
def select_post_by_user(user: str):
"""menu to select a user's post and return its index within the user's list of posts, along with that list"""
# load post index of the user
with open(f"/home/{user}/.slog/index", "r", encoding="utf-8") as indexfile:
index_data = json.loads(indexfile.read())
# list of all titles
titles = [f"{i+1} - {post['id']} - {post['title']}" for i, post in enumerate(index_data)]
# menu
selected_title = Utils.menu(titles)
return (selected_title - 1, index_data)
def latest_post_by_user(user: str):
"""returns object for latest post in a user's index"""
with open(f"/home/{user}/.slog/index", "r", encoding="utf-8") as indexfile:
index_data = json.loads(indexfile.read())
if len(index_data) == 0:
return None
return index_data[-1]
def menu(items):
"""takes a list as input and returns the index selected by the user from that list"""
menu_items_string = '\n'.join(items)
tmp_file_path = f"{os.environ['HOME']}/.slog/tmp{uuid.uuid4().hex}"
with open(tmp_file_path, "w") as tmpfile:
tmpfile.write(menu_items_string)
try:
response = subprocess.check_output(f"cat {tmp_file_path} | fzf", shell=True)
except subprocess.CalledProcessError:
os.system(f"rm {tmp_file_path}")
print("an error occurred")
sys.exit(0)
os.system(f"rm {tmp_file_path}")
selected_line = response.decode("utf-8")
return items.index(selected_line.strip())
class Display:
def helptext():
print(f"slog v{VERSION}")
print("""
available commands:
help - print this help text
[timeline] - latest posts
post - create a new post
read <user> - read posts by a user
subscribe <user> - subscribe to a user
unsubscribe <user> - unsubscribe from a user
subs - unread posts from users you are subscribed to
del - delete a post
edit - edit a post
""")
def timeline():
"""show unread posts in timeline"""
active_users = Utils.list_active_users()
timeline = []
for user in active_users:
latest_post = Utils.latest_post_by_user(user)
if latest_post == None:
continue
with open(f"{os.environ['HOME']}/.slog/read", "r") as f:
read = [i.strip() for i in f.readlines()]
try:
if latest_post['id'] in read or user == os.environ['USER']:
timeline.append(f"\x1b[35m{user}\033[00m {latest_post['timestamp'].split('.')[0]} {latest_post['title']}")
else:
timeline.append(f"\x1b[35m{user}\033[00m {latest_post['timestamp'].split('.')[0]} \033[96m{latest_post['title']}\033[00m")
except IndexError:
pass
if len(timeline) != 0:
print("User; Timestamp; Title")
for item in timeline:
print(item)
def subscribed_feed():
"""show latest posts from subscriptions"""
active_users = Utils.list_active_users()
with open(f"{os.environ['HOME']}/.slog/read", "r") as f:
read = [i.strip() for i in f.readlines()]
with open(f"{os.environ['HOME']}/.slog/subscribed", "r") as f:
subscriptions = f.readlines()
#subscriptions = {item.split('-')[0].strip(): item.split('-')[1].strip() for item in subscriptions}
subscriptions = [item.strip() for item in subscriptions]
feed = []
for user in subscriptions:
latest_post = Utils.latest_post_by_user(user)
if latest_post == None:
feed.append(f"{user} doesn't have any posts")
continue
try:
if latest_post['id'] in read:
feed.append(f"\x1b[35m{user}\033[00m {latest_post['timestamp'].split('.')[0]} {latest_post['title']}")
else:
feed.append(f"\x1b[35m{user}\033[00m {latest_post['timestamp'].split('.')[0]} \033[96m{latest_post['title']}\033[00m")
except IndexError:
pass
if len(feed) != 0:
print("User; Timestamp; Title")
for item in feed:
print(item)
class Actions:
def post():
title = input("TITLE: ")
post_id = Utils.random_id()
os.system(f"touch {os.environ['HOME']}/.slog/posts/{post_id}")
Utils.open_editor(f"{os.environ['HOME']}/.slog/posts/{post_id}")
entry = {"id":post_id, "timestamp": Utils.timestamp(), "title": title}
with open(f"{os.environ['HOME']}/.slog/index", "r", encoding="utf-8") as indexfile:
index_data = json.loads(indexfile.read())
index_data.append(entry)
with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
indexfile.write(json.dumps(index_data))
print("posted!")
def read(user: str):
"""select a post from all of a user's posts and read it"""
if user in os.listdir("/home/"):
# selecting post to view
selected_title, index_data = Utils.select_post_by_user(user)
# load selected post in pager
os.system(f"less /home/{user}/.slog/posts/{index_data[selected_title]['id']}")
# mark post ID as read, ignoring if viewing own post
with open(f"{os.environ['HOME']}/.slog/read", 'r') as f:
read_list = f.readlines()
read_list = [i.strip() for i in read_list]
with open(f"{os.environ['HOME']}/.slog/read", 'a') as read_record:
if index_data[selected_title]['id'] not in read_list and user != os.environ['USER']:
read_record.write(f"{index_data[selected_title]['id']}\n")
else:
print(f"there's no such user")
def edit():
"""edit a post"""
# editing post content
selected_title, index_data = Utils.select_post_by_user(os.environ['USER'])
Utils.open_editor(f"{os.environ['HOME']}/.slog/posts/{index_data[selected_title]['id']}")
# editing post title
new_title = input("NEW TITLE (leave blank for unchanged): ")
if new_title.strip() != "":
index_data[selected_title]['title'] = new_title.strip()
# updating index file
with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
indexfile.write(json.dumps(index_data))
print("saved")
def delete():
"""delete a post"""
# selecting post to delete
selected_title, index_data = Utils.select_post_by_user(os.environ['USER'])
# removing associated file
os.system(f"rm {os.environ['HOME']}/.slog/posts/{index_data[selected_title]['id']}")
# updating index file
del index_data[selected_title]
with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
indexfile.write(json.dumps(index_data))
def subscribe(user: str):
"""add a username and id of its latest post to subscriptions file"""
# exit if username doesn't exist
if user not in os.listdir("/home/"):
print("there is no such user")
sys.exit(0)
# exit if trying to subscribe to oneself
if user == os.environ['USER']:
print("you can't subscribe to yourself")
sys.exit(0)
# get latest post data
latest_post = (Utils.latest_post_by_user(user))
# list of usernames from subscriptions file
with open(f"{os.environ['HOME']}/.slog/subscribed", "r") as sub_file:
subs = sub_file.readlines()
subs = [line.strip() for line in subs]
#subs = [line.split('-')[0].strip() for line in subs]
# check if already subscribed
if user in subs:
print("already subscribed")
sys.exit(0)
# append username to subscriptions file
with open(f"{os.environ['HOME']}/.slog/subscribed", "a") as sub_file:
sub_file.write(f"{user}\n")
print(f"subscribed to {user}")
def unsubscribe(user: str):
"""remove subscription from subscriptions file"""
# exit if running on own username
if user == os.environ['USER']:
print("you can't unsubscribe to yourself")
sys.exit(0)
# lines in subscriptions file
with open(f"{os.environ['HOME']}/.slog/subscribed", "r") as sub_file:
subs = sub_file.readlines()
subs = [item.strip() for item in subs]
# proceed to remove relevant line if username is present in subscriptions file
found = False
for i, item in enumerate(subs):
if user in item:
del subs[i]
found = True
break
# exit if user not in subscriptions file
if not found:
print(f"you are not subscribed to {user}")
sys.exit(0)
# update subscriptions file
with open(f"{os.environ['HOME']}/.slog/subscribed", "w") as sub_file:
sub_file.writelines(subs)
print(f"unsubscribed from {user}")
if __name__ == "__main__":
# set up .slog directory if doesn't exist
if ".slog" not in os.listdir(os.environ["HOME"]):
print(".slog dir not found, creating files...")
os.mkdir(f"{os.environ['HOME']}/.slog/")
os.mkdir(f"{os.environ['HOME']}/.slog/posts")
os.system(f"touch {os.environ['HOME']}/.slog/read")
os.system(f"touch {os.environ['HOME']}/.slog/subscribed")
os.system(f"chmod go-rx {os.environ['HOME']}/.slog/subscribed {os.environ['HOME']}/.slog/read")
with open(f"{os.environ['HOME']}/.slog/index", "w", encoding="utf-8") as indexfile:
indexfile.write("[]")
# process modes
try:
match sys.argv[1]:
case "post":
Actions.post()
case "help":
Display.helptext()
case "timeline":
Display.timeline()
case "read":
Actions.read(sys.argv[-1])
case "edit":
Actions.edit()
case "del":
Actions.delete()
case "subs":
Display.subscribed_feed()
case "subscribe":
Actions.subscribe(sys.argv[-1])
case "unsubscribe":
Actions.unsubscribe(sys.argv[-1])
case _:
print("no such command")
except IndexError:
Display.timeline()