237 lines
8.2 KiB
Python
237 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
"""This module takes input and returns link_data, the data structure linkulator works from"""
|
|
from time import time
|
|
from typing import NamedTuple
|
|
from pathlib import PurePath
|
|
from glob import glob
|
|
import re
|
|
import os
|
|
|
|
import config
|
|
|
|
# regex for removing escape characters from https://stackoverflow.com/a/14693789
ESCAPE_CHARS = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
# Runs of tab / carriage return / newline / form feed / vertical tab.
# Uses "+" rather than "*" so the pattern never matches the empty string;
# substituting "" gives the same text without an empty match at every position.
BAD_CHARS = re.compile(r"[\t\r\n\f\v]+")
|
|
|
|
|
|
class LinkDataRecord(NamedTuple):
    """Represents a record in LinkData.link_data

    Field order matters: the rest of the module addresses records by
    position (0 = ID_if_parent ... 6 = link_title_or_comment). Every field
    defaults to the empty string.
    """

    # post ID; LinkData.add stores an int here for posts, "" otherwise
    ID_if_parent: str = ""
    # owner of the data file the record was read from
    username: str = ""
    # creation time; is_valid_time compares it, as a float, with time()
    timestamp: str = ""
    # empty for a post; for a reply, the text after the first "+" is
    # matched against the parent's timestamp
    parent_id: str = ""
    # non-empty for a post (LinkData.add assigns a post ID when it is set)
    category: str = ""
    # presumably the URL of the link, empty for replies - TODO confirm
    link_URL: str = ""
    # presumably the link title for a post or the comment text for a reply
    link_title_or_comment: str = ""
|
|
|
|
|
|
def is_well_formed_line(line: str) -> bool:
    """Report whether a raw data-file line has the expected field layout.

    A properly formatted line in linkulator.data has exactly four pipes
    (five pipe-separated fields); anything else is rejected.
    """
    return line.count("|") == 4
|
|
|
|
|
|
def is_valid_time(timestamp: str) -> bool:
    """Identify future-dated or unparseable timestamps.

    Args:
        timestamp: textual number, compared as a float with time().

    Returns:
        True when the timestamp parses as a number earlier than the current
        time; False when it lies in the future or is not a number at all.
    """
    try:
        return float(timestamp) < time()
    except ValueError:
        # float() rejects non-numeric text; that is an invalid time, not a crash
        return False
|
|
|
|
|
|
def wash_line(line: str) -> str:
    """Return line with unwanted characters stripped out.

    Escape sequences (ESCAPE_CHARS) are removed first, then whatever
    BAD_CHARS matches is removed from the result.
    """
    return BAD_CHARS.sub("", ESCAPE_CHARS.sub("", line))
|
|
|
|
|
|
def process(line: str, file_owner: str) -> list:
    """Turn one raw data-file line into a list of fields.

    The line is validated, washed and split on the pipe delimiter; the
    returned list is file_owner followed by the five fields of the line.

    Raises:
        ValueError: the line does not have the expected number of pipes, or
            its first field (the timestamp) is non-empty and not a valid time.
    """
    # the pipe count is checked on the raw line, before any washing
    if not is_well_formed_line(line):
        raise ValueError("Not a well formed record")

    fields: list = wash_line(line).split("|")

    # an empty timestamp is let through unchecked
    timestamp = fields[0]
    if timestamp and not is_valid_time(timestamp):
        raise ValueError("Invalid date")

    return [file_owner, *fields]
|
|
|
|
|
|
def parse_ignore_file() -> list:
    """Read the current user's ignore file and list the usernames in it.

    The username is whatever precedes the first space on each line. A
    missing ignore file yields an empty list.
    """
    ignore_file = config.USER.ignorefile
    if not ignore_file.exists():
        return []
    return [entry.split(" ")[0] for entry in ignore_file.read_text().splitlines()]
|
|
|
|
|
|
def get_parent_record(parent_id: str, link_data: list) -> list:
    """Return the record that parent_id refers to.

    The text after the first "+" in parent_id is compared with the timestamp
    field (index 2) of each record; the first record that matches is returned.
    NOTE(review): only the timestamp part is compared, whatever precedes the
    "+" is ignored - confirm that timestamps are unique enough for this.

    Args:
        parent_id: reference to the parent, presumably "<username>+<timestamp>".
        link_data: records laid out like LinkDataRecord.

    Raises:
        ValueError: parent_id is empty.
        KeyError: no record has the referenced timestamp.
    """
    if parent_id == "":
        raise ValueError("parent_id cannot be empty")

    # the wanted timestamp does not change per record, so extract it once
    parent_timestamp = parent_id.partition("+")[2]
    for record in link_data:
        if record[2] == parent_timestamp:
            return record

    raise KeyError("there's no parent record for the specified parent_id")
