Merge pull request 'Show new activity on all screens' (#101) from show_unread_replies into master

Reviewed-on: #101
This commit is contained in:
cmccabe 2021-07-27 14:40:07 +00:00
commit ecbf2b66d2
4 changed files with 157 additions and 38 deletions

89
data.py
View File

@ -75,7 +75,8 @@ def get_parent_record(parent_id: str, link_data: list) -> list:
if parent_id == "":
raise ValueError("parent_id cannot be empty")
for record in link_data:
if record[0] == parent_id.partition("+")[2]:
timestamp = record[2]
if timestamp == parent_id.partition("+")[2]:
return record
raise KeyError("there's no parent record for the specified parent_id")
@ -167,28 +168,47 @@ class LinkData:
new_post_id = 1
record = record._replace(ID_if_parent=new_post_id)
self.link_data.insert(0, list(record))
self.generate_category_data()
else:
self.link_data.insert(0, list(record))
self.generate_category_data()
return new_post_id
def generate_category_data(self):
"""generate categories list and category count from sorted link data"""
self.categories.clear()
i = (record for record in self.link_data if record[4] != "")
for record in i:
for record in self.link_data:
name = record[4]
timestamp = record[2]
if name not in [cat_record["name"] for cat_record in self.categories]:
self.categories.append(
{"name": name, "count": 1, "last_updated": timestamp}
)
if name != "": # only replies have column 4 empty
if name not in [cat_record["name"] for cat_record in self.categories]:
self.categories.append(
{"name": name, "count": 1, "last_updated": timestamp}
)
else:
for cat_record in self.categories:
if cat_record["name"] == name:
cat_record["count"] += 1
if cat_record["last_updated"] < timestamp:
cat_record["last_updated"] = timestamp
else:
for cat_record in self.categories:
if cat_record["name"] == name:
cat_record["count"] += 1
if cat_record["last_updated"] < timestamp:
cat_record["last_updated"] = timestamp
parent_id = record[3]
try:
parent_record = get_parent_record(parent_id, self.link_data)
except KeyError:
continue
parent_cat_name = parent_record[4]
if parent_cat_name not in [
cat_record["name"] for cat_record in self.categories
]:
self.categories.append(
# append a record, but set the count to 0 because the parent record will be counted at some stage
{"name": parent_cat_name, "count": 0, "last_updated": timestamp}
)
else:
for cat_record in self.categories:
if cat_record["name"] == parent_cat_name:
if cat_record["last_updated"] < timestamp:
cat_record["last_updated"] = timestamp
def search(self, keyword: str) -> list:
"""returns a unique list of link_data records for posts that contain
@ -217,3 +237,46 @@ class LinkData:
search_results.add(tuple(parent_record))
return sorted(search_results, key=lambda x: x[0], reverse=True)
def list_category_details(self, selected_category: str) -> list:
"""returns a sorted list of posts belonging to the specified category"""
links = []
for record in self.link_data:
category = record[4]
if category == selected_category:
postid = record[0]
userid = record[1]
timestamp = record[2]
parent_id = userid + "+" + str(timestamp)
description = record[6]
replies = [i for i in self.link_data if i[3] == parent_id]
new_replies = [i for i in replies if i[2] >= config.USER.lastlogin]
if replies:
last_modified_timestamp = str(
max(
float(timestamp),
max([float(i[2]) for i in replies if i[2]]),
)
)
else:
last_modified_timestamp = timestamp
has_new_replies = (
True if new_replies or timestamp >= config.USER.lastlogin else False
)
links.append(
{
"postid": postid,
"link_timestamp": timestamp,
"link_author": userid,
"reply_count": len(replies),
"description": description,
"has_new_replies": has_new_replies,
"last_modified_timestamp": last_modified_timestamp,
}
)
return sorted(links, key=lambda x: x["last_modified_timestamp"], reverse=True)

View File

@ -69,22 +69,25 @@ def print_category_details(view_cat):
out = ""
link_count = 0
thread_index = {}
category_details = LinkData.list_category_details(view_cat["name"])
for line in link_data:
if line[4] == view_cat["name"]:
link_count += 1
thread_index[link_count] = line[0]
parent_id = line[1] + "+" + str(line[2])
replies = [line for line in link_data if line[3] == parent_id]
new_replies = len(
[line for line in replies if line[2] >= config.USER.lastlogin]
)
desc = textwrap.shorten(line[6], width=desclen, placeholder="...")
newmarker = "*" if new_replies or line[2] >= config.USER.lastlogin else ""
_dt = datetime.fromtimestamp(float(line[2])).strftime("%Y-%m-%d")
out += " {:3d} {:>10s} {:<{namelen}s} [{:3d}] {:s}{}\n".format(
link_count, _dt, line[1], len(replies), desc, newmarker, namelen=namelen
)
for link in category_details:
link_count += 1
thread_index[link_count] = link["postid"]
desc = textwrap.shorten(link["description"], width=desclen, placeholder="...")
newmarker = (
"*" if link["last_modified_timestamp"] >= config.USER.lastlogin else ""
)
_dt = datetime.fromtimestamp(float(link["link_timestamp"])).strftime("%Y-%m-%d")
out += " {:3d} {:>10s} {:<{namelen}s} [{:3d}] {:s}{}\n".format(
link_count,
_dt,
link["link_author"],
link["reply_count"],
desc,
newmarker,
namelen=namelen,
)
if len(out) > 0:
print(header)
@ -273,15 +276,17 @@ def is_valid_input(entry: str) -> bool:
return False
return True
def is_correct_category(entry: str) -> bool:
"""Make sure the user purposefully created a new category and not by
"""Make sure the user purposefully created a new category and not by
accident (mistyped, tried to use category number instead of name)"""
if entry not in [ record["name"] for record in categories ]:
if entry not in [record["name"] for record in categories]:
question = "Do you want to create a new category '{}'? Y/[N]".format(entry)
answer = input(question).lower().strip()
return answer != "" and answer[0] == "y"
return True
def get_input(item: str) -> str:
"""Get user input with the specified prompt, validate and return it, or
break if invalid"""
@ -403,7 +408,9 @@ def menu_view_category_details(cat_index):
menu_view_thread_details(post_id)
except (KeyError, ValueError):
# Catch a Post ID that is not in the thread list or is not a number
print("{}\n\n".format(style_text("Invalid category ID/entry", False, "bold")))
print(
"{}\n\n".format(style_text("Invalid category ID/entry", False, "bold"))
)
def menu_view_thread_details(post_id):
@ -455,7 +462,7 @@ def style_text(text: str, is_input: bool, *args) -> str:
# (ctrl+shift+esc+E to enable evil mode.)
"inverse": "\033[7m", # Make fg and bg color swap
"rl_left_fix": "\001" if is_input else "",
"rl_right_fix": "\002" if is_input else ""
"rl_right_fix": "\002" if is_input else "",
}
out = ""
for arg in args:

