#!/usr/bin/env python3
"""This module takes input and returns link_data, the data structure linkulator works from"""

from time import time
from typing import NamedTuple
from pathlib import PurePath
from glob import glob
import re
import os

import config

# regex for removing escape characters from https://stackoverflow.com/a/14693789
ESCAPE_CHARS = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
# whitespace control characters that must not survive inside a stored field
BAD_CHARS = re.compile(r"[\t\r\n\f\v]*")


class LinkDataRecord(NamedTuple):
    """Represents a record in LinkData.link_data.

    The field order here is the same as the positional order of the plain
    lists stored in LinkData.link_data (index 0 is the ID, index 2 the
    timestamp, index 4 the category, and so on).
    """

    ID_if_parent: str = ""
    username: str = ""
    timestamp: str = ""
    parent_id: str = ""
    category: str = ""
    link_URL: str = ""
    link_title_or_comment: str = ""


def is_well_formed_line(line: str) -> bool:
    """Checks if current line is valid or not, returns true or false respectively.

    A properly formatted line in the data file has exactly four pipe
    characters, i.e. five pipe-delimited fields.
    """
    pipe_count = 4
    return line.count("|") == pipe_count


def is_valid_time(timestamp: str) -> bool:
    """Identifies future dated timestamps.

    Returns True if the timestamp is earlier than the current time, False
    otherwise. Raises ValueError if the timestamp cannot be parsed as a float.
    """
    return float(timestamp) < time()


def wash_line(line: str) -> str:
    """Take line and return a version with bad characters removed.

    Terminal escape sequences are stripped first, then tab, carriage return,
    newline, form feed and vertical tab characters.
    """
    line = ESCAPE_CHARS.sub("", line)
    line = BAD_CHARS.sub("", line)
    return line


def process(line: str, file_owner: str) -> list:
    """Takes a line, returns a list based on the delimiter pipe character.

    The returned list is
    [file_owner, timestamp, parent_id, category, link_URL, link_title_or_comment].

    Raises ValueError if the line does not have exactly four pipes, if a
    non-empty timestamp is in the future, or if a non-empty timestamp is not
    a number.
    """
    if not is_well_formed_line(line):
        raise ValueError("Not a well formed record")
    line = wash_line(line)
    split_line: list = line.split("|")
    # an empty timestamp is let through; only non-empty ones are validated
    if split_line[0] and not is_valid_time(split_line[0]):
        raise ValueError("Invalid date")
    split_line.insert(0, file_owner)
    return split_line


def parse_ignore_file() -> list:
    """Reads the current user's ignore file, returns a list of usernames to ignore.

    The username is the text before the first space on each line. Returns an
    empty list when the ignore file does not exist.
    """
    ignore_names: list = []
    if config.USER.ignorefile.exists():
        contents = config.USER.ignorefile.read_text()
        for line in contents.splitlines():
            name = line.split(" ")[0]
            ignore_names.append(name)
    return ignore_names


class LinkData:
    """Class that contains link_data, categories and categories count tables,
    plus methods to generate and update these items.

    Each entry of link_data is a list laid out as
    [ID_if_parent, username, timestamp, parent_id, category, link_URL,
    link_title_or_comment]. Each entry of categories is a dict with the keys
    "name", "count" and "last_updated".
    """

    def __init__(self):
        self.link_data: list = []
        self.categories: list = []
        self.get()

    def get(self):
        """Reads data files for non-ignored users, sets valid data in linkulator formats.

        Whenever this function is called, the data is refreshed from files.
        Since disk io is probably the heaviest part of this script, don't do
        this often.
        """
        ignore_names = parse_ignore_file()
        files_pattern = str(
            PurePath(config.PATHS.all_homedir_pattern).joinpath(
                config.PATHS.datadir, config.PATHS.datafile
            )
        )
        linkulator_files = glob(files_pattern)

        # start from an empty table so a refresh does not duplicate records;
        # cleared in place so existing references to the list stay valid
        self.link_data.clear()
        id_iterator = 1

        for filename in linkulator_files:
            # get file owner username from path
            file_owner = PurePath(filename).parent.parent.name
            if file_owner in ignore_names:
                # ignore names found in ignore file, without opening the file
                continue
            with open(filename) as cfile:
                for line in cfile:
                    try:
                        split_line = process(line, file_owner)
                    except ValueError:
                        # malformed or future-dated lines are skipped
                        continue
                    # assign parent items (links) an ID; an empty parent_id
                    # means the record is a post rather than a reply
                    if split_line[2] == "":
                        split_line.insert(0, id_iterator)
                        id_iterator += 1
                    else:
                        split_line.insert(0, "")
                    self.link_data.append(split_line)

        self.sort_link_data()
        self.generate_category_data()

    def sort_link_data(self):
        """Sort link_data by creation date, newest first."""
        self.link_data.sort(key=lambda x: x[2], reverse=True)

    def add(self, record) -> int:
        """Add a record to the data file, and to link_data.

        record is expected to provide the LinkDataRecord fields. Returns a
        new post ID if record is a post (it has a category), or -1 if it is
        a reply.
        """
        if os.path.exists(config.USER.datafile):
            append_write = "a"  # append if already exists
        else:
            append_write = "w+"  # make a new file if not

        with open(config.USER.datafile, append_write) as file:
            file.write(
                "{}|{}|{}|{}|{}\n".format(
                    record.timestamp,
                    record.parent_id,
                    record.category,
                    record.link_URL,
                    record.link_title_or_comment,
                )
            )

        new_post_id = -1
        if record.category:
            # default=0 makes the very first post get ID 1 instead of
            # raising ValueError on an empty sequence
            highest_id = max(
                (row[0] for row in self.link_data if row[0]), default=0
            )
            new_post_id = highest_id + 1
            record = record._replace(ID_if_parent=new_post_id)
            self.link_data.insert(0, list(record))
            self.generate_category_data()
        else:
            self.link_data.insert(0, list(record))
        return new_post_id

    def generate_category_data(self):
        """Generate categories list and category count from sorted link data.

        Categories appear in the order they are first seen in link_data.
        Records with an empty category (replies) are not counted.
        """
        self.categories.clear()
        # name -> category dict, so each record is handled in constant time
        by_name: dict = {}
        for record in self.link_data:
            name = record[4]
            if name == "":
                continue
            timestamp = record[2]
            cat_record = by_name.get(name)
            if cat_record is None:
                cat_record = {"name": name, "count": 1, "last_updated": timestamp}
                by_name[name] = cat_record
                self.categories.append(cat_record)
            else:
                cat_record["count"] += 1
                if cat_record["last_updated"] < timestamp:
                    cat_record["last_updated"] = timestamp