|
|
|
|
|
|
class LinkData:
    """Class that contains link_data and the categories table, plus methods
    to generate and update these items

    Attributes:
        link_data: list of records, each a list laid out like LinkDataRecord
            (0 ID_if_parent, 1 username, 2 timestamp, 3 parent_id,
            4 category, 5 link_URL, 6 link_title_or_comment), kept sorted by
            timestamp in descending order.
        categories: list of dicts with the keys "name", "count" and
            "last_updated", one per category, in order of first appearance.
    """

    def __init__(self):
        self.link_data: list = []
        self.categories: list = []

        self.get()

    def get(self):
        """reads data files for non-ignored users, sets valid data in
        linkulator formats

        whenever this function is called, the data is refreshed from files.
        since disk io is probably the heaviest part of this script, don't do
        this often."""

        ignore_names = parse_ignore_file()

        files_pattern = str(
            PurePath(config.PATHS.all_homedir_pattern).joinpath(
                config.PATHS.datadir, config.PATHS.datafile
            )
        )

        # Collect into a fresh list and swap it in afterwards, so a refresh
        # replaces the previous records instead of appending duplicates.
        records: list = []
        id_iterator = 1

        for filename in glob(files_pattern):
            # the file owner's username is the directory two levels above
            # the data file
            file_owner = PurePath(filename).parent.parent.name
            if file_owner in ignore_names:
                # ignored users' files are not even opened
                continue

            with open(filename) as cfile:
                for line in cfile:
                    try:
                        split_line = process(line, file_owner)
                    except ValueError:
                        # malformed or future-dated line: skip it
                        continue

                    # assign parent items (links) an ID; replies, i.e.
                    # records with a parent_id, get an empty ID
                    if split_line[2] == "":
                        split_line.insert(0, id_iterator)
                        id_iterator += 1
                    else:
                        split_line.insert(0, "")

                    records.append(split_line)

        self.link_data[:] = records
        self.sort_link_data()
        self.generate_category_data()

    def sort_link_data(self):
        """sort link_data by creation date, newest first"""
        self.link_data.sort(key=lambda x: x[2], reverse=True)

    def add(self, record) -> int:
        """Add a record to the data file, and to link_data. Returns a new post
        ID, if record is a post, or -1

        record is expected to be a LinkDataRecord; a record with a non-empty
        category is treated as a post and given the next free post ID.

        NOTE(review): fields are written verbatim. A "|" or a newline inside
        one would produce a line that is_well_formed_line rejects on the next
        read - confirm that callers sanitise their input."""
        line = "{}|{}|{}|{}|{}\n".format(
            record.timestamp,
            record.parent_id,
            record.category,
            record.link_URL,
            record.link_title_or_comment,
        )
        # mode "a" creates the file when it does not exist yet, so no
        # separate existence check is needed
        with open(config.USER.datafile, "a") as file:
            file.write(line)

        new_post_id = -1
        if record.category:
            # replies carry "" as their ID and count as 0 here; with no
            # records at all the first post gets ID 1
            highest_id = max((rec[0] or 0 for rec in self.link_data), default=0)
            new_post_id = highest_id + 1
            record = record._replace(ID_if_parent=new_post_id)

        # the new record goes to the front of link_data
        self.link_data.insert(0, list(record))
        self.generate_category_data()
        return new_post_id

    def _posts_by_timestamp(self) -> dict:
        """Map each post's timestamp to its record; the first record in
        link_data order wins. Only records carrying a post ID are indexed,
        since a reply's parent is a post."""
        index: dict = {}
        for record in self.link_data:
            if record[0]:
                index.setdefault(record[2], record)
        return index

    @staticmethod
    def _find_parent(parent_id: str, posts_by_timestamp: dict):
        """Return the post a parent_id refers to, or None when there is none.

        As in get_parent_record, the text after the first "+" of parent_id
        is matched against the timestamp of the candidate records."""
        if not parent_id:
            return None
        return posts_by_timestamp.get(parent_id.partition("+")[2])

    def generate_category_data(self):
        """generate categories list and category count from sorted link data

        "count" is the number of records that carry the category themselves;
        replies do not add to it, but they do move "last_updated" of their
        parent's category forward."""
        self.categories.clear()
        by_name: dict = {}  # category name -> its dict in self.categories
        posts_by_timestamp = self._posts_by_timestamp()

        for record in self.link_data:
            timestamp = record[2]
            name = record[4]
            has_own_category = name != ""

            if not has_own_category:
                # a reply belongs to the category of its parent post
                parent_record = self._find_parent(record[3], posts_by_timestamp)
                if parent_record is None or parent_record[4] == "":
                    continue
                name = parent_record[4]

            cat_record = by_name.get(name)
            if cat_record is None:
                # Start at 0: link_data is sorted newest first, so a reply is
                # usually seen before its parent, and the parent adds its own
                # 1 when the loop reaches it.
                cat_record = {"name": name, "count": 0, "last_updated": timestamp}
                by_name[name] = cat_record
                self.categories.append(cat_record)
            elif cat_record["last_updated"] < timestamp:
                cat_record["last_updated"] = timestamp

            if has_own_category:
                cat_record["count"] += 1

    def search(self, keyword: str) -> list:
        """returns a unique list of link_data records for posts that contain
        the specified keyword

        The match is case-insensitive and is made against the string form of
        the whole record. A matching reply contributes its parent post
        instead of itself. Results are tuples sorted by post ID, highest
        first.

        Raises ValueError when keyword is empty."""
        if keyword == "":
            raise ValueError("a search keyword must be specified")

        needle = keyword.lower()
        posts_by_timestamp = self._posts_by_timestamp()
        search_results: set = set()

        for record in self.link_data:
            if needle not in str(record).lower():
                continue

            if record[0]:
                search_results.add(tuple(record))
                continue

            # a reply matched: report the post it belongs to, if it exists
            parent_record = self._find_parent(record[3], posts_by_timestamp)
            if parent_record is not None:
                search_results.add(tuple(parent_record))

        return sorted(search_results, key=lambda x: x[0], reverse=True)
|