View File

@ -18,5 +18,5 @@ setup(
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: POSIX",
],
python_requires='>=3.6',
python_requires=">=3.6",
)

View File

@ -108,10 +108,48 @@ class TestDataHelperFunctions(unittest.TestCase):
)
def test_get_parent_record(self):
# test that an empty parent_id throws a value error
test_link_data = []
parent_id = ""
with self.assertRaises(ValueError):
data.get_parent_record("", test_link_data)
data.get_parent_record(parent_id, test_link_data)
# confirm the function returns a valid parent record
test_parent_record = [
1,
"testuser",
"1000",
"",
"test_category",
"test_url",
"test_title",
]
test_link_data = (
[
[
1,
"testuser",
"1000",
"",
"test_category",
"test_url",
"test_title",
]
],
)
parent_id = 1000
self.assertListEqual(
test_parent_record, data.get_parent_record(parent_id, test_link_data)
)
# confirm key error raised if no results found
parent_id = 69
with self.assertRaises(KeyError):
data.get_parent_record(parent_id, test_link_data)
class TestLinkDataSearch(unittest.TestCase):
@ -177,12 +215,22 @@ class TestLinkDataSearch(unittest.TestCase):
"",
"this is an orphaned reply but it contains the keyword",
],
[66, "keyword", "1576461366.5580268", "", "c", "c", "c"],
[66, "keyword", "1576461366.5580261", "", "c", "c", "c"],
[65, "poster6", "1576461367.5580268", "", "keyword", "c", "c"],
[64, "poster7", "1576461368.5580268", "", "c", "keyword", "c"],
[63, "poster8", "1576461369.5580268", "", "c", "c", "keyword"],
[62, "poster9", "1576461370.5580268", "", "c", "c", "ssskeywordsubstring"],
[61, "poster0", "1576461370.5580268", "", "c", "c", "KEYWORD capital"],
[61, "poster0", "1576461371.5580268", "", "c", "c", "KEYWORD capital"],
[60, "poste99", "1576461372.5580268", "", "c", "c", "c"],
[
"",
"user99",
"1576461372.6680268",
"poste99+1576461372.5580268",
"",
"",
"the post doesn't contain the keyword but this reply does",
],
]
test_results = [
@ -204,12 +252,13 @@ class TestLinkDataSearch(unittest.TestCase):
"gemini://keyword",
"keyword site with no replies",
),
(66, "keyword", "1576461366.5580268", "", "c", "c", "c"),
(66, "keyword", "1576461366.5580261", "", "c", "c", "c"),
(65, "poster6", "1576461367.5580268", "", "keyword", "c", "c"),
(64, "poster7", "1576461368.5580268", "", "c", "keyword", "c"),
(63, "poster8", "1576461369.5580268", "", "c", "c", "keyword"),
(62, "poster9", "1576461370.5580268", "", "c", "c", "ssskeywordsubstring"),
(61, "poster0", "1576461370.5580268", "", "c", "c", "KEYWORD capital"),
(61, "poster0", "1576461371.5580268", "", "c", "c", "KEYWORD capital"),
(60, "poste99", "1576461372.5580268", "", "c", "c", "c"),
]
self.assertEqual(link_data.search("keyword"), test_results)