linkulator2/data.py

283 lines
10 KiB
Python

#!/usr/bin/env python3
"""This module takes input and returns link_data, the data structure linkulator works from"""
from time import time
from typing import NamedTuple
from pathlib import PurePath
from glob import glob
import re
import os
import config
# Matches terminal escape sequences: ESC, one byte in "@".."_", optional
# parameter bytes "0".."?", optional intermediate bytes " ".."/", and one final
# byte in "@".."~". Regex from https://stackoverflow.com/a/14693789
ESCAPE_CHARS = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
# Matches runs of tab, carriage return, newline, form feed and vertical tab.
# "+" (not "*") so the pattern never matches the empty string; substituting
# with "" yields the same text without an empty match at every position.
BAD_CHARS = re.compile(r"[\t\r\n\f\v]+")
class LinkDataRecord(NamedTuple):
    """One row of LinkData.link_data.

    The field order is the positional layout used throughout this module
    (index 0 is the post ID, index 1 the username, and so on). Every field
    defaults to the empty string.
    """

    # Numeric post ID for a parent record (a link); left empty for a reply.
    ID_if_parent: str = ""
    # Owner of the data file the record was read from.
    username: str = ""
    # Creation time of the record, kept as a string.
    timestamp: str = ""
    # "username+timestamp" of the post being replied to; empty for a post.
    parent_id: str = ""
    # Category name; only replies leave this empty.
    category: str = ""
    link_URL: str = ""
    link_title_or_comment: str = ""
def is_well_formed_line(line: str) -> bool:
    """Report whether *line* has the shape of a linkulator.data record.

    A properly formatted line holds five pipe-delimited fields, which means
    it contains exactly four "|" characters.
    """
    expected_fields = 5
    return len(line.split("|")) == expected_fields
def is_valid_time(timestamp: str) -> bool:
    """Return True when *timestamp* lies in the past, False when it is future dated.

    Raises ValueError (from float()) if *timestamp* is not a numeric string.
    """
    posted_at = float(timestamp)
    return time() > posted_at
def wash_line(line: str) -> str:
    """Return *line* with escape sequences and control whitespace stripped out."""
    # Escape sequences go first, then whatever matches BAD_CHARS.
    without_escapes = ESCAPE_CHARS.sub("", line)
    return BAD_CHARS.sub("", without_escapes)
def process(line: str, file_owner: str) -> list:
    """Turn one raw data-file line into a list of record fields.

    The line is washed, validated, split on the pipe delimiter and prefixed
    with *file_owner*, giving
    [file_owner, timestamp, parent_id, category, link_URL, link_title_or_comment].

    Raises ValueError when the washed line does not contain exactly four
    pipes, or when a non-empty timestamp is non-numeric or future dated.
    """
    # Wash BEFORE validating: the final byte of an escape sequence may itself
    # be "|", so counting pipes on the raw line could accept a line that
    # splits into fewer than five fields once the sequence is removed.
    line = wash_line(line)
    if not is_well_formed_line(line):
        raise ValueError("Not a well formed record")
    split_line: list = line.split("|")
    # An empty timestamp is let through; only non-empty ones are checked.
    if split_line[0] and not is_valid_time(split_line[0]):
        raise ValueError("Invalid date")
    split_line.insert(0, file_owner)
    return split_line
def parse_ignore_file() -> list:
    """Return the usernames listed in the current user's ignore file.

    Each line contributes the text before its first space. When the ignore
    file does not exist the result is an empty list.
    """
    if not config.USER.ignorefile.exists():
        return []
    contents = config.USER.ignorefile.read_text()
    return [entry.split(" ")[0] for entry in contents.splitlines()]
def get_parent_record(parent_id: str, link_data: list) -> list:
    """Return the link_data record that *parent_id* refers to.

    *parent_id* has the form "username+timestamp". A record matches when both
    its username (index 1) and its timestamp (index 2) agree with it, so two
    users' records sharing a timestamp cannot be mistaken for one another.

    Raises ValueError if *parent_id* is empty and KeyError if no record matches.
    """
    if parent_id == "":
        raise ValueError("parent_id cannot be empty")
    # Split once, outside the loop; the separator is the first "+".
    parent_user, _, parent_timestamp = parent_id.partition("+")
    for record in link_data:
        if record[2] == parent_timestamp and record[1] == parent_user:
            return record
    raise KeyError("there's no parent record for the specified parent_id")
