#!/usr/bin/env python3
"""This module takes input and returns link_data, the data structure linkulator works from"""

from time import time
from typing import NamedTuple
from pathlib import PurePath
from glob import glob
import re
import os  # not used directly in this module; kept for backward compatibility
import config

# regex for removing escape characters from https://stackoverflow.com/a/14693789
ESCAPE_CHARS = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
BAD_CHARS = re.compile(r"[\t\r\n\f\v]*")


class LinkDataRecord(NamedTuple):
    """Represents a record in LinkData.link_data"""

    # numeric ID given to parent items (links); stays "" for replies
    ID_if_parent: str = ""
    # owner of the data file the record was read from
    username: str = ""
    # creation time of the record, stored as a string
    timestamp: str = ""
    # "username+timestamp" of the link a reply belongs to; "" for links
    parent_id: str = ""
    # category name; empty for replies
    category: str = ""
    link_URL: str = ""
    link_title_or_comment: str = ""


def is_well_formed_line(line: str) -> bool:
    """Checks if current line is valid or not, returns true or false respectively."""
    # A properly formatted line in linkulator.data has exactly four pipes.
    pipe_count = 4
    return line.count("|") == pipe_count


def is_valid_time(timestamp: str) -> bool:
    """Identifies future dated timestamps.

    Returns True if the timestamp lies in the past, False otherwise.
    Raises ValueError if the timestamp cannot be converted to a float.
    """
    return float(timestamp) < time()


def wash_line(line: str) -> str:
    """Take a line and return a version with bad characters removed."""
    line = ESCAPE_CHARS.sub("", line)
    line = BAD_CHARS.sub("", line)
    return line


def process(line: str, file_owner: str) -> list:
    """Takes a line, returns a list based on the delimiter pipe character.

    The returned list is [file_owner, timestamp, parent_id, category,
    link_URL, link_title_or_comment].

    Raises ValueError if the line does not contain exactly four pipes, or if
    its timestamp is not a number or lies in the future.
    """
    if not is_well_formed_line(line):
        raise ValueError("Not a well formed record")
    line = wash_line(line)
    split_line: list = line.split("|")
    # an empty timestamp is let through; only non-empty ones are validated
    if split_line[0] and not is_valid_time(split_line[0]):
        raise ValueError("Invalid date")
    split_line.insert(0, file_owner)
    return split_line


def parse_ignore_file() -> list:
    """Reads the current user's ignore file, returns a list of usernames to ignore.

    Only the first space-separated token of every line is used. Returns an
    empty list when the ignore file does not exist.
    """
    if not config.USER.ignorefile.exists():
        return []
    ignore_text = config.USER.ignorefile.read_text()
    return [line.split(" ")[0] for line in ignore_text.splitlines()]


def get_parent_record(parent_id: str, link_data: list) -> list:
    """Given a parent ID of the form "username+timestamp", return the matching record.

    Both the username and the timestamp have to match, which is the same rule
    list_category_details uses when it pairs replies with their link.

    Raises ValueError if parent_id is empty.
    Raises KeyError if no record in link_data matches parent_id.
    """
    if parent_id == "":
        raise ValueError("parent_id cannot be empty")
    # split once, outside the loop - the parent ID does not change per record
    parent_user, _, parent_timestamp = parent_id.partition("+")
    for record in link_data:
        if record[2] == parent_timestamp and record[1] == parent_user:
            return record
    raise KeyError("there's no parent record for the specified parent_id")


