Refactored category data, updated display after post is added, tests

This commit is contained in:
asdf 2019-12-13 16:12:54 +11:00
parent a846d8788f
commit 4d7cc67876
5 changed files with 366 additions and 252 deletions

172
data.py
View File

@ -1,17 +1,33 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
"""This module takes input and returns link_data, the data structure linkulator works from"""
from time import time
from typing import NamedTuple
from pathlib import PurePath
from glob import glob
import re
import os
import config
# regex for removing escape characters from https://stackoverflow.com/a/14693789
ESCAPE_CHARS = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
BAD_CHARS = re.compile(r"[\t\r\n\f\v]*")
class LinkDataRecord(NamedTuple):
"""Represents a record in LinkData.link_data"""
ID_if_parent: str = ""
username: str = ""
timestamp: str = ""
parent_id: str = ""
category: str = ""
link_URL: str = ""
link_title_or_comment: str = ""
def is_well_formed_line(line: str) -> bool:
"""Checks if current line is valid or not, returns true and false respectively."""
"""Checks if current line is valid or not, returns true or false respectively."""
pipe_count = (
4 ## A PROPERLY FORMATED LINE IN linkulator.data HAS EXACTLY FOUR PIPES.
)
@ -19,7 +35,7 @@ def is_well_formed_line(line: str) -> bool:
def is_valid_time(timestamp: str) -> bool:
"""identifies future dated timestamps - returns true if valid time, false is invalid"""
"""identifies future dated timestamps - returns true if valid time, false if invalid"""
return float(timestamp) < time()
@ -30,70 +46,128 @@ def wash_line(line: str) -> str:
return line
def process(line: str, file_owner: str):
def process(line: str, file_owner: str) -> list:
"""Takes a line, returns a list based on the delimeter pipe character"""
if not is_well_formed_line(line):
raise ValueError("Not a well formed record")
line = wash_line(line)
split_line = line.split("|")
split_line: list = line.split("|")
if split_line[0] and not is_valid_time(split_line[0]):
raise ValueError("Invalid date")
split_line.insert(0, file_owner)
return split_line
def get(config, ignore_names):
"""reads data files for non-ignored users and returns valid data in linkulator formats"""
link_data = []
## username, datestamp, parent-id, category, link-url, link-title
categories = []
category_counts = {}
ignore_names = []
def parse_ignore_file() -> list:
"""reads the current user's ignore file, returns a list of usernames to ignore"""
ignore_names: list = []
if config.USER.ignorefile.exists():
_s = config.USER.ignorefile.read_text()
_l = _s.splitlines()
for line in _l:
name = line.split(" ")[0]
ignore_names.append(name)
return ignore_names
## WHENEVER THIS FUNCTION IS CALLED, THE DATA IS REFRESHED FROM FILES. SINCE
## DISK IO IS PROBABLY THE HEAVIEST PART OF THIS SCRIPT, DON'T DO THIS OFTEN.
files_pattern = str(
PurePath(config.PATHS.all_homedir_pattern).joinpath(
config.PATHS.datadir, config.PATHS.datafile
class LinkData:
"""Class that contains link_data, categories and categories count tables,
plus methods to generate and update these items"""
def __init__(self):
self.link_data: list = []
self.categories: list = []
self.get()
def get(self):
"""reads data files for non-ignored users, sets valid data in
linkulator formats
whenever this function is called, the data is refreshed from files.
since disk io is probably the heaviest part of this script, don't do
this often."""
ignore_names = parse_ignore_file()
files_pattern = str(
PurePath(config.PATHS.all_homedir_pattern).joinpath(
config.PATHS.datadir, config.PATHS.datafile
)
)
)
linkulator_files = glob(files_pattern)
linkulator_files = glob(files_pattern)
id_iterator = 1
id_iterator = 1
for filename in linkulator_files:
with open(filename) as cfile:
# get file owner username from path
file_owner = PurePath(filename).parent.parent.name
if file_owner in ignore_names:
# ignore names found in ignore file
continue
for line in cfile:
try:
split_line = process(line, file_owner)
except ValueError:
for filename in linkulator_files:
with open(filename) as cfile:
# get file owner username from path
file_owner = PurePath(filename).parent.parent.name
if file_owner in ignore_names:
# ignore names found in ignore file
continue
for line in cfile:
try:
split_line = process(line, file_owner)
except ValueError:
continue
# assign parent items (links) an ID
if split_line[2] == "":
split_line.insert(0, id_iterator)
id_iterator += 1
else:
split_line.insert(0, "")
# assign parent items (links) an ID
if split_line[2] == "":
split_line.insert(0, id_iterator)
id_iterator += 1
else:
split_line.insert(0, "")
link_data.append(split_line)
self.link_data.append(split_line)
# sort links by creation date
link_data.sort(key=lambda x: x[2], reverse=True)
self.sort_link_data()
self.generate_category_data()
# generate categories list and category count from sorted link data
for record in link_data:
cat = record[4]
if cat not in categories and cat != "":
categories.append(cat)
category_counts[cat] = 1
elif cat in categories:
category_counts[cat] += 1
def sort_link_data(self):
"""sort link_data by creation date"""
self.link_data.sort(key=lambda x: x[2], reverse=True)
return link_data, categories, category_counts
def add(self, record):
"""Add a record to the data file, and to link_data"""
if os.path.exists(config.USER.datafile):
append_write = "a" # append if already exists
else:
append_write = "w+" # make a new file if not
with open(config.USER.datafile, append_write) as file:
file.write(
"{}|{}|{}|{}|{}\n".format(
record.timestamp,
record.parent_id,
record.category,
record.link_URL,
record.link_title_or_comment,
)
)
if record.category:
record._replace(
ID_if_parent=max([record[0] for record in self.link_data if record[0]])
)
self.link_data.insert(0, list(record))
self.generate_category_data()
else:
self.link_data.insert(0, list(record))
def generate_category_data(self):
"""generate categories list and category count from sorted link data"""
self.categories.clear()
i = (record for record in self.link_data if record[4] != "")
for record in i:
name = record[4]
timestamp = record[2]
if name not in [cat_record["name"] for cat_record in self.categories]:
self.categories.append(
{"name": name, "count": 1, "last_updated": timestamp}
)
else:
for cat_record in self.categories:
if cat_record["name"] == name:
cat_record["count"] += 1
if cat_record["last_updated"] < timestamp:
cat_record["last_updated"] = timestamp

View File

@ -4,7 +4,6 @@
## If this script contains bugs, blame cmccabe.
import getpass
import os
import signal
import subprocess
import sys
@ -15,60 +14,25 @@ from shutil import which
import data
import config
import posts
from styling import style_text
## id (if parent), username, datestamp, parent-id, category, link-url, link-title
link_data: list = []
categories: list = []
category_counts: dict = {}
LinkData = data.LinkData()
link_data: list = LinkData.link_data
categories: list = LinkData.categories
def graceful_exit():
"""Prints a nice message, performs cleanup and then exits"""
print("\n\nThank you for linkulating. Goodbye.\n")
config.USER.save()
sys.exit(0)
def parse_ignore_file() -> list:
"""reads the current user's ignore file, returns a list of usernames to ignore"""
ignore_names: list = []
if config.USER.ignorefile.exists():
_s = config.USER.ignorefile.read_text()
_l = _s.splitlines()
for line in _l:
name = line.split(" ")[0]
ignore_names.append(name)
return ignore_names
def fetch_data(ignore_names):
"""fetches data from the file store, assigning it to the relevant global variables"""
global link_data
global categories
global category_counts
link_data, categories, category_counts = data.get(config, ignore_names)
if len(link_data) == 0:
print("It looks link there are no links yet. Run 'linkulator -p' to add one.")
graceful_exit()
def print_categories(categories):
def print_categories():
"""Prints the list of categories with an indicator for new activity"""
print("\n{:>4s} New {:<25s}".format("ID#", "Category"))
for i, cat in enumerate(categories):
new_links = [
1
for line in link_data
if line[2] >= config.USER.lastlogin and cat == line[4]
]
count = len(new_links)
for i, record in enumerate(categories):
print(
"{:4d} {} {} ({})".format(
i + 1, "x" if count else " ", cat, category_counts[cat]
i + 1,
"x" if record["last_updated"] >= config.USER.lastlogin else " ",
record["name"],
record["count"],
)
)
@ -84,7 +48,7 @@ def print_category_details(view_cat):
thread_index = {}
for line in link_data:
if line[4] == view_cat:
if line[4] == view_cat["name"]:
link_count += 1
thread_index[link_count] = str(line[0])
parent_id = line[1] + "+" + str(line[2])
@ -164,7 +128,7 @@ def view_link_in_browser(url):
subprocess.call([config.USER.browser, url])
def reply(owner, tstamp):
def reply(parent_user, parent_timestamp):
"""Prompt for reply, validate input, save validated input to disk and update
link_data. Calls view_thread when complete."""
while True:
@ -172,42 +136,92 @@ def reply(owner, tstamp):
if comment == "":
input("Reply aborted. Hit [Enter] to continue.")
break
elif not posts.is_valid(comment):
if not is_valid_input(comment):
print(
"Entries consisting of whitespace, or containing pipes, '|', are "
"not valid.Please try again."
)
else:
if os.path.exists(config.USER.datafile):
append_write = "a" # append if already exists
else:
append_write = "w+" # make a new file if not
with open(config.USER.datafile, append_write) as file:
timestamp = str(time.time())
file.write(
timestamp + "|" + owner + "+" + tstamp + "|||" + comment + "\n"
)
link_data.insert(
0,
[
"",
getpass.getuser(),
timestamp,
owner + "+" + tstamp,
"",
"",
comment,
],
record = data.LinkDataRecord(
username=getpass.getuser(),
timestamp=str(time.time()),
parent_id="{}+{}".format(parent_user, parent_timestamp),
link_title_or_comment=comment,
)
LinkData.add(record)
input("Reply added. Hit [Enter] to return to thread.")
break
def is_valid_input(entry: str) -> bool:
"""Determine validity of an input string
>>> is_valid_input("valid")
True
>>> is_valid_input("Not|valid")
False
>>> is_valid_input(" ")
False
"""
if "|" in entry:
return False
if entry.strip() == "":
return False
return True
def get_input(item: str) -> str:
"""Get user input with the specified prompt, validate and return it, or
break if invalid"""
while True:
i: str = input("{}: ".format(style_text(item, "underline")))
if i == "":
raise ValueError("Empty field")
if is_valid_input(i):
return i
print(
"Entries consisting of whitespace, or containing pipes, '|', are "
"not valid.Please try again."
)
def post_link():
"""Handles the link posting process"""
category_text = (
"None"
if not categories
else ", ".join(sorted(record["name"] for record in categories))
)
print("\nEnter link information here. Leaving any field blank aborts " "post.\n")
try:
url = get_input("URL")
print(
"Available categories: {}\n"
"♻️ Please help keep Linkulator tidy".format(category_text)
)
category = get_input("Category")
title = get_input("Title")
except ValueError:
print("Post cancelled")
return
record = data.LinkDataRecord(
username=getpass.getuser(),
timestamp=str(time.time()),
category=category,
link_URL=url,
link_title_or_comment=title,
)
LinkData.add(record)
def search(keyword):
"""Search function - not yet complete"""
print("Doesn't work yet. Would be searching title, category, comment for ", keyword)
raise NotImplementedError
## PSEUDOCODE:
@ -227,11 +241,14 @@ def search(keyword):
## if next_step...
def menu_view_categories(categories):
## CONTROLS
def menu_view_categories():
"""Displays list of categories, takes keyboard input and
executes corresponding functions."""
while True:
print_categories(categories)
print_categories()
view_cat = input(
"\nEnter a category ID, {} to post a link, or {} to quit: ".format(
@ -243,7 +260,7 @@ def menu_view_categories(categories):
return
if view_cat == "p":
# pass a copy of categories so it is not modified
posts.post_link(categories[:])
post_link()
else:
try:
cat_index = categories[int(view_cat) - 1]
@ -259,7 +276,7 @@ def menu_view_category_details(cat_index):
while True:
thread_index = print_category_details(cat_index)
pid = input(
option = input(
"Enter a post ID to see its thread, {} to go back, {} to "
"post a link, or {} to quit: ".format(
style_text("m", "underline"),
@ -268,18 +285,18 @@ def menu_view_category_details(cat_index):
)
).lower()
if pid == "q":
if option == "q":
graceful_exit()
if pid == "m":
if option == "m":
return
if pid == "p":
posts.post_link(categories)
if option == "p":
post_link()
else:
try:
link = thread_index[int(pid)]
link = thread_index[int(option)]
menu_view_thread_details(link)
except (KeyError, ValueError):
# Catch a pid that is not in the thread list or is not a number
# Catch a Post ID that is not in the thread list or is not a number
print("{}\n\n".format(style_text("Invalid category ID/entry", "bold")))
@ -295,19 +312,41 @@ def menu_view_thread_details(post_id):
while True:
parent_user, parent_timestamp, url = print_thread_details(post_id)
next_step = input(next_text).lower()
if next_step == "m":
option = input(next_text).lower()
if option == "m":
return
if next_step == "b":
if option == "b":
view_link_in_browser(url)
elif next_step == "r":
elif option == "r":
reply(parent_user, parent_timestamp)
elif next_step == "q":
elif option == "q":
graceful_exit()
else:
print("{}\n\n".format(style_text("Invalid entry", "bold")))
## GENERAL
def style_text(text: str, *args) -> str:
"""Style input strings as specified using terminal escape sequences. Returns a styled string"""
styles = {
"bold": "\033[1m",
"dim": "\033[2m",
"underline": "\033[4m",
"blink": "\033[5m", # This is here if you REALLY need it...dont use it.
# (ctrl+shift+esc+E to enable evil mode.)
"inverse": "\033[7m", # Make fg and bg color swap
}
out = ""
for arg in args:
if arg in styles:
out += styles[arg]
out += text
out += "\033[0m"
return out
def print_banner():
"""prints a banner"""
print(" ----------")
@ -315,6 +354,13 @@ def print_banner():
print(" ----------")
def graceful_exit():
"""Prints a nice message, performs cleanup and then exits"""
print("\n\nThank you for linkulating. Goodbye.\n")
config.USER.save()
sys.exit(0)
def signal_handler(sig, frame):
"""handle signals, exiting on SIGINT"""
graceful_exit()
@ -327,8 +373,7 @@ def main():
config.init()
if not args:
print_banner()
fetch_data(parse_ignore_file())
menu_view_categories(categories)
menu_view_categories()
elif args[0] in ["-h", "--help", "help"]:
print(HELP_TEXT)
else:

View File

@ -1,95 +0,0 @@
#!/usr/bin/env python3
"""handle the linkulator post process"""
import getpass
import os
from time import time
import config
from styling import style_text
USERNAME = getpass.getuser()
class Link:
"""represents a single link's data"""
url: str
category: str
title: str
timestamp: str
def is_valid(entry: str) -> bool:
"""Determine validity of an input string
>>> is_valid("valid")
True
>>> is_valid("Not|valid")
False
>>> is_valid(" ")
False
"""
if "|" in entry:
return False
if entry.strip() == "":
return False
return True
def get_input(item: str) -> str:
"""Get user input with the specified prompt, validate and return it, or
break if invalid"""
while True:
i: str = input("{}: ".format(style_text(item, "underline")))
if i == "":
raise ValueError("Empty field")
if is_valid(i):
return i
print(
"Entries consisting of whitespace, or containing pipes, '|', are "
"not valid.Please try again."
)
def save_link(link):
"""Saves the specified link data to the user's data file"""
if os.path.exists(config.USER.datafile):
append_write = "a" # append if already exists
else:
append_write = "w+" # make a new file if not
with open(config.USER.datafile, append_write) as file:
file.write(
link.timestamp
+ "||"
+ link.category
+ "|"
+ link.url
+ "|"
+ link.title
+ "\n"
)
print("Link added!")
def post_link(categories: list):
"""Handles the link posting process"""
link = Link()
category_text = "None" if not categories else ", ".join(sorted(categories))
print("\nEnter link information here. Leaving any field blank aborts " "post.\n")
try:
link.url = get_input("URL")
print(
"Available categories: {}\n"
"♻️ Please help keep Linkulator tidy".format(category_text)
)
link.category = get_input("Category")
link.title = get_input("Title")
except ValueError:
print("Post cancelled")
return
link.timestamp = str(time())
save_link(link)

View File

@ -1,19 +0,0 @@
"""Handles terminal text styles"""
def style_text(text, *args):
styles = {
"bold": "\033[1m",
"dim": "\033[2m",
"underline": "\033[4m",
"blink": "\033[5m", # This is here if you REALLY need it...dont use it. (ctrl+shift+esc+E to enable evil mode.)
"inverse": "\033[7m", # Make fg and bg color swap
}
out = ""
for arg in args:
if arg in styles:
out += styles[arg]
out += text
out += "\033[0m"
return out

109
tests/data_test.py Normal file
View File

@ -0,0 +1,109 @@
"""unit tests for the data module"""
import unittest
from time import time
import data
class TestDataHelperFunctions(unittest.TestCase):
"""Tests that cover helper functions, mostly handling string validation"""
def test_wash_line(self):
"""tests the data.wash_line function"""
teststrings = [
{"Test": "A line of text", "Result": "A line of text"},
{"Test": "A line of text\n", "Result": "A line of text"},
{"Test": "\033[95mPink text\033[0m", "Result": "Pink text"},
{"Test": "🚅\t\n", "Result": "🚅"},
{
"Test": "gemini://gemini.circumlunar.space",
"Result": "gemini://gemini.circumlunar.space",
},
]
for line in teststrings:
self.assertEqual(data.wash_line(line["Test"]), line["Result"])
def test_is_well_formed_line(self):
""" tests the data.is_well_formed_line function"""
teststrings = [
{"Test": "A line of text", "Result": False},
{"Test": "1 Pipe |", "Result": False},
{"Test": "4 Pipes ||||", "Result": True},
{"Test": "5 Pipes |||||", "Result": False},
{"Test": "|P|I|P|E|H|E|A|V|E|N||||||||||", "Result": False},
]
for line in teststrings:
self.assertEqual(data.is_well_formed_line(line["Test"]), line["Result"])
def test_is_valid_time(self):
"""tests the data.is_valid_time function"""
teststrings = [
{"Test": "946645140.0", "Result": True}, # 1999
{"Test": str(time() + 10), "Result": False},
]
for line in teststrings:
self.assertEqual(data.is_valid_time(line["Test"]), line["Result"])
def test_process(self):
"""tests the data.process function"""
# wash line
self.assertEqual(
data.process("\t|\033[95mPink text\033[0m|||\n", ""),
["", "", "Pink text", "", "", ""],
)
# is well formed line
with self.assertRaises(ValueError, msg="Not a well formed line"):
data.process("|||||\n", "")
# is valid date
with self.assertRaises(ValueError, msg="Invalid date"):
data.process("{}||||".format(str(time() + 10)), "")
# real data tests
teststrings_input = [
"1576123922.106229|user+1576032469.7391915|||a new reply\n",
"1576137798.4647715|user+1576032469.7391915|||Here is another new reply\n",
"575968281.7483418||tildes|gopher://tilde.town|Tilde Town\n",
"1575969313.8278663||tildes|http://tilde.team|Tilde Team\n",
]
teststrings_output = [
[
"username0",
"1576123922.106229",
"user+1576032469.7391915",
"",
"",
"a new reply",
],
[
"username1",
"1576137798.4647715",
"user+1576032469.7391915",
"",
"",
"Here is another new reply",
],
[
"username2",
"575968281.7483418",
"",
"tildes",
"gopher://tilde.town",
"Tilde Town",
],
[
"username3",
"1575969313.8278663",
"",
"tildes",
"http://tilde.team",
"Tilde Team",
],
]
for i, item in enumerate(teststrings_input):
self.assertEqual(
data.process(item, "username{}".format(i)), teststrings_output[i]
)
if __name__ == "__main__":
unittest.main()