From 67c829b1bcaf414a42c06c11353d76fc4d800059 Mon Sep 17 00:00:00 2001 From: John Sennesael Date: Wed, 29 Sep 2021 18:52:54 -0500 Subject: [PATCH] btree storage, multithreaded processing --- .kdev_ignore | 1 + CMakeLists.txt | 6 +- include/usenetsearch/Configuration.h | 5 +- include/usenetsearch/Database.h | 53 +++- include/usenetsearch/ScopeExit.h | 18 ++ include/usenetsearch/Serialize.h | 44 ++++ include/usenetsearch/StringUtils.h | 42 +++- include/usenetsearch/ThreadPool.h | 34 +++ include/usenetsearch/UsenetClient.h | 38 +-- src/Configuration.cpp | 22 +- src/Database.cpp | 352 ++++++++++++++++++++++++--- src/IoSocket.cpp | 1 + src/Serialize.cpp | 163 +++++++++++++ src/StringUtils.cpp | 61 ++--- src/ThreadPool.cpp | 113 +++++++++ src/UsenetClient.cpp | 187 +++++++------- src/main.cpp | 127 +++++++--- usenetsearch.example.conf | 4 + 18 files changed, 1039 insertions(+), 232 deletions(-) create mode 100644 .kdev_ignore create mode 100644 include/usenetsearch/ScopeExit.h create mode 100644 include/usenetsearch/Serialize.h create mode 100644 include/usenetsearch/ThreadPool.h create mode 100644 src/Serialize.cpp create mode 100644 src/ThreadPool.cpp diff --git a/.kdev_ignore b/.kdev_ignore new file mode 100644 index 0000000..567609b --- /dev/null +++ b/.kdev_ignore @@ -0,0 +1 @@ +build/ diff --git a/CMakeLists.txt b/CMakeLists.txt index c0881f8..5f51749 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,8 +18,8 @@ cmake_minimum_required(VERSION 3.5) set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer -fsanitize=address") set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address") -#set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer") -#set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer") +#set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer -pthread") +#set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -pthread") if(NOT DEFINED CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE "DEBUG" CACHE STRING "Type of build: Debug|Release") @@ -39,9 +39,11 @@ add_executable(usenetsearch "src/Except.cpp" "src/IoSocket.cpp" "src/main.cpp" + "src/Serialize.cpp" "src/SSLConnection.cpp" "src/StringUtils.cpp" "src/TcpConnection.cpp" + "src/ThreadPool.cpp" "src/UsenetClient.cpp" ) diff --git a/include/usenetsearch/Configuration.h b/include/usenetsearch/Configuration.h index 0107e25..06566d5 100644 --- a/include/usenetsearch/Configuration.h +++ b/include/usenetsearch/Configuration.h @@ -34,7 +34,8 @@ struct ConfigurationException: public UsenetSearchException class Configuration { - + std::uint16_t m_batchSize{1}; + std::uint16_t m_maxThreads{1}; std::string m_nntpServerHost{"127.0.0.1"}; std::string m_nntpServerPassword{"password"}; int m_nntpServerPort{119}; @@ -44,7 +45,9 @@ class Configuration public: + std::uint16_t BatchSize() const; std::filesystem::path DatabasePath() const; + std::uint16_t MaxThreads() const; std::string NNTPServerHost() const; std::string NNTPServerPassword() const; int NNTPServerPort() const; diff --git a/include/usenetsearch/Database.h b/include/usenetsearch/Database.h index dc8793a..1f19c2b 100644 --- a/include/usenetsearch/Database.h +++ b/include/usenetsearch/Database.h @@ -23,14 +23,28 @@ #include #include #include +#include #include +#include "usenetsearch/Serialize.h" #include "usenetsearch/UsenetClient.h" namespace usenetsearch { static constexpr const std::uint64_t DatabaseVersion{1}; +struct ArticleEntry +{ + std::vector articleIDs; + std::string searchString; + + size_t Size() const; + +}; + +std::fstream& operator<<(std::fstream& out, const ArticleEntry& obj); +std::fstream& operator>>(std::fstream& in, ArticleEntry& obj); + struct DatabaseException: public UsenetSearchException { DatabaseException(int errorCode, const std::string& message): @@ -45,11 +59,40 @@ class Database std::wstring_convert> m_conv; std::filesystem::path m_databasePath; std::uint64_t m_databaseVersion{DatabaseVersion}; - std::ifstream m_newsGroupFileInput; - std::ofstream m_newsGroupFileOutput; + std::vector m_lockedFiles; + std::mutex m_lockedFilesMutex; + std::uint8_t m_maxTreeDepth{5}; + SerializableFile m_newsGroupFileIO; - std::filesystem::path GetArticleFilePath(const std::wstring& newsgroup); + bool GetArticleEntry( + const std::string& subToken, + const std::string& searchString, + ArticleEntry& entry, + size_t& startPosition, + size_t& endPosition, + size_t& count); + + std::filesystem::path GetArticleFilePath( + const std::wstring& newsgroup, + bool mkdirs=false + ); + std::filesystem::path GetTokenFilePath( + const std::string& token, + bool mkdirs=false + ); + bool HasToken( + const std::string& subToken, + const std::string& subject, + std::uint64_t articleID + ); + void LockFile(std::filesystem::path file); + void UnlockFile(std::filesystem::path file); void OpenNewsGroupFile(); + void SaveToken( + const std::string& subToken, + const std::string& subject, + std::uint64_t articleID + ); public: @@ -64,6 +107,10 @@ public: const std::vector& headers ); void UpdateNewsgroupList(const std::vector& list); + void SaveSearchTokens( + std::uint64_t articleID, + const std::string& searchString + ); }; diff --git a/include/usenetsearch/ScopeExit.h b/include/usenetsearch/ScopeExit.h new file mode 100644 index 0000000..53456e6 --- /dev/null +++ b/include/usenetsearch/ScopeExit.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +namespace usenetsearch { + +class ScopeExit +{ + + std::function m_function; + +public: + + ScopeExit(std::function fn): m_function(fn){} + ~ScopeExit(){ if (m_function) m_function(); } +}; + +} // namespace usenetsearch diff --git a/include/usenetsearch/Serialize.h b/include/usenetsearch/Serialize.h new file mode 100644 index 0000000..a7cb42e --- /dev/null +++ b/include/usenetsearch/Serialize.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include +#include + +#include "usenetsearch/Except.h" + +namespace usenetsearch { + +struct ArticleEntry; +struct NntpHeader; +struct NntpListEntry; + +struct SerializeException: public UsenetSearchException +{ + SerializeException(int errorCode, const std::string& message): + UsenetSearchException(errorCode, message){} + + virtual ~SerializeException() = default; +}; + +class SerializableFile: public std::fstream +{ +public: + virtual ~SerializableFile() = default; +}; +SerializableFile& operator<<( + SerializableFile& out, const std::vector& arr); +SerializableFile& operator>>( + SerializableFile& in, std::vector& arr); +SerializableFile& operator<<(SerializableFile& out, const ArticleEntry& obj); +SerializableFile& operator>>(SerializableFile& in, ArticleEntry& obj); +SerializableFile& operator<<(SerializableFile& out, const std::string& str); +SerializableFile& operator>>(SerializableFile& in, std::string& str); +SerializableFile& operator<<(SerializableFile& out, const std::wstring& str); +SerializableFile& operator>>(SerializableFile& in, std::wstring& str); +SerializableFile& operator<<(SerializableFile& out, const NntpHeader& obj); +SerializableFile& operator>>(SerializableFile& in, NntpHeader& obj); +SerializableFile& operator<<(SerializableFile& out, const NntpListEntry& obj); +SerializableFile& operator>>(SerializableFile& in, NntpListEntry& obj); +SerializableFile& operator<<(SerializableFile& out, const uint64_t& obj); +SerializableFile& operator>>(SerializableFile& in, uint64_t& obj); +} // namespace usenetsearch diff --git a/include/usenetsearch/StringUtils.h b/include/usenetsearch/StringUtils.h index 245e2b8..2293b8f 100644 --- a/include/usenetsearch/StringUtils.h +++ b/include/usenetsearch/StringUtils.h @@ -24,11 +24,6 @@ namespace usenetsearch { -std::ostream& operator<<(std::ofstream& out, const std::string& str); -std::ifstream& operator>>(std::ifstream& in, std::string& str); -std::ostream& operator<<(std::ofstream& out, const std::wstring& str); -std::ifstream& operator>>(std::ifstream& in, std::wstring& str); - struct StringException: public UsenetSearchException { StringException(int errorCode, const std::string& message): @@ -39,6 +34,19 @@ struct StringException: public UsenetSearchException std::string StringHash(const std::string& input); +template +std::string StringJoin(const std::vector& list, const T& delim) +{ + T result; + if (list.empty()) return result; + if (list.size() == 1) return list[0]; + for (const auto& token: list) + { + result += token + delim; + } + return result.substr(0, result.size() - delim.size()); +} + template std::vector StringSplit( const T& str, @@ -49,11 +57,12 @@ std::vector StringSplit( size_t pos=0; T s = str; std::vector result; + if (str.empty()) return result; int tokens = 1; while (((pos = s.find(delim)) != T::npos) - and ((max_elem < 0) or (tokens != max_elem))) + && ((max_elem < 0) or (tokens != max_elem))) { - result.push_back(s.substr(0,pos)); + result.push_back(s.substr(0, pos)); s.erase(0, pos + delim.length()); tokens++; } @@ -70,6 +79,18 @@ T StringLeftTrim(const T& str) return s; } +template +T StringRemove(const T& subject, const T& toErase) +{ + T s(subject); + size_t pos = T::npos; + while ((pos = s.find(toErase) )!= T::npos) + { + s.erase(pos, toErase.length()); + } + return s; +} + template T StringRightTrim(const T& str) { @@ -100,4 +121,11 @@ T StringToLower(const T& str) bool StringToBoolean(const std::string& str); bool StringToBoolean(const std::wstring& str); +void StringTreeOperation( + const std::string& searchString, + const std::string& splitBy, + size_t maxDepth, + std::function Fn +); + } // namespace usenetsearch diff --git a/include/usenetsearch/ThreadPool.h b/include/usenetsearch/ThreadPool.h new file mode 100644 index 0000000..492b72c --- /dev/null +++ b/include/usenetsearch/ThreadPool.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace usenetsearch { + +class ThreadPool +{ + typedef std::unordered_map ThreadStates; + typedef std::vector> Threads; + size_t m_maxThreads{2}; + std::atomic m_stopping{false}; + Threads m_threads; + std::mutex m_threadsMutex; + std::shared_ptr m_threadStates; + std::mutex m_threadStatesMutex; + +public: + + ThreadPool(); + ThreadPool(const ThreadPool&) = delete; + ~ThreadPool(); + void JoinThreads(); + void MaxThreads(std::uint16_t max); + void Queue(std::function fn); + +}; + +} // namespace usenetsearch diff --git a/include/usenetsearch/UsenetClient.h b/include/usenetsearch/UsenetClient.h index 2fafdd6..9d21e6f 100644 --- a/include/usenetsearch/UsenetClient.h +++ b/include/usenetsearch/UsenetClient.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -40,34 +41,30 @@ struct UsenetClientException: public UsenetSearchException struct NntpHeader { - std::wstring articleID; - std::wstring from; - std::wstring subject; + std::uint64_t articleID; + std::string subject; /* There's a lot more headers, but trying to limit it to the fields we care to search to save on the storage space per article (there's going to be a LOT of them). */ }; -std::ostream& operator<<(std::ofstream& out, const NntpHeader& obj); -std::ifstream& operator>>(std::ifstream& in, NntpHeader& obj); +typedef std::vector NntpHeaders; struct NntpMessage { std::uint16_t code; - std::wstring message; + std::string message; }; struct NntpListEntry { - std::wstring name; + std::string name; std::uint64_t high; std::uint64_t low; std::uint64_t count; - std::wstring status; + std::string status; }; -std::ostream& operator<<(std::ofstream& out, const NntpListEntry& obj); -std::ifstream& operator>>(std::ifstream& in, NntpListEntry& obj); class UsenetClient { @@ -79,7 +76,7 @@ class UsenetClient bool IsError(const NntpMessage& msg) const; NntpMessage ReadLine(); void Write(const std::wstring& message); - std::wstring ReadUntil(const std::wstring& deliminator); + std::string ReadUntil(const std::string& deliminator); public: @@ -88,9 +85,8 @@ public: * Authenticate * List() to get a list of newsgroups * for every newsgroup: - * ListGroup() to get list of article ID's - * for every article ID: - * Head to get article headers. + * XZHDR subject 0- + * uncompress result. */ void Authenticate(const std::wstring& user, const std::wstring& password); @@ -101,15 +97,19 @@ public: bool useSSL = false ); - void Group(const std::wstring& groupName); + void Group(const std::wstring& newsgroup); - NntpHeader Head(const std::wstring& articleID); + NntpHeader Head(std::uint64_t articleID); + + void ProcessHeaders( + std::uint64_t startMessage, + std::function)> processFn, + std::uint64_t batchSize=100 + ); std::unique_ptr> List(); - /* whilst message id's are typically numbers, the rfc states they are unique - alphanumeric strings. */ - std::unique_ptr> ListGroup( + std::unique_ptr> ListGroup( const std::wstring& newsGroup ); diff --git a/src/Configuration.cpp b/src/Configuration.cpp index 8e44ca0..8b1b582 100644 --- a/src/Configuration.cpp +++ b/src/Configuration.cpp @@ -25,11 +25,21 @@ namespace usenetsearch { +std::uint16_t Configuration::BatchSize() const +{ + return m_batchSize; +} + std::filesystem::path Configuration::DatabasePath() const { return m_databasePath; } +std::uint16_t Configuration::MaxThreads() const +{ + return m_maxThreads; +} + std::string Configuration::NNTPServerHost() const { return m_nntpServerHost; @@ -87,10 +97,18 @@ void Configuration::Open(const std::string& filename) } const std::string key = StringToLower(kvp[0]); const std::string value = StringTrim(kvp[1]); - if (key == "database_path") + if (key == "batch_size") + { + m_batchSize = stoi(value); + } + else if (key == "database_path") { m_databasePath = value; } + else if (key == "max_threads") + { + m_maxThreads = stoi(value); + } else if (key == "nntp_server_host") { m_nntpServerHost = value; @@ -101,7 +119,7 @@ void Configuration::Open(const std::string& filename) } else if (key == "nntp_server_port") { - m_nntpServerPort = atoi(value.c_str()); + m_nntpServerPort = stoi(value); } else if (key == "nntp_server_use_ssl") { diff --git a/src/Database.cpp b/src/Database.cpp index a504a40..0764876 100644 --- a/src/Database.cpp +++ b/src/Database.cpp @@ -15,35 +15,207 @@ along with UsenetSearch. If not, see . */ +#include + +#include #include #include #include +#include #include #include "usenetsearch/StringUtils.h" #include "usenetsearch/UsenetClient.h" +#include "usenetsearch/ScopeExit.h" +#include "usenetsearch/Serialize.h" #include "usenetsearch/Database.h" namespace usenetsearch { +size_t ArticleEntry::Size() const +{ + return sizeof(articleIDs) + (articleIDs.size() * sizeof(std::uint64_t)) + + sizeof(std::uint64_t) + (searchString.size() * sizeof(char)); +} + +// Database class -------------------------------------------------------------- + Database::~Database() { - if (m_newsGroupFileInput.is_open()) + if (m_newsGroupFileIO.is_open()) { - m_newsGroupFileInput.close(); - } - if (m_newsGroupFileOutput.is_open()) - { - m_newsGroupFileOutput.close(); + m_newsGroupFileIO.close(); } } std::filesystem::path Database::GetArticleFilePath( - const std::wstring& newsgroup) + const std::wstring& newsgroup, bool mkdirs) { - const auto groupFile = StringHash(m_conv.to_bytes(newsgroup)) + ".db"; - return m_databasePath / groupFile; + const std::string md5 = StringHash(m_conv.to_bytes(newsgroup)); + const std::string tok1 = md5.substr(0, 2); + const std::string tok2 = md5.substr(2, 2); + const std::string tok3 = md5.substr(4, 2); + const std::filesystem::path groupPath = m_databasePath / "articles" + / tok1 / tok2 / tok3; + if (mkdirs) + { + if (!std::filesystem::exists(groupPath)) + { + std::filesystem::create_directories(groupPath); + } + } + const auto groupFile = md5 + ".db"; + return groupPath / groupFile; +} + +bool Database::GetArticleEntry( + const std::string& subToken, + const std::string& searchString, + ArticleEntry& entry, + size_t& startPosition, + size_t& endPosition, + size_t& count) +{ + const auto path = GetTokenFilePath(subToken); + if (!std::filesystem::exists(path)) return false; + + SerializableFile io; + + LockFile(path); + ScopeExit closeAndUnlockFile([&](){ + if (io.is_open()) io.close(); + UnlockFile(path); + }); + + io.open(path, std::ios::binary | std::ios::in); + if (!io.is_open()) return false; + + std::uint64_t articleCount{0}; + io >> articleCount; + size_t startPos{0}; + size_t endPos{0}; + for (std::uint64_t i = 0; i != articleCount; ++i) + { + ArticleEntry curEntry{}; + startPos = io.tellg(); + io >> curEntry; + endPos = io.tellg(); + if (curEntry.searchString == searchString) + { + entry = curEntry; + startPosition = startPos; + endPosition = endPos; + count = i + 1; + return true; + } + } + return false; +} + +std::filesystem::path Database::GetTokenFilePath( + const std::string& token, + bool mkdirs +) +{ + const std::string md5 = StringHash(token); + const std::string tok1 = md5.substr(0, 2); + const std::string tok2 = md5.substr(2, 2); + const std::string tok3 = md5.substr(4, 2); + const std::filesystem::path groupPath = m_databasePath / "tokens" + / tok1 / tok2 / tok3; + if (mkdirs) + { + if (!std::filesystem::exists(groupPath)) + { + std::filesystem::create_directories(groupPath); + } + } + const auto groupFile = md5 + ".db"; + return groupPath / groupFile; +} + + + +bool Database::HasToken( + const std::string& subToken, + const std::string& subject, + std::uint64_t articleID) +{ + const auto path = GetTokenFilePath(subToken); + if (!std::filesystem::exists(path)) return false; + LockFile(path); + ScopeExit unlockFile([&](){ UnlockFile(path); }); + SerializableFile io; + io.open(path, std::ios::binary | std::ios::in); + std::uint64_t articleCount{0}; + io.read(reinterpret_cast(&articleCount), sizeof(articleCount)); + std::uint64_t c = 0; + for (std::uint64_t i = 0; i != c; ++i) + { + ArticleEntry entry; + io >> entry; + if (entry.searchString == subject) + { + if (std::find( + entry.articleIDs.begin(), + entry.articleIDs.end(), articleID) != entry.articleIDs.end()) + { + return true; + } + } + } + return false; +} + +void Database::LockFile(std::filesystem::path file) +{ +#if 1 + while (true) + { + { + std::lock_guard lock(m_lockedFilesMutex); + auto it = std::find(m_lockedFiles.begin(), m_lockedFiles.end(), file); + if (it == m_lockedFiles.end()) break; + } + //std::cout << "Waiting on lock: " << file.string() << std::endl; + //std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + std::this_thread::sleep_for(std::chrono::milliseconds{20}); + } + { + std::lock_guard lock(m_lockedFilesMutex); + m_lockedFiles.emplace_back(file); + } +#else + const std::filesystem::path lockFilePath = file.string() + ".lock"; + while (true) + { + if (!std::filesystem::exists(lockFilePath)) + { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds{20}); + } + std::ofstream touch(lockFilePath); +#endif +} + +void Database::UnlockFile(std::filesystem::path file) +{ +#if 1 + std::lock_guard lock(m_lockedFilesMutex); + auto it = std::find(m_lockedFiles.begin(), m_lockedFiles.end(), file); + if (it != m_lockedFiles.end()) + { + m_lockedFiles.erase(it); + } +#else + const std::filesystem::path lockFilePath = file.string() + ".lock"; + if (std::filesystem::exists(lockFilePath)) + { + std::filesystem::remove(lockFilePath); + } +#endif } std::unique_ptr> Database::LoadArticleList( @@ -56,8 +228,12 @@ std::unique_ptr> Database::LoadArticleList( "No article list found for newsgroup " + m_conv.to_bytes(newsgroup) ); } - std::ifstream io; - io.open(articleFile, std::ios::binary); + + LockFile(articleFile); + ScopeExit unlockFile([&](){ UnlockFile(articleFile); }); + + SerializableFile io; + io.open(articleFile, std::ios::binary | std::ios::in); std::uint64_t articleCount; io.read( reinterpret_cast(&articleCount), @@ -79,7 +255,7 @@ std::unique_ptr> Database::LoadNewsgroupList() OpenNewsGroupFile(); std::uint64_t dbVersion{0}; - m_newsGroupFileInput.read( + m_newsGroupFileIO.read( reinterpret_cast(&dbVersion), sizeof(dbVersion) ); @@ -92,7 +268,7 @@ std::unique_ptr> Database::LoadNewsgroupList() } size_t newsGroupCount{0}; - m_newsGroupFileInput.read( + m_newsGroupFileIO.read( reinterpret_cast(&newsGroupCount), sizeof(newsGroupCount) ); @@ -101,7 +277,7 @@ std::unique_ptr> Database::LoadNewsgroupList() for (size_t numLoaded = 0; numLoaded != newsGroupCount; ++numLoaded) { NntpListEntry entry; - m_newsGroupFileInput >> entry; + m_newsGroupFileIO >> entry; result->emplace_back(entry); } return result; @@ -119,19 +295,16 @@ void Database::Open(std::filesystem::path dbPath) void Database::OpenNewsGroupFile() { - if (m_newsGroupFileInput.is_open() && m_newsGroupFileOutput.is_open()) + if (m_newsGroupFileIO.is_open() && m_newsGroupFileIO.is_open()) { return; } const std::filesystem::path newsGroupFilePath = m_databasePath / "newsgroups.db"; - if (!m_newsGroupFileInput.is_open()) + if (!m_newsGroupFileIO.is_open()) { - m_newsGroupFileInput.open(newsGroupFilePath, std::ios::binary); - } - if (!m_newsGroupFileOutput.is_open()) - { - m_newsGroupFileOutput.open(newsGroupFilePath, std::ios::binary); + m_newsGroupFileIO.open(newsGroupFilePath, + std::ios::binary | std::ios::in | std::ios::out); } } @@ -139,14 +312,17 @@ void Database::UpdateArticleList( const std::wstring& newsgroup, const std::vector& headers) { - const auto articleFile = GetArticleFilePath(newsgroup); - std::ofstream io; - io.open(articleFile, std::ios::binary); + const auto articleFile = GetArticleFilePath(newsgroup, true); + + LockFile(articleFile); + ScopeExit unlockFile([&](){ + UnlockFile(articleFile); + }); + + SerializableFile io; + io.open(articleFile, std::ios::binary | std::ios::out); const std::uint64_t articleCount = headers.size(); - io.write( - reinterpret_cast(&articleCount), - sizeof(articleCount) - ); + io << articleCount; for (const auto& header: headers) { io << header; @@ -158,22 +334,128 @@ void Database::UpdateNewsgroupList(const std::vector& list) { OpenNewsGroupFile(); - m_newsGroupFileOutput.write( + m_newsGroupFileIO.write( reinterpret_cast(&m_databaseVersion), sizeof(m_databaseVersion) ); const std::uint64_t newsGroupCount = list.size(); - m_newsGroupFileOutput.write( - reinterpret_cast(&newsGroupCount), - sizeof(newsGroupCount) - ); + m_newsGroupFileIO << newsGroupCount; for (const auto& entry: list) { - m_newsGroupFileOutput << entry; + m_newsGroupFileIO << entry; + } + m_newsGroupFileIO.flush(); +} + +void Database::SaveSearchTokens( + std::uint64_t articleID, + const std::string& searchString) +{ + const std::string sstr(searchString); + StringTreeOperation( + sstr, + " ", + m_maxTreeDepth, + [&](const std::string& subToken, const std::string& str){ + try + { + SaveToken(subToken, str, articleID); + } + catch (const SerializeException& e) + { + /// @todo do graceful magic here. + std::cout << "Broken file for \"" << subToken << "\"" << std::endl; + } + } + ); +} + +void Database::SaveToken( + const std::string& subtoken, + const std::string& subject, + std::uint64_t articleID) +{ + if (subtoken == "") return; + if (subtoken == " ") return; + if (subject == "") return; + if (subject == " ") return; + if (HasToken(subtoken, subject, articleID)) return; + const auto path = GetTokenFilePath(subtoken, true); + size_t startPos{0}; + size_t endPos{0}; + size_t count{0}; + ArticleEntry entry{}; + bool found = GetArticleEntry( + subtoken, subject, entry, startPos, endPos, count); + + std::uint64_t entryCount{0}; + + // Update existing? + if (!std::filesystem::exists(path)) + { + // Creating a new token file is pretty simple. + entryCount = 1; + SerializableFile io; + LockFile(path); + ScopeExit unlockFile([&](){ + if (io.is_open()) io.close(); + UnlockFile(path); + }); + io.open(path, std::ios::binary | std::ios::out); + io << entryCount; + entry.searchString = subject; + entry.articleIDs.emplace_back(articleID); + io << entry; + } + else + { + // Updating an existing token file is a bit more complicated. + // See if the search string already exists in the token file. + // Open file. + SerializableFile io; + io.open(path, std::ios::binary | std::ios::in | std::ios::out); + LockFile(path); + ScopeExit unlockFile([&](){ + if (io.is_open()) io.close(); + UnlockFile(path); + }); + // Read entry count. + io >> entryCount; + if (found) + { + /* Seek to the end of the entry and read all following entries. */ + io.seekg(endPos); + std::vector entries; + for (auto i = count; i != entryCount; ++i) + { + ArticleEntry e{}; + io >> e; + entries.emplace_back(e); + } + // Seek back to the start of the existing entry and rewrite it. + io.seekg(startPos); + io << entry; + // Now re-write all following entries. + for (auto& e: entries) + { + io << e; + } + } + else + { + // This is a new search string in the token file, append to the end. + io.seekg(0, std::ios_base::end); + entry.searchString = subject; + entry.articleIDs = {articleID}; + io << entry; + // Increment entry count. + entryCount++; + io.seekg(0); + io << entryCount; + } } - m_newsGroupFileOutput.flush(); } } // namespace usenetsearch diff --git a/src/IoSocket.cpp b/src/IoSocket.cpp index 9b4616c..d6be31f 100644 --- a/src/IoSocket.cpp +++ b/src/IoSocket.cpp @@ -58,6 +58,7 @@ std::string IoSocket::ReadUntil(std::string deliminator) } } } + return result; } } // namespace usenetsearch diff --git a/src/Serialize.cpp b/src/Serialize.cpp new file mode 100644 index 0000000..825ae60 --- /dev/null +++ b/src/Serialize.cpp @@ -0,0 +1,163 @@ +#include +#include +#include + +#include "usenetsearch/Database.h" +#include "usenetsearch/UsenetClient.h" + +#include "usenetsearch/Serialize.h" + +namespace usenetsearch { + +SerializableFile& operator<<( + SerializableFile& out, const std::vector& arr) +{ + const std::uint64_t size = arr.size(); + out << size; + for (const auto value: arr) + { + out << value; + } + return out; +} + +SerializableFile& operator>>( + SerializableFile& in, std::vector& arr) +{ + std::uint64_t size; + in >> size; + for (std::uint64_t n = 0; n != size; ++n) + { + std::uint64_t value; + in >> value; + arr.emplace_back(value); + } + return in; +} + +SerializableFile& operator<<( + SerializableFile& out, const ArticleEntry& obj) +{ + const char preMagic[1]{24}; + out.write(preMagic, sizeof(preMagic)); + out << obj.searchString; + out << obj.articleIDs; + const char postMagic[1]{42}; + out.write(postMagic, sizeof(postMagic)); + return out; +} + +SerializableFile& operator>>(SerializableFile& in, ArticleEntry& obj) +{ + char preMagic[1]{}; + in.read(preMagic, sizeof(preMagic)); + if (preMagic[0] != 24) + { + throw SerializeException(EBADMSG, "Bad magic number in entry header."); + } + in >> obj.searchString; + in >> obj.articleIDs; + char postMagic[1]{}; + in.read(postMagic, sizeof(postMagic)); + if (postMagic[0] != 42) + { + throw SerializeException(EBADMSG, "Bad magic number in entry footer."); + } + return in; +} + +SerializableFile& operator<<(SerializableFile& out, const std::string& str) +{ + const std::uint64_t size = str.size(); + out.write(reinterpret_cast(&size), sizeof(size)); + out.write(reinterpret_cast(str.c_str()), size); + return out; +} + +SerializableFile& operator>>(SerializableFile& in, std::string& str) +{ + std::uint64_t size{0}; + in.read(reinterpret_cast(&size), sizeof(size)); + char buf[size + 1]; + in.read(buf, size); + buf[size] = 0; + str = buf; + return in; +} + +SerializableFile& operator<<(SerializableFile& out, const std::wstring& str) +{ + const std::uint64_t size = str.size(); + out.write(reinterpret_cast(&size), sizeof(size)); + out.write( + reinterpret_cast(str.c_str()), + size * sizeof(wchar_t) + ); + return out; +} + +SerializableFile& operator>>(SerializableFile& in, std::wstring& str) +{ + std::uint64_t size{0}; + in.read(reinterpret_cast(&size), sizeof(size)); + wchar_t buf[size + 1]; + in.read(reinterpret_cast(buf), size * sizeof(wchar_t)); + buf[size] = 0; + str = buf; + return in; +} + +SerializableFile& operator<<(SerializableFile& out, const NntpHeader& obj) +{ + out.write( + reinterpret_cast(&obj.articleID), + sizeof(obj.articleID) + ); + out << obj.subject; + return out; +} + +SerializableFile& operator>>(SerializableFile& in, NntpHeader& obj) +{ + in.read( + reinterpret_cast(&obj.articleID), + sizeof(obj.articleID) + ); + in >> obj.articleID; + in >> obj.subject; + return in; +} + +SerializableFile& operator<<(SerializableFile& out, const NntpListEntry& obj) +{ + out.write(reinterpret_cast(&obj.count), sizeof(obj.count)); + out.write(reinterpret_cast(&obj.high), sizeof(obj.high)); + out.write(reinterpret_cast(&obj.low), sizeof(obj.low)); + out << obj.name; + out << obj.status; + return out; +} + +SerializableFile& operator>>(SerializableFile& in, NntpListEntry& obj) +{ + in.read(reinterpret_cast(&obj.count), sizeof(obj.count)); + in.read(reinterpret_cast(&obj.high), sizeof(obj.high)); + in.read(reinterpret_cast(&obj.low), sizeof(obj.low)); + in >> obj.name; + in >> obj.status; + return in; +} + +SerializableFile& operator<<(SerializableFile& out, const uint64_t& obj) +{ + out.write(reinterpret_cast(&obj), sizeof(obj)); + return out; +} + +SerializableFile& operator>>(SerializableFile& in, uint64_t& obj) +{ + in.read(reinterpret_cast(&obj), sizeof(obj)); + return in; +} + +} // namespace usenetsearch diff --git a/src/StringUtils.cpp b/src/StringUtils.cpp index dbd396b..5a77320 100644 --- a/src/StringUtils.cpp +++ b/src/StringUtils.cpp @@ -30,47 +30,6 @@ namespace usenetsearch { -std::ostream& operator<<(std::ofstream& out, const std::string& str) -{ - const std::uint64_t size = str.size(); - out.write(reinterpret_cast(&size), sizeof(size)); - out.write(reinterpret_cast(str.c_str()), size); - return out; -} - -std::ifstream& operator>>(std::ifstream& in, std::string& str) -{ - std::uint64_t size{0}; - in.read(reinterpret_cast(&size), sizeof(size)); - char buf[size + 1]; - in.read(buf, size); - buf[size] = 0; - str = buf; - return in; -} - -std::ostream& operator<<(std::ofstream& out, const std::wstring& str) -{ - const std::uint64_t size = str.size(); - out.write(reinterpret_cast(&size), sizeof(size)); - out.write( - reinterpret_cast(str.c_str()), - size * sizeof(wchar_t) - ); - return out; -} - -std::ifstream& operator>>(std::ifstream& in, std::wstring& str) -{ - std::uint64_t size{0}; - in.read(reinterpret_cast(&size), sizeof(size)); - wchar_t buf[size + 1]; - in.read(reinterpret_cast(buf), size * sizeof(wchar_t)); - buf[size] = 0; - str = buf; - return in; -} - std::string StringHash(const std::string& input) { unsigned char result[MD5_DIGEST_LENGTH]; @@ -120,4 +79,24 @@ bool StringToBoolean(const std::wstring& str) ); } +void StringTreeOperation( + const std::string& searchString, + const std::string& splitBy, + size_t maxDepth, + std::function Fn) +{ + const auto tokens = StringSplit(searchString, splitBy); + for (auto outerIt = tokens.begin(); outerIt != tokens.end(); outerIt++) + { + for (size_t depth = 1; depth != maxDepth + 1; ++depth) + { + const auto endIt = outerIt + depth; + const auto subset = std::vector(outerIt, endIt); + const auto subToken = StringJoin(subset, splitBy); + Fn(subToken, searchString); + if (endIt == tokens.end()) break; + } + } +} + } // namespace usenetsearch diff --git a/src/ThreadPool.cpp b/src/ThreadPool.cpp new file mode 100644 index 0000000..28b17c6 --- /dev/null +++ b/src/ThreadPool.cpp @@ -0,0 +1,113 @@ +#include "usenetsearch/ThreadPool.h" + +namespace usenetsearch { + +ThreadPool::ThreadPool() +{ + m_threadStates = std::make_shared(); +} + +ThreadPool::~ThreadPool() +{ + m_stopping = true; + JoinThreads(); +} + +void ThreadPool::JoinThreads() +{ + std::lock_guard threadsLock(m_threadsMutex); + std::lock_guard threadStatesLock(m_threadStatesMutex); + for (auto& thread: m_threads) + { + if (thread->joinable()) thread->join(); + } + m_threads.clear(); + m_threadStates->clear(); +} + +void ThreadPool::MaxThreads(std::uint16_t max) +{ + m_maxThreads = max; +} + +void ThreadPool::Queue(std::function fn) +{ + if (m_stopping == true) return; // not accepting new work. + if (m_maxThreads <= 1) + { + // single threaded case. + fn(); + return; + } + // Wait for an available thread. + while (m_threads.size() >= m_maxThreads) + { + //std::lock_guard threadsLock(m_threadsMutex); + auto foundThread = m_threads.end(); + //auto foundStatus = m_threadStates->end(); + for (auto it = m_threads.begin(); it != m_threads.end(); ++it) + { + const auto& t = (*it); + if (!t) continue; + //const auto threadID = t->get_id(); + //if (threadID == std::thread::id{}) continue; + //ThreadStates::iterator statusIt; + //{ + // std::lock_guard tsl(m_threadStatesMutex); + // statusIt = m_threadStates->find(threadID); + //} + //if (statusIt == m_threadStates->end()) continue; + //bool state = statusIt->second; + //if (state == true) + //{ + if (t->joinable()) t->join(); + foundThread = it; + // foundStatus = statusIt; + break; + //} + } + if (foundThread != m_threads.end()) m_threads.erase(foundThread); + /* + { + std::lock_guard tsl(m_threadStatesMutex); + if (foundStatus != m_threadStates->end()) m_threadStates->erase(foundStatus); + } + */ + } + // Spawn a new thread. + auto thread = std::make_unique(fn); + /* + auto thread = std::make_unique( + [](std::function func, std::mutex& tsm, std::shared_ptr ts) + { + if (ts == nullptr) return; + { + std::lock_guard tsl(tsm); + const auto threadID = std::this_thread::get_id(); + if (threadID == std::thread::id{}) return; + ts->emplace(threadID, false); + } + func(); + { + std::lock_guard tsl(tsm); + const auto threadID = std::this_thread::get_id(); + if (threadID != std::thread::id{}) + { + auto it = ts->find(threadID); + if (it == ts->end()) + { + ts->emplace(threadID, true); + } + else + { + (*ts)[threadID] = true; + } + } + } + }, fn, std::ref(m_threadStatesMutex), m_threadStates); + */ + //std::lock_guard threadsLock(m_threadsMutex); + m_threads.emplace_back(std::move(thread)); +} + +} // namespace usenetsearch diff --git a/src/UsenetClient.cpp b/src/UsenetClient.cpp index 1097336..a656e14 100644 --- a/src/UsenetClient.cpp +++ b/src/UsenetClient.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include "usenetsearch/Except.h" @@ -28,46 +29,6 @@ namespace usenetsearch { -// NntpHeader serialization ---------------------------------------------------- - -std::ostream& operator<<(std::ofstream& out, const NntpHeader& obj) -{ - out << obj.articleID; - out << obj.from; - out << obj.subject; - return out; -} - -std::ifstream& operator>>(std::ifstream& in, NntpHeader& obj) -{ - in >> obj.articleID; - in >> obj.from; - in >> obj.subject; - return in; -} - -// NntpListEntry serialization ------------------------------------------------- - -std::ostream& operator<<(std::ofstream& out, const NntpListEntry& obj) -{ - out.write(reinterpret_cast(&obj.count), sizeof(obj.count)); - out.write(reinterpret_cast(&obj.high), sizeof(obj.high)); - out.write(reinterpret_cast(&obj.low), sizeof(obj.low)); - out << obj.name; - out << obj.status; - return out; -} - -std::ifstream& operator>>(std::ifstream& in, NntpListEntry& obj) -{ - in.read(reinterpret_cast(&obj.count), sizeof(obj.count)); - in.read(reinterpret_cast(&obj.high), sizeof(obj.high)); - in.read(reinterpret_cast(&obj.low), sizeof(obj.low)); - in >> obj.name; - in >> obj.status; - return in; -} - // UsenetClient class ---------------------------------------------------------- void UsenetClient::Authenticate( @@ -82,7 +43,7 @@ void UsenetClient::Authenticate( throw UsenetClientException( response.code, "Error authenticating with NNTP server: " - + m_conv.to_bytes(response.message) + + response.message ); } // Send password @@ -93,7 +54,7 @@ void UsenetClient::Authenticate( throw UsenetClientException( response.code, "Error authenticating with NNTP server: " - + m_conv.to_bytes(response.message) + + response.message ); } } @@ -130,29 +91,29 @@ void UsenetClient::Connect( throw UsenetClientException( serverHello.code, "Error received from NNTP server: " - + m_conv.to_bytes(serverHello.message) + + serverHello.message ); } } -void UsenetClient::Group(const std::wstring& groupName) +void UsenetClient::Group(const std::wstring& newsgroup) { // Send user name - Write(L"GROUP " + groupName + L"\r\n"); + Write(L"GROUP " + newsgroup + L"\r\n"); auto response = ReadLine(); if (IsError(response)) { throw UsenetClientException( response.code, - "Error changing group to " + m_conv.to_bytes(groupName) + " : " - + m_conv.to_bytes(response.message) + "Error changing group to " + m_conv.to_bytes(newsgroup) + " : " + + response.message ); } } -NntpHeader UsenetClient::Head(const std::wstring& articleID) +NntpHeader UsenetClient::Head(std::uint64_t articleID) { - Write(L"HEAD " + articleID + L"\r\n"); + Write(L"HEAD " + std::to_wstring(articleID) + L"\r\n"); /* Typical response is a code 221 followed by the headers ending with a line containing a period by itself. */ auto response = ReadLine(); @@ -161,26 +122,22 @@ NntpHeader UsenetClient::Head(const std::wstring& articleID) throw UsenetClientException( response.code, "Error getting headers for article id " - + m_conv.to_bytes(articleID) - + " : " + m_conv.to_bytes(response.message) + + std::to_string(articleID) + + " : " + response.message ); } - const auto headerLinesStr = ReadUntil(L"\r\n.\r\n"); + const auto headerLinesStr = ReadUntil("\r\n.\r\n"); // parse the headers. - const auto lines = StringSplit(headerLinesStr, std::wstring{L"\r\n"}); + const auto lines = StringSplit(headerLinesStr, std::string{"\r\n"}); NntpHeader result; result.articleID = articleID; for (const auto& line: lines) { - const auto kvp = StringSplit(line, std::wstring{L":"}, 2); + const auto kvp = StringSplit(line, std::string{":"}, 2); if (kvp.size() != 2) continue; // Bad header line? const auto key = StringToLower(StringTrim(kvp[0])); const auto value = StringTrim(kvp[1]); - if (key == L"from") - { - result.from = value; - } - else if (key == L"subject") + if (key == "subject") { result.subject = value; } @@ -188,6 +145,62 @@ NntpHeader UsenetClient::Head(const std::wstring& articleID) return result; } +void UsenetClient::ProcessHeaders( + std::uint64_t startMessage, + std::function)> processFn, + std::uint64_t batchSize) +{ + Write(L"XHDR subject " + std::to_wstring(startMessage) + L"-\r\n"); + /* Typical response is a code 221 followed by the headers ending with a + line containing a period by itself. */ + const auto response = ReadLine(); + if (IsError(response)) + { + throw UsenetClientException( + response.code, + "Error getting headers: " + + response.message + ); + } + NntpHeaders headers; + std::mutex headersLock; + while (true) + { + const auto line = StringTrim(ReadUntil("\r\n")); + if (line == ".") break; + // parse the headers. + const auto kvp = StringSplit(line, std::string{" "}, 2); + if (kvp.size() != 2) continue; // Bad header line? + const std::uint64_t articleID = std::stoul(StringTrim(kvp[0])); + const auto subject = StringTrim(kvp[1]); + NntpHeader hdr; + hdr.articleID = articleID; + hdr.subject = subject; + { + std::lock_guard lock(headersLock); + headers.emplace_back(hdr); + } + if (headers.size() >= batchSize) + { + auto headersToPass = std::make_shared( + headers.begin(), + headers.end() + ); + processFn(std::move(headersToPass)); + headers.clear(); + } + } + // Left over headers. + if (!headers.empty()) + { + auto headersToPass = std::make_shared( + headers.begin(), + headers.end() + ); + processFn(std::move(headersToPass)); + } +} + bool UsenetClient::IsError(const NntpMessage& msg) const { if (msg.code >= 400) return true; @@ -206,21 +219,18 @@ std::unique_ptr> UsenetClient::List() response.code, "Failed to fetch newsgroup list from server, " + std::string{"server responded with: "} - + m_conv.to_bytes(response.message) + + response.message ); } - const auto listStr = ReadUntil(L"\r\n.\r\n"); + const auto listStr = ReadUntil("\r\n.\r\n"); // parse the list. - auto lines = StringSplit(listStr, std::wstring{L"\r\n"}); + auto lines = StringSplit(listStr, std::string{"\r\n"}); auto result = std::make_unique>(); if (lines.empty()) return result; - // remove the last line (the ending period). - lines.pop_back(); - lines.pop_back(); for (const auto& line: lines) { NntpListEntry entry; - const auto fields = StringSplit(line, std::wstring{L" "}); + const auto fields = StringSplit(line, std::string{" "}); if (fields.size() == 5) { entry.name = fields[0]; @@ -234,7 +244,8 @@ std::unique_ptr> UsenetClient::List() return result; } -std::unique_ptr> UsenetClient::ListGroup(const std::wstring& newsGroup) +std::unique_ptr> UsenetClient::ListGroup( + const std::wstring& newsGroup) { Write(L"LISTGROUP " + newsGroup + L"\r\n"); /* In response, we should get a 211 response followed by the list of @@ -246,20 +257,17 @@ std::unique_ptr> UsenetClient::ListGroup(const std::ws response.code, "Failed to fetch newsgroup list from server, " + std::string{"server responded with: "} - + m_conv.to_bytes(response.message) + + response.message ); } - const auto listStr = ReadUntil(L"\r\n.\r\n"); + const auto listStr = ReadUntil("\r\n.\r\n"); // parse the list. - auto lines = StringSplit(listStr, std::wstring{L"\r\n"}); - auto result = std::make_unique>(); + auto lines = StringSplit(listStr, std::string{"\r\n"}); + auto result = std::make_unique>(); if (lines.empty()) return result; - // Discard the last 2 line (the ending period + blank line) - lines.pop_back(); - lines.pop_back(); for (const auto& line: lines) { - result->emplace_back(StringTrim(line)); + result->emplace_back(stoul(StringTrim(line))); } return result; } @@ -267,41 +275,32 @@ std::unique_ptr> UsenetClient::ListGroup(const std::ws NntpMessage UsenetClient::ReadLine() { NntpMessage result{}; - std::wstring line; - line = ReadUntil(L"\r\n"); - if (line.length() < 4) + std::string line; + line = ReadUntil("\r\n"); + if (line.length() < 2) { throw UsenetSearchException(EPROTONOSUPPORT, "NNTP protocol error - invalid response from server: " - + m_conv.to_bytes(line)); + + line); } - std::wstring codeStr = line.substr(0, 3); + std::string codeStr = line.substr(0, 3); result.code = std::stoi(codeStr); result.message = line.substr(4, line.length()); return result; } -std::wstring UsenetClient::ReadUntil(const std::wstring& deliminator) +std::string UsenetClient::ReadUntil(const std::string& deliminator) { - std::wstring result; - const std::string deliminatorStr = m_conv.to_bytes(deliminator); - std::string resultStr; + std::string result; if (m_useSSL) { - resultStr = m_ssl->ReadUntil(deliminatorStr); + result = m_ssl->ReadUntil(deliminator); } else { - resultStr = m_tcp->ReadUntil(deliminatorStr); - } - try - { - result = m_conv.from_bytes(resultStr); - } - catch (const std::range_error& e) - { - return result; + result = m_tcp->ReadUntil(deliminator); } + result.erase(result.size() - deliminator.size(), deliminator.size()); return result; } diff --git a/src/main.cpp b/src/main.cpp index 12c74b5..49a85bb 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -15,25 +15,33 @@ along with UsenetSearch. If not, see . */ +#include +#include #include +#include #include +#include #include "usenetsearch/Configuration.h" #include "usenetsearch/Database.h" #include "usenetsearch/Except.h" #include "usenetsearch/StringUtils.h" +#include "usenetsearch/ThreadPool.h" #include "usenetsearch/UsenetClient.h" using namespace usenetsearch; void Usage(const std::string& programName) { + std::cout << "UsenetSearch - usenet search indexer" << std::endl; + std::cout << "Copyright© 2021 John Sennesael" << std::endl << std::endl; + std::cout << "Usage:" << std::endl << std::endl; std::cout << programName; std::cout << "\t"; std::cout << "[-c ] "; std::cout << "[-h] " << std::endl << std::endl; std::cout << "-c \tSets configuration file to use" << std::endl; - std::cout << "-h\tShow help (this text)." << std::endl; + std::cout << "-h\t\tShow help (this text)." << std::endl; std::cout << std::endl; } @@ -41,10 +49,7 @@ int main(int argc, char* argv[]) { std::cout.setf(std::ios::unitbuf); - std::cout << "UsenetSearch - usenet search indexer" << std::endl; - std::cout << "Copyright© 2021 John Sennesael" << std::endl << std::endl; - - std::string configFile{"config.json"}; + std::string configFile{"usenetsearch.conf"}; // Parse args. for (int argn = 1; argn != argc; ++argn) @@ -77,11 +82,11 @@ int main(int argc, char* argv[]) db.Open(config.DatabasePath()); // Start nntp client. + ThreadPool threads; UsenetClient client; std::wstring_convert> conv; try { - client.Connect( config.NNTPServerHost(), config.NNTPServerPort(), @@ -93,33 +98,99 @@ int main(int argc, char* argv[]) ); // BEGIN TEMPORARY TEST CODE -/* - const auto list = client.List(); - db.UpdateNewsgroupList(list); - std::cout << "Number of newsgroups in newsgroup list (saved): " - << list.size() << std::endl; + std::wstring_convert> conv; + std::unique_ptr> list; +// try +// { +// list = db.LoadNewsgroupList(); +// } +// catch (const DatabaseException& e) +// { +// // noop +// } - const auto listLoaded = db.LoadNewsgroupList(); - std::cout << "Number of newsgroups in newsgroup list (loaded): " - << listLoaded->size() << std::endl; -*/ - const std::wstring newsgroup = L"comp.os.os2.comm"; - client.Group(newsgroup); - const auto articles = client.ListGroup(newsgroup); - std::vector headers; - std::wcout << L"Reading headers in " << newsgroup << std::endl; - for (const auto& articleID: *articles) + NntpListEntry e{}; + e.count = 100; + +// 1001 headers +// e.name = "comp.os.os2.comm"; + +// 2541 headers +// e.name = "borland.public.cppbuilder.commandlinetools"; + +// 100026 headers (1859952 K) (1816.35 M) + e.name = "dk.videnskab"; +// 1000437 headers +// e.name = "alt.bible"; + +// a million or so, but this one is very slow because all subjects look the +// same, so everything goes to the same token index, which means we're +// constantly waiting on a file lock. +// e.name = "usenetserver.test"; + + list = std::make_unique>(); + list->emplace_back(e); + if ((list == nullptr) || (list->empty())) { - NntpHeader header = client.Head(articleID); - headers.emplace_back(header); - std::cout << "."; + std::cout << "Getting newsgroup list..."; + std::cout.flush(); + list = client.List(); + db.UpdateNewsgroupList(*list); + std::cout << "DONE." << std::endl; std::cout.flush(); } - std::cout << "DONE." << std::endl; - std::cout << "Updating database..." << std::endl; + std::cout << "Number of newsgroups in newsgroup: " + << list->size() << std::endl; std::cout.flush(); - db.UpdateArticleList(L"comp.os.os2.comm", headers); - + const size_t batchSize = config.BatchSize(); + threads.MaxThreads(config.MaxThreads()); + for (const auto& group: *list) + { + const std::wstring newsgroup = conv.from_bytes(group.name); + std::cout << "Setting group to " << group.name << "..."; + std::cout.flush(); + client.Group(newsgroup); + std::cout << "DONE." << std::endl; + std::cout << "Reading headers in " << group.name << " " + << "(.=" << batchSize << " headers)." << std::endl; + std::cout.flush(); + std::atomic headerCount{0}; + std::reference_wrapper dbref = std::ref(db); + client.ProcessHeaders(0, + [&threads, &headerCount, dbref](std::shared_ptr headers){ + threads.Queue([headers, &headerCount, dbref](){ + for (const auto& header: *headers) + { + const std::uint64_t id{header.articleID}; + std::string subject = StringRemove( + StringToLower(header.subject), std::string{"re:"} + ); + subject.erase( + std::remove_if( + subject.begin(), subject.end(), + [](char c){ + if (std::isspace(c)) return false; + if ((c > 65) && (c < 90)) return false; + if ((c > 97) && (c < 122)) return false; + if (c == '\'') return false; + if ((c > 48) && (c < 57)) return false; + return true; + }), subject.end() + ); + dbref.get().SaveSearchTokens(id, subject); + headerCount++; + } + std::cout << "."; + std::cout.flush(); + }); + }, + batchSize + ); + threads.JoinThreads(); + std::cout << "DONE." << std::endl; + std::cout << "Saved " << headerCount << " headers." << std::endl; + std::cout.flush(); + } // END TEMPORARY TEST CODE } diff --git a/usenetsearch.example.conf b/usenetsearch.example.conf index cd4da7b..1a24480 100644 --- a/usenetsearch.example.conf +++ b/usenetsearch.example.conf @@ -7,3 +7,7 @@ nntp_server_use_ssl: no # Index database configuration details database_path: ./db + +# Parallel processing settings +max_threads: 2 +batch_size: 5