class LinkData:
    """Class that contains link_data, categories and categories count tables,
    plus methods to generate and update these items"""

    def __init__(self):
        self.link_data: list = []
        self.categories: list = []
        self.get()

    def get(self):
        """Reads data files for non-ignored users, sets valid data in linkulator formats.

        Whenever this function is called, the data is refreshed from files.
        Since disk io is probably the heaviest part of this script, don't do
        this often.
        """
        ignore_names = set(parse_ignore_file())
        files_pattern = str(
            PurePath(config.PATHS.all_homedir_pattern).joinpath(
                config.PATHS.datadir, config.PATHS.datafile
            )
        )
        linkulator_files = glob(files_pattern)

        # drop previously loaded records so a refresh does not duplicate them;
        # clear() keeps the same list object for anyone holding a reference
        self.link_data.clear()

        id_iterator = 1
        for filename in linkulator_files:
            # get file owner username from path
            file_owner = PurePath(filename).parent.parent.name
            if file_owner in ignore_names:
                # ignore names found in ignore file - no need to open the file
                continue
            with open(filename) as cfile:
                for line in cfile:
                    try:
                        split_line = process(line, file_owner)
                    except ValueError:
                        # malformed or future dated line
                        continue
                    # assign parent items (links) an ID
                    if split_line[2] == "":
                        split_line.insert(0, id_iterator)
                        id_iterator += 1
                    else:
                        split_line.insert(0, "")
                    self.link_data.append(split_line)

        self.sort_link_data()
        self.generate_category_data()

    def sort_link_data(self):
        """Sort link_data by creation date, newest first."""
        self.link_data.sort(key=lambda x: x[2], reverse=True)

    def add(self, record: LinkDataRecord) -> int:
        """Add a record to the data file, and to link_data.

        Returns a new post ID, if record is a post, or -1.
        """
        # mode "a" creates the file when it does not exist yet
        with open(config.USER.datafile, "a") as file:
            file.write(
                "{}|{}|{}|{}|{}\n".format(
                    record.timestamp,
                    record.parent_id,
                    record.category,
                    record.link_URL,
                    record.link_title_or_comment,
                )
            )

        new_post_id = -1
        if record.category:
            # replies carry "" as their ID and count as 0 here; an empty
            # link_data yields 1 as the first ID
            new_post_id = (
                max((existing[0] or 0 for existing in self.link_data), default=0) + 1
            )
            record = record._replace(ID_if_parent=new_post_id)
        self.link_data.insert(0, list(record))

        self.generate_category_data()
        return new_post_id

    def generate_category_data(self):
        """Generate categories list and category count from sorted link data.

        Every entry of self.categories is a dict with the keys "name",
        "count" (number of links in the category) and "last_updated" (the
        largest timestamp among the category's links and replies).
        """
        self.categories.clear()
        # name -> entry of self.categories, so lookups don't rescan the list
        categories_by_name: dict = {}

        for record in self.link_data:
            name = record[4]
            timestamp = record[2]
            if name != "":  # only replies have column 4 empty
                increment = 1
            else:
                try:
                    parent_record = get_parent_record(record[3], self.link_data)
                except (KeyError, ValueError):
                    # no parent found, or neither category nor parent ID set
                    continue
                name = parent_record[4]
                # a reply only touches last_updated; the count stays as it is
                # because the parent record will be counted at some stage
                increment = 0

            cat_record = categories_by_name.get(name)
            if cat_record is None:
                cat_record = {
                    "name": name,
                    "count": increment,
                    "last_updated": timestamp,
                }
                categories_by_name[name] = cat_record
                self.categories.append(cat_record)
            else:
                cat_record["count"] += increment
                if cat_record["last_updated"] < timestamp:
                    cat_record["last_updated"] = timestamp

    def search(self, keyword: str) -> list:
        """Returns a unique list of link_data records for posts that contain the
        specified keyword.

        The search is case-insensitive and looks at every field of a record.
        A matching reply contributes its parent post to the result. Results
        are tuples, sorted by post ID in descending order.

        Raises ValueError if keyword is empty.
        """
        if keyword == "":
            raise ValueError("a search keyword must be specified")

        needle = keyword.lower()
        search_results: set = set()
        for record in self.link_data:
            # match field by field, so list punctuation and quote escaping of
            # the record's repr can neither cause nor prevent a match
            if not any(needle in str(field).lower() for field in record):
                continue
            if record[0]:
                search_results.add(tuple(record))
                continue
            try:
                parent_record = get_parent_record(record[3], self.link_data)
            except (KeyError, ValueError):
                continue
            search_results.add(tuple(parent_record))

        # "or 0" keeps the sort key numeric if a record without an ID slips in
        return sorted(search_results, key=lambda x: x[0] or 0, reverse=True)

    def list_category_details(self, selected_category: str) -> list:
        """Returns a sorted list of posts belonging to the specified category.

        Every post is a dict with the keys "postid", "link_timestamp",
        "link_author", "reply_count", "description", "has_new_replies" and
        "last_modified_timestamp"; the list is sorted by the latter, newest
        first.
        """
        # group replies by parent ID once instead of rescanning per link
        replies_by_parent: dict = {}
        for record in self.link_data:
            if record[3]:
                replies_by_parent.setdefault(record[3], []).append(record)

        links = []
        for record in self.link_data:
            if record[4] != selected_category:
                continue
            postid = record[0]
            userid = record[1]
            timestamp = record[2]
            description = record[6]
            parent_id = userid + "+" + str(timestamp)

            replies = replies_by_parent.get(parent_id, [])
            new_replies = [i for i in replies if i[2] >= config.USER.lastlogin]

            # empty timestamps are skipped, they cannot be converted to float
            known_times = [float(i[2]) for i in replies if i[2]]
            if timestamp:
                known_times.append(float(timestamp))
            if replies and known_times:
                last_modified_timestamp = str(max(known_times))
            else:
                last_modified_timestamp = timestamp

            has_new_replies = bool(
                new_replies or timestamp >= config.USER.lastlogin
            )
            links.append(
                {
                    "postid": postid,
                    "link_timestamp": timestamp,
                    "link_author": userid,
                    "reply_count": len(replies),
                    "description": description,
                    "has_new_replies": has_new_replies,
                    "last_modified_timestamp": last_modified_timestamp,
                }
            )
        return sorted(links, key=lambda x: x["last_modified_timestamp"], reverse=True)