fix aformentioned bugs

This commit is contained in:
John Sennesael 2021-10-19 17:56:28 -05:00
parent bb9c3da3d8
commit 28b5a2457a
5 changed files with 134 additions and 71 deletions

View File

@ -55,15 +55,14 @@ struct DatabaseException: public UsenetSearchException
class Database
{
Application& m_app;
std::filesystem::path m_databasePath;
std::uint64_t m_databaseVersion{DatabaseVersion};
std::vector<std::filesystem::path> m_lockedFiles;
std::mutex m_lockedFilesMutex;
std::uint8_t m_maxTreeDepth{5};
SerializableFile m_newsGroupFileIO;
void CheckDbVersion(const SerializableFile& f) const;
std::filesystem::path GetNewsGroupFilePath() const;
std::filesystem::path GetTokenFilePath(
const std::string& token,
bool mkdirs=false
@ -80,12 +79,15 @@ class Database
const std::filesystem::path dbFile,
const std::string& subtoken
);
void OpenNewsGroupFile();
void SaveToken(
const std::string& subToken,
std::uint64_t newsgroupID,
std::uint32_t articleID
);
static void SyncLastUpdated(
NntpListEntry& a,
NntpListEntry& b
);
public:

View File

@ -21,6 +21,7 @@
#include <codecvt>
#include <fstream>
#include <functional>
#include <limits>
#include <locale>
#include <memory>
#include <string>
@ -43,7 +44,7 @@ struct UsenetClientException: public UsenetSearchException
struct NntpHeader
{
std::uint64_t articleID;
std::uint64_t articleID{0};
std::string subject;
/*
There's a lot more headers, but trying to limit it to the fields we care
@ -55,17 +56,20 @@ typedef std::vector<NntpHeader> NntpHeaders;
struct NntpMessage
{
std::uint16_t code;
std::uint16_t code{0};
std::string message;
};
struct NntpListEntry
{
std::uint64_t id;
std::uint32_t lastIndexedArticle;
std::uint64_t count;
std::uint64_t high;
std::uint64_t low;
static constexpr std::uint32_t NOT_INDEXED{
std::numeric_limits<std::uint32_t>::max()
};
std::uint64_t id{0};
std::uint32_t lastIndexedArticle{NOT_INDEXED};
std::uint64_t count{0};
std::uint64_t high{0};
std::uint64_t low{0};
std::string name;
std::string status;
};

View File

@ -42,20 +42,35 @@ Database::Database(Application& app): m_app(app)
Database::~Database()
{
m_newsGroupFileIO.Close();
}
void Database::CheckDbVersion(const SerializableFile& f) const
{
f.Seek(0);
const std::uint64_t ver = f.ReadInt64();
if (ver != m_databaseVersion)
{
throw DatabaseException(EINVAL,
"Wrong database version - Got: " + std::to_string(ver) + " want: "
+ std::to_string(m_databaseVersion)
);
}
}
std::unique_ptr<NntpListEntry> Database::FindNntpEntry(
const std::string& subject)
{
OpenNewsGroupFile();
ScopeExit closeNewsGroupFile([&](){ m_newsGroupFileIO.Close(); });
const std::uint64_t numGroups = m_newsGroupFileIO.ReadInt64();
const auto path = GetNewsGroupFilePath();
if (!std::filesystem::exists(path)) return nullptr;
SerializableFile io;
io.Open(path);
CheckDbVersion(io);
const std::uint64_t numGroups = io.ReadInt64();
std::unique_ptr<NntpListEntry> result = nullptr;
for (std::uint64_t n = 0; n != numGroups; ++n)
{
NntpListEntry entry;
m_newsGroupFileIO >> entry;
io >> entry;
if (entry.name == subject)
{
result = std::make_unique<NntpListEntry>(entry);
@ -67,19 +82,27 @@ std::unique_ptr<NntpListEntry> Database::FindNntpEntry(
std::uint32_t Database::GetLastIndexedArticle(std::uint64_t newsgroupID)
{
OpenNewsGroupFile();
ScopeExit closeNewsGroupFile([&](){ m_newsGroupFileIO.Close(); });
const std::uint64_t numGroups = m_newsGroupFileIO.ReadInt64();
const auto path = GetNewsGroupFilePath();
if (!std::filesystem::exists(path))
{
throw DatabaseException(ENOTFOUND, "No indexed articles for newsgroup: "
+ std::to_string(newsgroupID));
}
SerializableFile io;
io.Open(path);
CheckDbVersion(io);
const std::uint64_t numGroups = io.ReadInt64();
for (std::uint64_t n = 0; n != numGroups; ++n)
{
NntpListEntry entry;
m_newsGroupFileIO >> entry;
io >> entry;
if (entry.id == newsgroupID)
{
return entry.lastIndexedArticle;
}
}
return 0;
throw DatabaseException(ENOTFOUND, "No indexed articles for newsgroup: "
+ std::to_string(newsgroupID));
}
std::filesystem::path Database::GetTokenFilePath(
@ -125,15 +148,18 @@ void Database::MaxTreeDepth(std::uint8_t depth)
std::unique_ptr<std::vector<NntpListEntry>> Database::LoadNewsgroupList()
{
OpenNewsGroupFile();
ScopeExit closeNewsGroupFile([&](){ m_newsGroupFileIO.Close(); });
const size_t newsGroupCount = m_newsGroupFileIO.ReadInt64();
auto result = std::make_unique<std::vector<NntpListEntry>>();
const auto path = GetNewsGroupFilePath();
if (!std::filesystem::exists(path)) return result;
SerializableFile io;
io.Open(path);
CheckDbVersion(io);
const size_t newsGroupCount = io.ReadInt64();
for (size_t numLoaded = 0; numLoaded != newsGroupCount; ++numLoaded)
{
NntpListEntry entry;
m_newsGroupFileIO >> entry;
io >> entry;
result->emplace_back(entry);
}
return result;
@ -148,35 +174,9 @@ void Database::Open(std::filesystem::path dbPath)
}
}
void Database::OpenNewsGroupFile()
std::filesystem::path Database::GetNewsGroupFilePath() const
{
if (m_newsGroupFileIO.IsOpen())
{
m_newsGroupFileIO.Seek(sizeof(m_databaseVersion), std::ios_base::beg);
return;
}
const std::filesystem::path newsGroupFilePath =
m_databasePath / "newsgroups.db";
const bool exists = std::filesystem::exists(newsGroupFilePath);
m_newsGroupFileIO.Open(newsGroupFilePath);
if (exists)
{
const std::uint64_t ver = m_newsGroupFileIO.ReadInt64();
if (ver != m_databaseVersion)
{
throw DatabaseException(EBADF,
std::string{"Mismatching newgroup file database version:"}
+ " have: " + std::to_string(ver) + " - want: "
+ std::to_string(m_databaseVersion)
);
}
}
else
{
m_newsGroupFileIO << m_databaseVersion;
m_newsGroupFileIO << std::uint64_t{0}; // newsgroup count.
m_newsGroupFileIO.Seek(sizeof(m_databaseVersion), std::ios_base::beg);
}
return m_databasePath / "newsgroups.db";
}
void Database::ParseTokenFile(
@ -215,6 +215,7 @@ void Database::SetLastIndexedArticle(
{
entry.lastIndexedArticle = articleID;
found = true;
break;
}
}
}
@ -364,6 +365,35 @@ std::unique_ptr<std::vector<ArticleEntry>> Database::Search(
return result;
}
void Database::SyncLastUpdated(
NntpListEntry& a,
NntpListEntry& b
)
{
// If both are equal, nothing to do here.
if (a.lastIndexedArticle == b.lastIndexedArticle) return;
// Whichever one's not indexed, gets the value of the other.
if (a.lastIndexedArticle == NntpListEntry::NOT_INDEXED)
{
a.lastIndexedArticle = b.lastIndexedArticle;
return;
}
if (b.lastIndexedArticle == NntpListEntry::NOT_INDEXED)
{
b.lastIndexedArticle = a.lastIndexedArticle;
return;
}
// Otherwise, whichever's higher wins.
if (a.lastIndexedArticle > b.lastIndexedArticle)
{
b.lastIndexedArticle = a.lastIndexedArticle;
}
else
{
a.lastIndexedArticle = b.lastIndexedArticle;
}
}
void Database::UpdateNewsgroupList(std::vector<NntpListEntry>& list)
{
if (list.size() == 0) return;
@ -371,43 +401,50 @@ void Database::UpdateNewsgroupList(std::vector<NntpListEntry>& list)
auto outList = LoadNewsgroupList();
for (auto& entry: list)
{
NntpListEntry newEntry(entry);
bool found{false};
if (outList)
{
std::for_each(
outList->begin(),
outList->end(),
[&entry, &found](NntpListEntry& oldEntry)
[this, &entry, &found](NntpListEntry& oldEntry)
{
if (oldEntry.name == entry.name)
{
// update existing (copy everything but ID & name)
// update existing- the passed entry has updated counts
// and status.
found = true;
oldEntry.count = entry.count;
oldEntry.high = entry.high;
oldEntry.lastIndexedArticle = entry.lastIndexedArticle;
oldEntry.low = entry.low;
oldEntry.status = entry.status;
// As for lastIndexed: whoemever's higher wins.
SyncLastUpdated(entry, oldEntry);
// The ID should be sticky - whatever's already in the
// db is what we care about, so update it in the passed
// entries.
entry.id = oldEntry.id;
}
}
);
}
if (found) continue;
// add new.
NntpListEntry newEntry(entry);
newEntry.id = GetUniqueNntpEntryId(*outList);
outList->emplace_back(newEntry);
entry.id = newEntry.id;
}
OpenNewsGroupFile();
ScopeExit closeNewsGroupFile([&](){ m_newsGroupFileIO.Close(); });
m_newsGroupFileIO << std::uint64_t{outList->size()};
SerializableFile io;
io.Open(GetNewsGroupFilePath());
io << m_databaseVersion;
io << std::uint64_t{outList->size()};
std::for_each(
outList->begin(),
outList->end(),
[&](const NntpListEntry& e)
{
m_newsGroupFileIO << e;
io << e;
}
);
}

View File

@ -147,14 +147,29 @@ void Indexer::Index(const std::vector<NntpListEntry>& newsgroups)
std::atomic<std::uint64_t> headerCount{0};
const std::atomic<std::uint64_t> groupID = group.id;
std::reference_wrapper<Database> dbref = std::ref(m_app.GetDb());
const std::uint32_t startMessage = dbref.get().GetLastIndexedArticle(
groupID
);
std::uint32_t startMessage = 0;
try
{
startMessage = dbref.get().GetLastIndexedArticle(groupID);
if (startMessage == NntpListEntry::NOT_INDEXED)
{
startMessage = 0;
}
else
{
++startMessage;
}
}
catch (const DatabaseException&)
{
startMessage = 0;
}
std::cout << "Indexing starting at message: "
<< std::to_string(startMessage) << std::endl;
m_client.ProcessHeaders(startMessage,
[this, &headerCount, &dbref, &groupID](std::shared_ptr<NntpHeaders> headers){
m_threads.Queue([this, headers, &headerCount, &dbref, &groupID](){
[this, &startMessage, &headerCount, &dbref, &groupID](std::shared_ptr<NntpHeaders> headers){
m_threads.Queue([this, headers, &startMessage, &headerCount, &dbref, &groupID](){
std::uint64_t lastArticle{0};
for (const auto& header: *headers)
{
@ -172,14 +187,19 @@ void Indexer::Index(const std::vector<NntpListEntry>& newsgroups)
headerCount++;
if (articleID > lastArticle) lastArticle = articleID;
}
if (lastArticle == NntpListEntry::NOT_INDEXED)
{
lastArticle = 0;
}
// Update last-indexed id for the newsgroup.
const std::uint32_t lastIndexedID =
dbref.get().GetLastIndexedArticle(groupID);
if (lastIndexedID < lastArticle)
if (startMessage < lastArticle)
{
dbref.get().SetLastIndexedArticle(
groupID, lastArticle
);
std::cout << "Setting last indexed article for newsgroup ID "
<< groupID << " to: "
<< lastArticle << std::endl;
}
std::cout << ".";
std::cout.flush();

View File

@ -188,7 +188,7 @@ std::unique_ptr<std::vector<NntpListEntry>> UsenetClient::List()
entry.count = std::stoul(fields[3]);
entry.status = fields[4];
entry.id = 0; // incremented by db when saving.
entry.lastIndexedArticle = 0;
entry.lastIndexedArticle = NntpListEntry::NOT_INDEXED;
if (m_app.GetFilter().ProcessNewsgroup(entry.name))
{
result->emplace_back(entry);