class LinkData:
    """Holds link_data and the derived categories table, plus the methods
    that load, extend and query them.

    Every link_data record is a list laid out as
    [ID_if_parent, username, timestamp, parent_id, category, link_URL,
    link_title_or_comment]. Posts (links) carry a numeric ID and a category;
    replies carry an empty ID, an empty category and the parent_id
    ("username+timestamp") of the post they answer.

    categories is a list of dicts with the keys "name", "count" and
    "last_updated".
    """

    def __init__(self):
        self.link_data: list = []
        self.categories: list = []
        self.get()

    def get(self):
        """Read the data files of all non-ignored users and rebuild link_data
        and categories from them.

        Every call replaces the previous contents, so the data is refreshed
        rather than accumulated. Disk I/O is probably the heaviest part of
        this script, so don't call this often.
        """
        ignore_names = set(parse_ignore_file())
        files_pattern = str(
            PurePath(config.PATHS.all_homedir_pattern).joinpath(
                config.PATHS.datadir, config.PATHS.datafile
            )
        )
        records: list = []
        id_iterator = 1
        for filename in glob(files_pattern):
            # The file owner's username is the directory two levels up.
            file_owner = PurePath(filename).parent.parent.name
            if file_owner in ignore_names:
                # Skip ignored users before their file is even opened.
                continue
            with open(filename) as cfile:
                for line in cfile:
                    try:
                        split_line = process(line, file_owner)
                    except ValueError:
                        # Malformed or future-dated line: drop it.
                        continue
                    # Parent items (links) have no parent_id and get an ID.
                    if split_line[2] == "":
                        split_line.insert(0, id_iterator)
                        id_iterator += 1
                    else:
                        split_line.insert(0, "")
                    records.append(split_line)
        # Replace the contents in place so a refresh never duplicates records
        # and existing references to the list stay valid.
        self.link_data[:] = records
        self.sort_link_data()
        self.generate_category_data()

    def sort_link_data(self):
        """Sort link_data by creation timestamp, newest first."""
        self.link_data.sort(key=lambda x: x[2], reverse=True)

    def add(self, record) -> int:
        """Append *record* to the user's data file and put it at the front of
        link_data.

        *record* is expected to be a LinkDataRecord. Returns the new post ID
        when the record is a post (it has a category), otherwise -1.
        """
        # Mode "a" creates the file when it is missing and never truncates an
        # existing one, so no separate existence check is needed.
        with open(config.USER.datafile, "a") as file:
            file.write(
                f"{record.timestamp}|{record.parent_id}|{record.category}"
                f"|{record.link_URL}|{record.link_title_or_comment}\n"
            )
        new_post_id = -1
        if record.category:
            # Next ID is one above the highest existing ID; replies have an
            # empty ID and are skipped. An empty table starts at 1.
            existing_ids = (row[0] for row in self.link_data if row[0])
            new_post_id = max(existing_ids, default=0) + 1
            record = record._replace(ID_if_parent=new_post_id)
        self.link_data.insert(0, list(record))
        self.generate_category_data()
        return new_post_id

    def generate_category_data(self):
        """Rebuild the categories list and per-category counts from link_data.

        Posts increase their category's count. Replies only refresh the
        "last_updated" value of their parent's category; a category first seen
        through a reply starts at count 0 because its post is counted when
        that record is reached. Replies whose parent cannot be resolved are
        skipped.
        """
        self.categories.clear()
        by_name: dict = {}  # category name -> its dict inside self.categories
        for record in self.link_data:
            timestamp = record[2]
            name = record[4]
            is_post = name != ""  # only replies have column 4 empty
            if not is_post:
                try:
                    name = get_parent_record(record[3], self.link_data)[4]
                except (KeyError, ValueError):
                    # No such parent, or the reply has an empty parent_id.
                    continue
                if name == "":
                    # The "parent" is itself a reply; it names no category.
                    continue
            cat_record = by_name.get(name)
            if cat_record is None:
                cat_record = {"name": name, "count": 0, "last_updated": timestamp}
                by_name[name] = cat_record
                self.categories.append(cat_record)
            elif cat_record["last_updated"] < timestamp:
                cat_record["last_updated"] = timestamp
            if is_post:
                cat_record["count"] += 1

    def search(self, keyword: str) -> list:
        """Return the unique posts that match *keyword*, highest post ID first.

        The keyword is compared case-insensitively against the string form of
        every record. A matching reply contributes its parent post instead of
        itself. Results are tuples.

        Raises ValueError when *keyword* is empty.
        """
        if keyword == "":
            raise ValueError("a search keyword must be specified")
        needle = keyword.lower()
        search_results: set = set()
        for record in self.link_data:
            if needle not in str(record).lower():
                continue
            if record[0]:
                search_results.add(tuple(record))
                continue
            try:
                parent_record = get_parent_record(record[3], self.link_data)
            except (KeyError, ValueError):
                # No such parent, or the reply has an empty parent_id.
                continue
            # Keep posts only: a record with an empty ID would not be
            # comparable with the integer IDs when sorting below.
            if parent_record[0]:
                search_results.add(tuple(parent_record))
        return sorted(search_results, key=lambda x: x[0], reverse=True)

    def list_category_details(self, selected_category: str) -> list:
        """Return a dict for each post in *selected_category*, most recently
        modified first.

        Each dict has the keys "postid", "link_timestamp", "link_author",
        "reply_count", "description", "has_new_replies" and
        "last_modified_timestamp".
        """
        lastlogin = config.USER.lastlogin
        # Group records by parent_id once instead of rescanning link_data for
        # every post.
        replies_by_parent: dict = {}
        for row in self.link_data:
            replies_by_parent.setdefault(row[3], []).append(row)

        links = []
        for record in self.link_data:
            if record[4] != selected_category:
                continue
            postid, userid, timestamp = record[0], record[1], record[2]
            parent_id = userid + "+" + str(timestamp)
            replies = replies_by_parent.get(parent_id, [])
            # Empty timestamps are left out so float() and max() cannot fail.
            times = [
                float(stamp)
                for stamp in [timestamp] + [i[2] for i in replies]
                if stamp
            ]
            if replies and times:
                last_modified_timestamp = str(max(times))
            else:
                last_modified_timestamp = timestamp
            # "New" means created at or after the user's last login; the post
            # itself being new also sets the flag.
            has_new_replies = bool(
                any(i[2] >= lastlogin for i in replies) or timestamp >= lastlogin
            )
            links.append(
                {
                    "postid": postid,
                    "link_timestamp": timestamp,
                    "link_author": userid,
                    "reply_count": len(replies),
                    "description": record[6],
                    "has_new_replies": has_new_replies,
                    "last_modified_timestamp": last_modified_timestamp,
                }
            )
        return sorted(links, key=lambda x: x["last_modified_timestamp"], reverse=True)