btree storage, multithreaded processing

This commit is contained in:
John Sennesael 2021-09-29 18:52:54 -05:00
parent ed2e3a9824
commit 67c829b1bc
18 changed files with 1039 additions and 232 deletions

1
.kdev_ignore Normal file
View File

@ -0,0 +1 @@
build/

View File

@ -18,8 +18,8 @@ cmake_minimum_required(VERSION 3.5)
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer -fsanitize=address") set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer -fsanitize=address")
set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address") set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address")
#set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer") #set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb -fno-omit-frame-pointer -pthread")
#set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer") #set (CMAKE_LINKER_FLAGS_DEBUG "${CMAKE_LINKER_FLAGS_DEBUG} -fno-omit-frame-pointer -pthread")
if(NOT DEFINED CMAKE_BUILD_TYPE) if(NOT DEFINED CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE "DEBUG" CACHE STRING "Type of build: Debug|Release") set(CMAKE_BUILD_TYPE "DEBUG" CACHE STRING "Type of build: Debug|Release")
@ -39,9 +39,11 @@ add_executable(usenetsearch
"src/Except.cpp" "src/Except.cpp"
"src/IoSocket.cpp" "src/IoSocket.cpp"
"src/main.cpp" "src/main.cpp"
"src/Serialize.cpp"
"src/SSLConnection.cpp" "src/SSLConnection.cpp"
"src/StringUtils.cpp" "src/StringUtils.cpp"
"src/TcpConnection.cpp" "src/TcpConnection.cpp"
"src/ThreadPool.cpp"
"src/UsenetClient.cpp" "src/UsenetClient.cpp"
) )

View File

@ -34,7 +34,8 @@ struct ConfigurationException: public UsenetSearchException
class Configuration class Configuration
{ {
std::uint16_t m_batchSize{1};
std::uint16_t m_maxThreads{1};
std::string m_nntpServerHost{"127.0.0.1"}; std::string m_nntpServerHost{"127.0.0.1"};
std::string m_nntpServerPassword{"password"}; std::string m_nntpServerPassword{"password"};
int m_nntpServerPort{119}; int m_nntpServerPort{119};
@ -44,7 +45,9 @@ class Configuration
public: public:
std::uint16_t BatchSize() const;
std::filesystem::path DatabasePath() const; std::filesystem::path DatabasePath() const;
std::uint16_t MaxThreads() const;
std::string NNTPServerHost() const; std::string NNTPServerHost() const;
std::string NNTPServerPassword() const; std::string NNTPServerPassword() const;
int NNTPServerPort() const; int NNTPServerPort() const;

View File

@ -23,14 +23,28 @@
#include <fstream> #include <fstream>
#include <locale> #include <locale>
#include <memory> #include <memory>
#include <mutex>
#include <vector> #include <vector>
#include "usenetsearch/Serialize.h"
#include "usenetsearch/UsenetClient.h" #include "usenetsearch/UsenetClient.h"
namespace usenetsearch { namespace usenetsearch {
static constexpr const std::uint64_t DatabaseVersion{1}; static constexpr const std::uint64_t DatabaseVersion{1};
struct ArticleEntry
{
std::vector<std::uint64_t> articleIDs;
std::string searchString;
size_t Size() const;
};
std::fstream& operator<<(std::fstream& out, const ArticleEntry& obj);
std::fstream& operator>>(std::fstream& in, ArticleEntry& obj);
struct DatabaseException: public UsenetSearchException struct DatabaseException: public UsenetSearchException
{ {
DatabaseException(int errorCode, const std::string& message): DatabaseException(int errorCode, const std::string& message):
@ -45,11 +59,40 @@ class Database
std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> m_conv; std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> m_conv;
std::filesystem::path m_databasePath; std::filesystem::path m_databasePath;
std::uint64_t m_databaseVersion{DatabaseVersion}; std::uint64_t m_databaseVersion{DatabaseVersion};
std::ifstream m_newsGroupFileInput; std::vector<std::filesystem::path> m_lockedFiles;
std::ofstream m_newsGroupFileOutput; std::mutex m_lockedFilesMutex;
std::uint8_t m_maxTreeDepth{5};
SerializableFile m_newsGroupFileIO;
std::filesystem::path GetArticleFilePath(const std::wstring& newsgroup); bool GetArticleEntry(
const std::string& subToken,
const std::string& searchString,
ArticleEntry& entry,
size_t& startPosition,
size_t& endPosition,
size_t& count);
std::filesystem::path GetArticleFilePath(
const std::wstring& newsgroup,
bool mkdirs=false
);
std::filesystem::path GetTokenFilePath(
const std::string& token,
bool mkdirs=false
);
bool HasToken(
const std::string& subToken,
const std::string& subject,
std::uint64_t articleID
);
void LockFile(std::filesystem::path file);
void UnlockFile(std::filesystem::path file);
void OpenNewsGroupFile(); void OpenNewsGroupFile();
void SaveToken(
const std::string& subToken,
const std::string& subject,
std::uint64_t articleID
);
public: public:
@ -64,6 +107,10 @@ public:
const std::vector<NntpHeader>& headers const std::vector<NntpHeader>& headers
); );
void UpdateNewsgroupList(const std::vector<NntpListEntry>& list); void UpdateNewsgroupList(const std::vector<NntpListEntry>& list);
void SaveSearchTokens(
std::uint64_t articleID,
const std::string& searchString
);
}; };

View File

@ -0,0 +1,18 @@
#pragma once
#include <functional>
namespace usenetsearch {
class ScopeExit
{
std::function<void()> m_function;
public:
ScopeExit(std::function<void()> fn): m_function(fn){}
~ScopeExit(){ if (m_function) m_function(); }
};
} // namespace usenetsearch

View File

@ -0,0 +1,44 @@
#pragma once
#include <cstdint>
#include <fstream>
#include <vector>
#include "usenetsearch/Except.h"
namespace usenetsearch {
struct ArticleEntry;
struct NntpHeader;
struct NntpListEntry;
struct SerializeException: public UsenetSearchException
{
SerializeException(int errorCode, const std::string& message):
UsenetSearchException(errorCode, message){}
virtual ~SerializeException() = default;
};
class SerializableFile: public std::fstream
{
public:
virtual ~SerializableFile() = default;
};
SerializableFile& operator<<(
SerializableFile& out, const std::vector<std::uint64_t>& arr);
SerializableFile& operator>>(
SerializableFile& in, std::vector<std::uint64_t>& arr);
SerializableFile& operator<<(SerializableFile& out, const ArticleEntry& obj);
SerializableFile& operator>>(SerializableFile& in, ArticleEntry& obj);
SerializableFile& operator<<(SerializableFile& out, const std::string& str);
SerializableFile& operator>>(SerializableFile& in, std::string& str);
SerializableFile& operator<<(SerializableFile& out, const std::wstring& str);
SerializableFile& operator>>(SerializableFile& in, std::wstring& str);
SerializableFile& operator<<(SerializableFile& out, const NntpHeader& obj);
SerializableFile& operator>>(SerializableFile& in, NntpHeader& obj);
SerializableFile& operator<<(SerializableFile& out, const NntpListEntry& obj);
SerializableFile& operator>>(SerializableFile& in, NntpListEntry& obj);
SerializableFile& operator<<(SerializableFile& out, const uint64_t& obj);
SerializableFile& operator>>(SerializableFile& in, uint64_t& obj);
} // namespace usenetsearch

View File

@ -24,11 +24,6 @@
namespace usenetsearch { namespace usenetsearch {
std::ostream& operator<<(std::ofstream& out, const std::string& str);
std::ifstream& operator>>(std::ifstream& in, std::string& str);
std::ostream& operator<<(std::ofstream& out, const std::wstring& str);
std::ifstream& operator>>(std::ifstream& in, std::wstring& str);
struct StringException: public UsenetSearchException struct StringException: public UsenetSearchException
{ {
StringException(int errorCode, const std::string& message): StringException(int errorCode, const std::string& message):
@ -39,6 +34,19 @@ struct StringException: public UsenetSearchException
std::string StringHash(const std::string& input); std::string StringHash(const std::string& input);
template<typename T>
std::string StringJoin(const std::vector<T>& list, const T& delim)
{
T result;
if (list.empty()) return result;
if (list.size() == 1) return list[0];
for (const auto& token: list)
{
result += token + delim;
}
return result.substr(0, result.size() - delim.size());
}
template<typename T> template<typename T>
std::vector<T> StringSplit( std::vector<T> StringSplit(
const T& str, const T& str,
@ -49,11 +57,12 @@ std::vector<T> StringSplit(
size_t pos=0; size_t pos=0;
T s = str; T s = str;
std::vector<T> result; std::vector<T> result;
if (str.empty()) return result;
int tokens = 1; int tokens = 1;
while (((pos = s.find(delim)) != T::npos) while (((pos = s.find(delim)) != T::npos)
and ((max_elem < 0) or (tokens != max_elem))) && ((max_elem < 0) or (tokens != max_elem)))
{ {
result.push_back(s.substr(0,pos)); result.push_back(s.substr(0, pos));
s.erase(0, pos + delim.length()); s.erase(0, pos + delim.length());
tokens++; tokens++;
} }
@ -70,6 +79,18 @@ T StringLeftTrim(const T& str)
return s; return s;
} }
template<typename T>
T StringRemove(const T& subject, const T& toErase)
{
T s(subject);
size_t pos = T::npos;
while ((pos = s.find(toErase) )!= T::npos)
{
s.erase(pos, toErase.length());
}
return s;
}
template<typename T> template<typename T>
T StringRightTrim(const T& str) T StringRightTrim(const T& str)
{ {
@ -100,4 +121,11 @@ T StringToLower(const T& str)
bool StringToBoolean(const std::string& str); bool StringToBoolean(const std::string& str);
bool StringToBoolean(const std::wstring& str); bool StringToBoolean(const std::wstring& str);
void StringTreeOperation(
const std::string& searchString,
const std::string& splitBy,
size_t maxDepth,
std::function<void(const std::string& subToken, const std::string& str)> Fn
);
} // namespace usenetsearch } // namespace usenetsearch

View File

@ -0,0 +1,34 @@
#pragma once
#include <atomic>
#include <functional>
#include <future>
#include <thread>
#include <unordered_map>
#include <vector>
namespace usenetsearch {
class ThreadPool
{
typedef std::unordered_map<std::thread::id, bool> ThreadStates;
typedef std::vector<std::unique_ptr<std::thread>> Threads;
size_t m_maxThreads{2};
std::atomic<bool> m_stopping{false};
Threads m_threads;
std::mutex m_threadsMutex;
std::shared_ptr<ThreadStates> m_threadStates;
std::mutex m_threadStatesMutex;
public:
ThreadPool();
ThreadPool(const ThreadPool&) = delete;
~ThreadPool();
void JoinThreads();
void MaxThreads(std::uint16_t max);
void Queue(std::function<void()> fn);
};
} // namespace usenetsearch

View File

@ -20,6 +20,7 @@
#include <cstdint> #include <cstdint>
#include <codecvt> #include <codecvt>
#include <fstream> #include <fstream>
#include <functional>
#include <locale> #include <locale>
#include <memory> #include <memory>
#include <string> #include <string>
@ -40,34 +41,30 @@ struct UsenetClientException: public UsenetSearchException
struct NntpHeader struct NntpHeader
{ {
std::wstring articleID; std::uint64_t articleID;
std::wstring from; std::string subject;
std::wstring subject;
/* /*
There's a lot more headers, but trying to limit it to the fields we care There's a lot more headers, but trying to limit it to the fields we care
to search to save on the storage space per article (there's going to be to search to save on the storage space per article (there's going to be
a LOT of them). a LOT of them).
*/ */
}; };
std::ostream& operator<<(std::ofstream& out, const NntpHeader& obj); typedef std::vector<NntpHeader> NntpHeaders;
std::ifstream& operator>>(std::ifstream& in, NntpHeader& obj);
struct NntpMessage struct NntpMessage
{ {
std::uint16_t code; std::uint16_t code;
std::wstring message; std::string message;
}; };
struct NntpListEntry struct NntpListEntry
{ {
std::wstring name; std::string name;
std::uint64_t high; std::uint64_t high;
std::uint64_t low; std::uint64_t low;
std::uint64_t count; std::uint64_t count;
std::wstring status; std::string status;
}; };
std::ostream& operator<<(std::ofstream& out, const NntpListEntry& obj);
std::ifstream& operator>>(std::ifstream& in, NntpListEntry& obj);
class UsenetClient class UsenetClient
{ {
@ -79,7 +76,7 @@ class UsenetClient
bool IsError(const NntpMessage& msg) const; bool IsError(const NntpMessage& msg) const;
NntpMessage ReadLine(); NntpMessage ReadLine();
void Write(const std::wstring& message); void Write(const std::wstring& message);
std::wstring ReadUntil(const std::wstring& deliminator); std::string ReadUntil(const std::string& deliminator);
public: public:
@ -88,9 +85,8 @@ public:
* Authenticate * Authenticate
* List() to get a list of newsgroups * List() to get a list of newsgroups
* for every newsgroup: * for every newsgroup:
* ListGroup() to get list of article ID's * XZHDR subject 0-
* for every article ID: * uncompress result.
* Head to get article headers.
*/ */
void Authenticate(const std::wstring& user, const std::wstring& password); void Authenticate(const std::wstring& user, const std::wstring& password);
@ -101,15 +97,19 @@ public:
bool useSSL = false bool useSSL = false
); );
void Group(const std::wstring& groupName); void Group(const std::wstring& newsgroup);
NntpHeader Head(const std::wstring& articleID); NntpHeader Head(std::uint64_t articleID);
void ProcessHeaders(
std::uint64_t startMessage,
std::function<void(std::shared_ptr<NntpHeaders>)> processFn,
std::uint64_t batchSize=100
);
std::unique_ptr<std::vector<NntpListEntry>> List(); std::unique_ptr<std::vector<NntpListEntry>> List();
/* whilst message id's are typically numbers, the rfc states they are unique std::unique_ptr<std::vector<std::uint64_t>> ListGroup(
alphanumeric strings. */
std::unique_ptr<std::vector<std::wstring>> ListGroup(
const std::wstring& newsGroup const std::wstring& newsGroup
); );

View File

@ -25,11 +25,21 @@
namespace usenetsearch { namespace usenetsearch {
std::uint16_t Configuration::BatchSize() const
{
return m_batchSize;
}
std::filesystem::path Configuration::DatabasePath() const std::filesystem::path Configuration::DatabasePath() const
{ {
return m_databasePath; return m_databasePath;
} }
std::uint16_t Configuration::MaxThreads() const
{
return m_maxThreads;
}
std::string Configuration::NNTPServerHost() const std::string Configuration::NNTPServerHost() const
{ {
return m_nntpServerHost; return m_nntpServerHost;
@ -87,10 +97,18 @@ void Configuration::Open(const std::string& filename)
} }
const std::string key = StringToLower(kvp[0]); const std::string key = StringToLower(kvp[0]);
const std::string value = StringTrim(kvp[1]); const std::string value = StringTrim(kvp[1]);
if (key == "database_path") if (key == "batch_size")
{
m_batchSize = stoi(value);
}
else if (key == "database_path")
{ {
m_databasePath = value; m_databasePath = value;
} }
else if (key == "max_threads")
{
m_maxThreads = stoi(value);
}
else if (key == "nntp_server_host") else if (key == "nntp_server_host")
{ {
m_nntpServerHost = value; m_nntpServerHost = value;
@ -101,7 +119,7 @@ void Configuration::Open(const std::string& filename)
} }
else if (key == "nntp_server_port") else if (key == "nntp_server_port")
{ {
m_nntpServerPort = atoi(value.c_str()); m_nntpServerPort = stoi(value);
} }
else if (key == "nntp_server_use_ssl") else if (key == "nntp_server_use_ssl")
{ {

View File

@ -15,35 +15,207 @@
along with UsenetSearch. If not, see <https://www.gnu.org/licenses/>. along with UsenetSearch. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include <iostream>
#include <chrono>
#include <filesystem> #include <filesystem>
#include <fstream> #include <fstream>
#include <string> #include <string>
#include <thread>
#include <vector> #include <vector>
#include "usenetsearch/StringUtils.h" #include "usenetsearch/StringUtils.h"
#include "usenetsearch/UsenetClient.h" #include "usenetsearch/UsenetClient.h"
#include "usenetsearch/ScopeExit.h"
#include "usenetsearch/Serialize.h"
#include "usenetsearch/Database.h" #include "usenetsearch/Database.h"
namespace usenetsearch { namespace usenetsearch {
size_t ArticleEntry::Size() const
{
return sizeof(articleIDs) + (articleIDs.size() * sizeof(std::uint64_t))
+ sizeof(std::uint64_t) + (searchString.size() * sizeof(char));
}
// Database class --------------------------------------------------------------
Database::~Database() Database::~Database()
{ {
if (m_newsGroupFileInput.is_open()) if (m_newsGroupFileIO.is_open())
{ {
m_newsGroupFileInput.close(); m_newsGroupFileIO.close();
}
if (m_newsGroupFileOutput.is_open())
{
m_newsGroupFileOutput.close();
} }
} }
std::filesystem::path Database::GetArticleFilePath( std::filesystem::path Database::GetArticleFilePath(
const std::wstring& newsgroup) const std::wstring& newsgroup, bool mkdirs)
{ {
const auto groupFile = StringHash(m_conv.to_bytes(newsgroup)) + ".db"; const std::string md5 = StringHash(m_conv.to_bytes(newsgroup));
return m_databasePath / groupFile; const std::string tok1 = md5.substr(0, 2);
const std::string tok2 = md5.substr(2, 2);
const std::string tok3 = md5.substr(4, 2);
const std::filesystem::path groupPath = m_databasePath / "articles"
/ tok1 / tok2 / tok3;
if (mkdirs)
{
if (!std::filesystem::exists(groupPath))
{
std::filesystem::create_directories(groupPath);
}
}
const auto groupFile = md5 + ".db";
return groupPath / groupFile;
}
bool Database::GetArticleEntry(
const std::string& subToken,
const std::string& searchString,
ArticleEntry& entry,
size_t& startPosition,
size_t& endPosition,
size_t& count)
{
const auto path = GetTokenFilePath(subToken);
if (!std::filesystem::exists(path)) return false;
SerializableFile io;
LockFile(path);
ScopeExit closeAndUnlockFile([&](){
if (io.is_open()) io.close();
UnlockFile(path);
});
io.open(path, std::ios::binary | std::ios::in);
if (!io.is_open()) return false;
std::uint64_t articleCount{0};
io >> articleCount;
size_t startPos{0};
size_t endPos{0};
for (std::uint64_t i = 0; i != articleCount; ++i)
{
ArticleEntry curEntry{};
startPos = io.tellg();
io >> curEntry;
endPos = io.tellg();
if (curEntry.searchString == searchString)
{
entry = curEntry;
startPosition = startPos;
endPosition = endPos;
count = i + 1;
return true;
}
}
return false;
}
std::filesystem::path Database::GetTokenFilePath(
const std::string& token,
bool mkdirs
)
{
const std::string md5 = StringHash(token);
const std::string tok1 = md5.substr(0, 2);
const std::string tok2 = md5.substr(2, 2);
const std::string tok3 = md5.substr(4, 2);
const std::filesystem::path groupPath = m_databasePath / "tokens"
/ tok1 / tok2 / tok3;
if (mkdirs)
{
if (!std::filesystem::exists(groupPath))
{
std::filesystem::create_directories(groupPath);
}
}
const auto groupFile = md5 + ".db";
return groupPath / groupFile;
}
bool Database::HasToken(
const std::string& subToken,
const std::string& subject,
std::uint64_t articleID)
{
const auto path = GetTokenFilePath(subToken);
if (!std::filesystem::exists(path)) return false;
LockFile(path);
ScopeExit unlockFile([&](){ UnlockFile(path); });
SerializableFile io;
io.open(path, std::ios::binary | std::ios::in);
std::uint64_t articleCount{0};
io.read(reinterpret_cast<char*>(&articleCount), sizeof(articleCount));
std::uint64_t c = 0;
for (std::uint64_t i = 0; i != c; ++i)
{
ArticleEntry entry;
io >> entry;
if (entry.searchString == subject)
{
if (std::find(
entry.articleIDs.begin(),
entry.articleIDs.end(), articleID) != entry.articleIDs.end())
{
return true;
}
}
}
return false;
}
void Database::LockFile(std::filesystem::path file)
{
#if 1
while (true)
{
{
std::lock_guard<std::mutex> lock(m_lockedFilesMutex);
auto it = std::find(m_lockedFiles.begin(), m_lockedFiles.end(), file);
if (it == m_lockedFiles.end()) break;
}
//std::cout << "Waiting on lock: " << file.string() << std::endl;
//std::this_thread::sleep_for(std::chrono::milliseconds{1000});
std::this_thread::sleep_for(std::chrono::milliseconds{20});
}
{
std::lock_guard<std::mutex> lock(m_lockedFilesMutex);
m_lockedFiles.emplace_back(file);
}
#else
const std::filesystem::path lockFilePath = file.string() + ".lock";
while (true)
{
if (!std::filesystem::exists(lockFilePath))
{
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds{20});
}
std::ofstream touch(lockFilePath);
#endif
}
void Database::UnlockFile(std::filesystem::path file)
{
#if 1
std::lock_guard<std::mutex> lock(m_lockedFilesMutex);
auto it = std::find(m_lockedFiles.begin(), m_lockedFiles.end(), file);
if (it != m_lockedFiles.end())
{
m_lockedFiles.erase(it);
}
#else
const std::filesystem::path lockFilePath = file.string() + ".lock";
if (std::filesystem::exists(lockFilePath))
{
std::filesystem::remove(lockFilePath);
}
#endif
} }
std::unique_ptr<std::vector<NntpHeader>> Database::LoadArticleList( std::unique_ptr<std::vector<NntpHeader>> Database::LoadArticleList(
@ -56,8 +228,12 @@ std::unique_ptr<std::vector<NntpHeader>> Database::LoadArticleList(
"No article list found for newsgroup " + m_conv.to_bytes(newsgroup) "No article list found for newsgroup " + m_conv.to_bytes(newsgroup)
); );
} }
std::ifstream io;
io.open(articleFile, std::ios::binary); LockFile(articleFile);
ScopeExit unlockFile([&](){ UnlockFile(articleFile); });
SerializableFile io;
io.open(articleFile, std::ios::binary | std::ios::in);
std::uint64_t articleCount; std::uint64_t articleCount;
io.read( io.read(
reinterpret_cast<char*>(&articleCount), reinterpret_cast<char*>(&articleCount),
@ -79,7 +255,7 @@ std::unique_ptr<std::vector<NntpListEntry>> Database::LoadNewsgroupList()
OpenNewsGroupFile(); OpenNewsGroupFile();
std::uint64_t dbVersion{0}; std::uint64_t dbVersion{0};
m_newsGroupFileInput.read( m_newsGroupFileIO.read(
reinterpret_cast<char*>(&dbVersion), reinterpret_cast<char*>(&dbVersion),
sizeof(dbVersion) sizeof(dbVersion)
); );
@ -92,7 +268,7 @@ std::unique_ptr<std::vector<NntpListEntry>> Database::LoadNewsgroupList()
} }
size_t newsGroupCount{0}; size_t newsGroupCount{0};
m_newsGroupFileInput.read( m_newsGroupFileIO.read(
reinterpret_cast<char*>(&newsGroupCount), reinterpret_cast<char*>(&newsGroupCount),
sizeof(newsGroupCount) sizeof(newsGroupCount)
); );
@ -101,7 +277,7 @@ std::unique_ptr<std::vector<NntpListEntry>> Database::LoadNewsgroupList()
for (size_t numLoaded = 0; numLoaded != newsGroupCount; ++numLoaded) for (size_t numLoaded = 0; numLoaded != newsGroupCount; ++numLoaded)
{ {
NntpListEntry entry; NntpListEntry entry;
m_newsGroupFileInput >> entry; m_newsGroupFileIO >> entry;
result->emplace_back(entry); result->emplace_back(entry);
} }
return result; return result;
@ -119,19 +295,16 @@ void Database::Open(std::filesystem::path dbPath)
void Database::OpenNewsGroupFile() void Database::OpenNewsGroupFile()
{ {
if (m_newsGroupFileInput.is_open() && m_newsGroupFileOutput.is_open()) if (m_newsGroupFileIO.is_open() && m_newsGroupFileIO.is_open())
{ {
return; return;
} }
const std::filesystem::path newsGroupFilePath = const std::filesystem::path newsGroupFilePath =
m_databasePath / "newsgroups.db"; m_databasePath / "newsgroups.db";
if (!m_newsGroupFileInput.is_open()) if (!m_newsGroupFileIO.is_open())
{ {
m_newsGroupFileInput.open(newsGroupFilePath, std::ios::binary); m_newsGroupFileIO.open(newsGroupFilePath,
} std::ios::binary | std::ios::in | std::ios::out);
if (!m_newsGroupFileOutput.is_open())
{
m_newsGroupFileOutput.open(newsGroupFilePath, std::ios::binary);
} }
} }
@ -139,14 +312,17 @@ void Database::UpdateArticleList(
const std::wstring& newsgroup, const std::wstring& newsgroup,
const std::vector<NntpHeader>& headers) const std::vector<NntpHeader>& headers)
{ {
const auto articleFile = GetArticleFilePath(newsgroup); const auto articleFile = GetArticleFilePath(newsgroup, true);
std::ofstream io;
io.open(articleFile, std::ios::binary); LockFile(articleFile);
ScopeExit unlockFile([&](){
UnlockFile(articleFile);
});
SerializableFile io;
io.open(articleFile, std::ios::binary | std::ios::out);
const std::uint64_t articleCount = headers.size(); const std::uint64_t articleCount = headers.size();
io.write( io << articleCount;
reinterpret_cast<const char*>(&articleCount),
sizeof(articleCount)
);
for (const auto& header: headers) for (const auto& header: headers)
{ {
io << header; io << header;
@ -158,22 +334,128 @@ void Database::UpdateNewsgroupList(const std::vector<NntpListEntry>& list)
{ {
OpenNewsGroupFile(); OpenNewsGroupFile();
m_newsGroupFileOutput.write( m_newsGroupFileIO.write(
reinterpret_cast<const char*>(&m_databaseVersion), reinterpret_cast<const char*>(&m_databaseVersion),
sizeof(m_databaseVersion) sizeof(m_databaseVersion)
); );
const std::uint64_t newsGroupCount = list.size(); const std::uint64_t newsGroupCount = list.size();
m_newsGroupFileOutput.write( m_newsGroupFileIO << newsGroupCount;
reinterpret_cast<const char*>(&newsGroupCount),
sizeof(newsGroupCount)
);
for (const auto& entry: list) for (const auto& entry: list)
{ {
m_newsGroupFileOutput << entry; m_newsGroupFileIO << entry;
}
m_newsGroupFileIO.flush();
}
void Database::SaveSearchTokens(
std::uint64_t articleID,
const std::string& searchString)
{
const std::string sstr(searchString);
StringTreeOperation(
sstr,
" ",
m_maxTreeDepth,
[&](const std::string& subToken, const std::string& str){
try
{
SaveToken(subToken, str, articleID);
}
catch (const SerializeException& e)
{
/// @todo do graceful magic here.
std::cout << "Broken file for \"" << subToken << "\"" << std::endl;
}
}
);
}
void Database::SaveToken(
const std::string& subtoken,
const std::string& subject,
std::uint64_t articleID)
{
if (subtoken == "") return;
if (subtoken == " ") return;
if (subject == "") return;
if (subject == " ") return;
if (HasToken(subtoken, subject, articleID)) return;
const auto path = GetTokenFilePath(subtoken, true);
size_t startPos{0};
size_t endPos{0};
size_t count{0};
ArticleEntry entry{};
bool found = GetArticleEntry(
subtoken, subject, entry, startPos, endPos, count);
std::uint64_t entryCount{0};
// Update existing?
if (!std::filesystem::exists(path))
{
// Creating a new token file is pretty simple.
entryCount = 1;
SerializableFile io;
LockFile(path);
ScopeExit unlockFile([&](){
if (io.is_open()) io.close();
UnlockFile(path);
});
io.open(path, std::ios::binary | std::ios::out);
io << entryCount;
entry.searchString = subject;
entry.articleIDs.emplace_back(articleID);
io << entry;
}
else
{
// Updating an existing token file is a bit more complicated.
// See if the search string already exists in the token file.
// Open file.
SerializableFile io;
io.open(path, std::ios::binary | std::ios::in | std::ios::out);
LockFile(path);
ScopeExit unlockFile([&](){
if (io.is_open()) io.close();
UnlockFile(path);
});
// Read entry count.
io >> entryCount;
if (found)
{
/* Seek to the end of the entry and read all following entries. */
io.seekg(endPos);
std::vector<ArticleEntry> entries;
for (auto i = count; i != entryCount; ++i)
{
ArticleEntry e{};
io >> e;
entries.emplace_back(e);
}
// Seek back to the start of the existing entry and rewrite it.
io.seekg(startPos);
io << entry;
// Now re-write all following entries.
for (auto& e: entries)
{
io << e;
}
}
else
{
// This is a new search string in the token file, append to the end.
io.seekg(0, std::ios_base::end);
entry.searchString = subject;
entry.articleIDs = {articleID};
io << entry;
// Increment entry count.
entryCount++;
io.seekg(0);
io << entryCount;
}
} }
m_newsGroupFileOutput.flush();
} }
} // namespace usenetsearch } // namespace usenetsearch

View File

@ -58,6 +58,7 @@ std::string IoSocket::ReadUntil(std::string deliminator)
} }
} }
} }
return result;
} }
} // namespace usenetsearch } // namespace usenetsearch

163
src/Serialize.cpp Normal file
View File

@ -0,0 +1,163 @@
#include <cstdint>
#include <fstream>
#include <vector>
#include "usenetsearch/Database.h"
#include "usenetsearch/UsenetClient.h"
#include "usenetsearch/Serialize.h"
namespace usenetsearch {
SerializableFile& operator<<(
SerializableFile& out, const std::vector<std::uint64_t>& arr)
{
const std::uint64_t size = arr.size();
out << size;
for (const auto value: arr)
{
out << value;
}
return out;
}
SerializableFile& operator>>(
SerializableFile& in, std::vector<std::uint64_t>& arr)
{
std::uint64_t size;
in >> size;
for (std::uint64_t n = 0; n != size; ++n)
{
std::uint64_t value;
in >> value;
arr.emplace_back(value);
}
return in;
}
SerializableFile& operator<<(
SerializableFile& out, const ArticleEntry& obj)
{
const char preMagic[1]{24};
out.write(preMagic, sizeof(preMagic));
out << obj.searchString;
out << obj.articleIDs;
const char postMagic[1]{42};
out.write(postMagic, sizeof(postMagic));
return out;
}
SerializableFile& operator>>(SerializableFile& in, ArticleEntry& obj)
{
char preMagic[1]{};
in.read(preMagic, sizeof(preMagic));
if (preMagic[0] != 24)
{
throw SerializeException(EBADMSG, "Bad magic number in entry header.");
}
in >> obj.searchString;
in >> obj.articleIDs;
char postMagic[1]{};
in.read(postMagic, sizeof(postMagic));
if (postMagic[0] != 42)
{
throw SerializeException(EBADMSG, "Bad magic number in entry footer.");
}
return in;
}
SerializableFile& operator<<(SerializableFile& out, const std::string& str)
{
const std::uint64_t size = str.size();
out.write(reinterpret_cast<const char*>(&size), sizeof(size));
out.write(reinterpret_cast<const char*>(str.c_str()), size);
return out;
}
SerializableFile& operator>>(SerializableFile& in, std::string& str)
{
std::uint64_t size{0};
in.read(reinterpret_cast<char*>(&size), sizeof(size));
char buf[size + 1];
in.read(buf, size);
buf[size] = 0;
str = buf;
return in;
}
SerializableFile& operator<<(SerializableFile& out, const std::wstring& str)
{
const std::uint64_t size = str.size();
out.write(reinterpret_cast<const char*>(&size), sizeof(size));
out.write(
reinterpret_cast<const char*>(str.c_str()),
size * sizeof(wchar_t)
);
return out;
}
SerializableFile& operator>>(SerializableFile& in, std::wstring& str)
{
std::uint64_t size{0};
in.read(reinterpret_cast<char*>(&size), sizeof(size));
wchar_t buf[size + 1];
in.read(reinterpret_cast<char*>(buf), size * sizeof(wchar_t));
buf[size] = 0;
str = buf;
return in;
}
SerializableFile& operator<<(SerializableFile& out, const NntpHeader& obj)
{
out.write(
reinterpret_cast<const char*>(&obj.articleID),
sizeof(obj.articleID)
);
out << obj.subject;
return out;
}
SerializableFile& operator>>(SerializableFile& in, NntpHeader& obj)
{
in.read(
reinterpret_cast<char*>(&obj.articleID),
sizeof(obj.articleID)
);
in >> obj.articleID;
in >> obj.subject;
return in;
}
SerializableFile& operator<<(SerializableFile& out, const NntpListEntry& obj)
{
out.write(reinterpret_cast<const char*>(&obj.count), sizeof(obj.count));
out.write(reinterpret_cast<const char*>(&obj.high), sizeof(obj.high));
out.write(reinterpret_cast<const char*>(&obj.low), sizeof(obj.low));
out << obj.name;
out << obj.status;
return out;
}
SerializableFile& operator>>(SerializableFile& in, NntpListEntry& obj)
{
in.read(reinterpret_cast<char*>(&obj.count), sizeof(obj.count));
in.read(reinterpret_cast<char*>(&obj.high), sizeof(obj.high));
in.read(reinterpret_cast<char*>(&obj.low), sizeof(obj.low));
in >> obj.name;
in >> obj.status;
return in;
}
SerializableFile& operator<<(SerializableFile& out, const uint64_t& obj)
{
out.write(reinterpret_cast<const char*>(&obj), sizeof(obj));
return out;
}
SerializableFile& operator>>(SerializableFile& in, uint64_t& obj)
{
in.read(reinterpret_cast<char*>(&obj), sizeof(obj));
return in;
}
} // namespace usenetsearch

View File

@ -30,47 +30,6 @@
namespace usenetsearch { namespace usenetsearch {
std::ostream& operator<<(std::ofstream& out, const std::string& str)
{
const std::uint64_t size = str.size();
out.write(reinterpret_cast<const char*>(&size), sizeof(size));
out.write(reinterpret_cast<const char*>(str.c_str()), size);
return out;
}
std::ifstream& operator>>(std::ifstream& in, std::string& str)
{
std::uint64_t size{0};
in.read(reinterpret_cast<char*>(&size), sizeof(size));
char buf[size + 1];
in.read(buf, size);
buf[size] = 0;
str = buf;
return in;
}
std::ostream& operator<<(std::ofstream& out, const std::wstring& str)
{
const std::uint64_t size = str.size();
out.write(reinterpret_cast<const char*>(&size), sizeof(size));
out.write(
reinterpret_cast<const char*>(str.c_str()),
size * sizeof(wchar_t)
);
return out;
}
std::ifstream& operator>>(std::ifstream& in, std::wstring& str)
{
std::uint64_t size{0};
in.read(reinterpret_cast<char*>(&size), sizeof(size));
wchar_t buf[size + 1];
in.read(reinterpret_cast<char*>(buf), size * sizeof(wchar_t));
buf[size] = 0;
str = buf;
return in;
}
std::string StringHash(const std::string& input) std::string StringHash(const std::string& input)
{ {
unsigned char result[MD5_DIGEST_LENGTH]; unsigned char result[MD5_DIGEST_LENGTH];
@ -120,4 +79,24 @@ bool StringToBoolean(const std::wstring& str)
); );
} }
void StringTreeOperation(
const std::string& searchString,
const std::string& splitBy,
size_t maxDepth,
std::function<void(const std::string& subToken, const std::string& str)> Fn)
{
const auto tokens = StringSplit(searchString, splitBy);
for (auto outerIt = tokens.begin(); outerIt != tokens.end(); outerIt++)
{
for (size_t depth = 1; depth != maxDepth + 1; ++depth)
{
const auto endIt = outerIt + depth;
const auto subset = std::vector<std::string>(outerIt, endIt);
const auto subToken = StringJoin(subset, splitBy);
Fn(subToken, searchString);
if (endIt == tokens.end()) break;
}
}
}
} // namespace usenetsearch } // namespace usenetsearch

113
src/ThreadPool.cpp Normal file
View File

@ -0,0 +1,113 @@
#include "usenetsearch/ThreadPool.h"
namespace usenetsearch {
ThreadPool::ThreadPool()
{
m_threadStates = std::make_shared<ThreadStates>();
}
ThreadPool::~ThreadPool()
{
m_stopping = true;
JoinThreads();
}
void ThreadPool::JoinThreads()
{
std::lock_guard<std::mutex> threadsLock(m_threadsMutex);
std::lock_guard<std::mutex> threadStatesLock(m_threadStatesMutex);
for (auto& thread: m_threads)
{
if (thread->joinable()) thread->join();
}
m_threads.clear();
m_threadStates->clear();
}
void ThreadPool::MaxThreads(std::uint16_t max)
{
m_maxThreads = max;
}
void ThreadPool::Queue(std::function<void()> fn)
{
if (m_stopping == true) return; // not accepting new work.
if (m_maxThreads <= 1)
{
// single threaded case.
fn();
return;
}
// Wait for an available thread.
while (m_threads.size() >= m_maxThreads)
{
//std::lock_guard<std::mutex> threadsLock(m_threadsMutex);
auto foundThread = m_threads.end();
//auto foundStatus = m_threadStates->end();
for (auto it = m_threads.begin(); it != m_threads.end(); ++it)
{
const auto& t = (*it);
if (!t) continue;
//const auto threadID = t->get_id();
//if (threadID == std::thread::id{}) continue;
//ThreadStates::iterator statusIt;
//{
// std::lock_guard<std::mutex> tsl(m_threadStatesMutex);
// statusIt = m_threadStates->find(threadID);
//}
//if (statusIt == m_threadStates->end()) continue;
//bool state = statusIt->second;
//if (state == true)
//{
if (t->joinable()) t->join();
foundThread = it;
// foundStatus = statusIt;
break;
//}
}
if (foundThread != m_threads.end()) m_threads.erase(foundThread);
/*
{
std::lock_guard<std::mutex> tsl(m_threadStatesMutex);
if (foundStatus != m_threadStates->end()) m_threadStates->erase(foundStatus);
}
*/
}
// Spawn a new thread.
auto thread = std::make_unique<std::thread>(fn);
/*
auto thread = std::make_unique<std::thread>(
[](std::function<void()> func, std::mutex& tsm, std::shared_ptr<ThreadStates> ts)
{
if (ts == nullptr) return;
{
std::lock_guard<std::mutex> tsl(tsm);
const auto threadID = std::this_thread::get_id();
if (threadID == std::thread::id{}) return;
ts->emplace(threadID, false);
}
func();
{
std::lock_guard<std::mutex> tsl(tsm);
const auto threadID = std::this_thread::get_id();
if (threadID != std::thread::id{})
{
auto it = ts->find(threadID);
if (it == ts->end())
{
ts->emplace(threadID, true);
}
else
{
(*ts)[threadID] = true;
}
}
}
}, fn, std::ref(m_threadStatesMutex), m_threadStates);
*/
//std::lock_guard<std::mutex> threadsLock(m_threadsMutex);
m_threads.emplace_back(std::move(thread));
}
} // namespace usenetsearch

View File

@ -19,6 +19,7 @@
#include <fstream> #include <fstream>
#include <locale> #include <locale>
#include <memory> #include <memory>
#include <mutex>
#include <string> #include <string>
#include "usenetsearch/Except.h" #include "usenetsearch/Except.h"
@ -28,46 +29,6 @@
namespace usenetsearch { namespace usenetsearch {
// NntpHeader serialization ----------------------------------------------------
std::ostream& operator<<(std::ofstream& out, const NntpHeader& obj)
{
out << obj.articleID;
out << obj.from;
out << obj.subject;
return out;
}
std::ifstream& operator>>(std::ifstream& in, NntpHeader& obj)
{
in >> obj.articleID;
in >> obj.from;
in >> obj.subject;
return in;
}
// NntpListEntry serialization -------------------------------------------------
std::ostream& operator<<(std::ofstream& out, const NntpListEntry& obj)
{
out.write(reinterpret_cast<const char*>(&obj.count), sizeof(obj.count));
out.write(reinterpret_cast<const char*>(&obj.high), sizeof(obj.high));
out.write(reinterpret_cast<const char*>(&obj.low), sizeof(obj.low));
out << obj.name;
out << obj.status;
return out;
}
std::ifstream& operator>>(std::ifstream& in, NntpListEntry& obj)
{
in.read(reinterpret_cast<char*>(&obj.count), sizeof(obj.count));
in.read(reinterpret_cast<char*>(&obj.high), sizeof(obj.high));
in.read(reinterpret_cast<char*>(&obj.low), sizeof(obj.low));
in >> obj.name;
in >> obj.status;
return in;
}
// UsenetClient class ---------------------------------------------------------- // UsenetClient class ----------------------------------------------------------
void UsenetClient::Authenticate( void UsenetClient::Authenticate(
@ -82,7 +43,7 @@ void UsenetClient::Authenticate(
throw UsenetClientException( throw UsenetClientException(
response.code, response.code,
"Error authenticating with NNTP server: " "Error authenticating with NNTP server: "
+ m_conv.to_bytes(response.message) + response.message
); );
} }
// Send password // Send password
@ -93,7 +54,7 @@ void UsenetClient::Authenticate(
throw UsenetClientException( throw UsenetClientException(
response.code, response.code,
"Error authenticating with NNTP server: " "Error authenticating with NNTP server: "
+ m_conv.to_bytes(response.message) + response.message
); );
} }
} }
@ -130,29 +91,29 @@ void UsenetClient::Connect(
throw UsenetClientException( throw UsenetClientException(
serverHello.code, serverHello.code,
"Error received from NNTP server: " "Error received from NNTP server: "
+ m_conv.to_bytes(serverHello.message) + serverHello.message
); );
} }
} }
void UsenetClient::Group(const std::wstring& groupName) void UsenetClient::Group(const std::wstring& newsgroup)
{ {
// Send user name // Send user name
Write(L"GROUP " + groupName + L"\r\n"); Write(L"GROUP " + newsgroup + L"\r\n");
auto response = ReadLine(); auto response = ReadLine();
if (IsError(response)) if (IsError(response))
{ {
throw UsenetClientException( throw UsenetClientException(
response.code, response.code,
"Error changing group to " + m_conv.to_bytes(groupName) + " : " "Error changing group to " + m_conv.to_bytes(newsgroup) + " : "
+ m_conv.to_bytes(response.message) + response.message
); );
} }
} }
NntpHeader UsenetClient::Head(const std::wstring& articleID) NntpHeader UsenetClient::Head(std::uint64_t articleID)
{ {
Write(L"HEAD " + articleID + L"\r\n"); Write(L"HEAD " + std::to_wstring(articleID) + L"\r\n");
/* Typical response is a code 221 followed by the headers ending with a /* Typical response is a code 221 followed by the headers ending with a
line containing a period by itself. */ line containing a period by itself. */
auto response = ReadLine(); auto response = ReadLine();
@ -161,26 +122,22 @@ NntpHeader UsenetClient::Head(const std::wstring& articleID)
throw UsenetClientException( throw UsenetClientException(
response.code, response.code,
"Error getting headers for article id " "Error getting headers for article id "
+ m_conv.to_bytes(articleID) + std::to_string(articleID)
+ " : " + m_conv.to_bytes(response.message) + " : " + response.message
); );
} }
const auto headerLinesStr = ReadUntil(L"\r\n.\r\n"); const auto headerLinesStr = ReadUntil("\r\n.\r\n");
// parse the headers. // parse the headers.
const auto lines = StringSplit(headerLinesStr, std::wstring{L"\r\n"}); const auto lines = StringSplit(headerLinesStr, std::string{"\r\n"});
NntpHeader result; NntpHeader result;
result.articleID = articleID; result.articleID = articleID;
for (const auto& line: lines) for (const auto& line: lines)
{ {
const auto kvp = StringSplit(line, std::wstring{L":"}, 2); const auto kvp = StringSplit(line, std::string{":"}, 2);
if (kvp.size() != 2) continue; // Bad header line? if (kvp.size() != 2) continue; // Bad header line?
const auto key = StringToLower(StringTrim(kvp[0])); const auto key = StringToLower(StringTrim(kvp[0]));
const auto value = StringTrim(kvp[1]); const auto value = StringTrim(kvp[1]);
if (key == L"from") if (key == "subject")
{
result.from = value;
}
else if (key == L"subject")
{ {
result.subject = value; result.subject = value;
} }
@ -188,6 +145,62 @@ NntpHeader UsenetClient::Head(const std::wstring& articleID)
return result; return result;
} }
void UsenetClient::ProcessHeaders(
std::uint64_t startMessage,
std::function<void(std::shared_ptr<NntpHeaders>)> processFn,
std::uint64_t batchSize)
{
Write(L"XHDR subject " + std::to_wstring(startMessage) + L"-\r\n");
/* Typical response is a code 221 followed by the headers ending with a
line containing a period by itself. */
const auto response = ReadLine();
if (IsError(response))
{
throw UsenetClientException(
response.code,
"Error getting headers: "
+ response.message
);
}
NntpHeaders headers;
std::mutex headersLock;
while (true)
{
const auto line = StringTrim(ReadUntil("\r\n"));
if (line == ".") break;
// parse the headers.
const auto kvp = StringSplit(line, std::string{" "}, 2);
if (kvp.size() != 2) continue; // Bad header line?
const std::uint64_t articleID = std::stoul(StringTrim(kvp[0]));
const auto subject = StringTrim(kvp[1]);
NntpHeader hdr;
hdr.articleID = articleID;
hdr.subject = subject;
{
std::lock_guard<std::mutex> lock(headersLock);
headers.emplace_back(hdr);
}
if (headers.size() >= batchSize)
{
auto headersToPass = std::make_shared<NntpHeaders>(
headers.begin(),
headers.end()
);
processFn(std::move(headersToPass));
headers.clear();
}
}
// Left over headers.
if (!headers.empty())
{
auto headersToPass = std::make_shared<NntpHeaders>(
headers.begin(),
headers.end()
);
processFn(std::move(headersToPass));
}
}
bool UsenetClient::IsError(const NntpMessage& msg) const bool UsenetClient::IsError(const NntpMessage& msg) const
{ {
if (msg.code >= 400) return true; if (msg.code >= 400) return true;
@ -206,21 +219,18 @@ std::unique_ptr<std::vector<NntpListEntry>> UsenetClient::List()
response.code, response.code,
"Failed to fetch newsgroup list from server, " "Failed to fetch newsgroup list from server, "
+ std::string{"server responded with: "} + std::string{"server responded with: "}
+ m_conv.to_bytes(response.message) + response.message
); );
} }
const auto listStr = ReadUntil(L"\r\n.\r\n"); const auto listStr = ReadUntil("\r\n.\r\n");
// parse the list. // parse the list.
auto lines = StringSplit(listStr, std::wstring{L"\r\n"}); auto lines = StringSplit(listStr, std::string{"\r\n"});
auto result = std::make_unique<std::vector<NntpListEntry>>(); auto result = std::make_unique<std::vector<NntpListEntry>>();
if (lines.empty()) return result; if (lines.empty()) return result;
// remove the last line (the ending period).
lines.pop_back();
lines.pop_back();
for (const auto& line: lines) for (const auto& line: lines)
{ {
NntpListEntry entry; NntpListEntry entry;
const auto fields = StringSplit(line, std::wstring{L" "}); const auto fields = StringSplit(line, std::string{" "});
if (fields.size() == 5) if (fields.size() == 5)
{ {
entry.name = fields[0]; entry.name = fields[0];
@ -234,7 +244,8 @@ std::unique_ptr<std::vector<NntpListEntry>> UsenetClient::List()
return result; return result;
} }
std::unique_ptr<std::vector<std::wstring>> UsenetClient::ListGroup(const std::wstring& newsGroup) std::unique_ptr<std::vector<std::uint64_t>> UsenetClient::ListGroup(
const std::wstring& newsGroup)
{ {
Write(L"LISTGROUP " + newsGroup + L"\r\n"); Write(L"LISTGROUP " + newsGroup + L"\r\n");
/* In response, we should get a 211 response followed by the list of /* In response, we should get a 211 response followed by the list of
@ -246,20 +257,17 @@ std::unique_ptr<std::vector<std::wstring>> UsenetClient::ListGroup(const std::ws
response.code, response.code,
"Failed to fetch newsgroup list from server, " "Failed to fetch newsgroup list from server, "
+ std::string{"server responded with: "} + std::string{"server responded with: "}
+ m_conv.to_bytes(response.message) + response.message
); );
} }
const auto listStr = ReadUntil(L"\r\n.\r\n"); const auto listStr = ReadUntil("\r\n.\r\n");
// parse the list. // parse the list.
auto lines = StringSplit(listStr, std::wstring{L"\r\n"}); auto lines = StringSplit(listStr, std::string{"\r\n"});
auto result = std::make_unique<std::vector<std::wstring>>(); auto result = std::make_unique<std::vector<std::uint64_t>>();
if (lines.empty()) return result; if (lines.empty()) return result;
// Discard the last 2 line (the ending period + blank line)
lines.pop_back();
lines.pop_back();
for (const auto& line: lines) for (const auto& line: lines)
{ {
result->emplace_back(StringTrim(line)); result->emplace_back(stoul(StringTrim(line)));
} }
return result; return result;
} }
@ -267,41 +275,32 @@ std::unique_ptr<std::vector<std::wstring>> UsenetClient::ListGroup(const std::ws
NntpMessage UsenetClient::ReadLine() NntpMessage UsenetClient::ReadLine()
{ {
NntpMessage result{}; NntpMessage result{};
std::wstring line; std::string line;
line = ReadUntil(L"\r\n"); line = ReadUntil("\r\n");
if (line.length() < 4) if (line.length() < 2)
{ {
throw UsenetSearchException(EPROTONOSUPPORT, throw UsenetSearchException(EPROTONOSUPPORT,
"NNTP protocol error - invalid response from server: " "NNTP protocol error - invalid response from server: "
+ m_conv.to_bytes(line)); + line);
} }
std::wstring codeStr = line.substr(0, 3); std::string codeStr = line.substr(0, 3);
result.code = std::stoi(codeStr); result.code = std::stoi(codeStr);
result.message = line.substr(4, line.length()); result.message = line.substr(4, line.length());
return result; return result;
} }
std::wstring UsenetClient::ReadUntil(const std::wstring& deliminator) std::string UsenetClient::ReadUntil(const std::string& deliminator)
{ {
std::wstring result; std::string result;
const std::string deliminatorStr = m_conv.to_bytes(deliminator);
std::string resultStr;
if (m_useSSL) if (m_useSSL)
{ {
resultStr = m_ssl->ReadUntil(deliminatorStr); result = m_ssl->ReadUntil(deliminator);
} }
else else
{ {
resultStr = m_tcp->ReadUntil(deliminatorStr); result = m_tcp->ReadUntil(deliminator);
}
try
{
result = m_conv.from_bytes(resultStr);
}
catch (const std::range_error& e)
{
return result;
} }
result.erase(result.size() - deliminator.size(), deliminator.size());
return result; return result;
} }

View File

@ -15,25 +15,33 @@
along with UsenetSearch. If not, see <https://www.gnu.org/licenses/>. along with UsenetSearch. If not, see <https://www.gnu.org/licenses/>.
*/ */
#include <atomic>
#include <codecvt>
#include <iostream> #include <iostream>
#include <locale>
#include <memory> #include <memory>
#include <thread>
#include "usenetsearch/Configuration.h" #include "usenetsearch/Configuration.h"
#include "usenetsearch/Database.h" #include "usenetsearch/Database.h"
#include "usenetsearch/Except.h" #include "usenetsearch/Except.h"
#include "usenetsearch/StringUtils.h" #include "usenetsearch/StringUtils.h"
#include "usenetsearch/ThreadPool.h"
#include "usenetsearch/UsenetClient.h" #include "usenetsearch/UsenetClient.h"
using namespace usenetsearch; using namespace usenetsearch;
void Usage(const std::string& programName) void Usage(const std::string& programName)
{ {
std::cout << "UsenetSearch - usenet search indexer" << std::endl;
std::cout << "Copyright© 2021 John Sennesael" << std::endl << std::endl;
std::cout << "Usage:" << std::endl << std::endl;
std::cout << programName; std::cout << programName;
std::cout << "\t"; std::cout << "\t";
std::cout << "[-c <config filename>] "; std::cout << "[-c <config filename>] ";
std::cout << "[-h] " << std::endl << std::endl; std::cout << "[-h] " << std::endl << std::endl;
std::cout << "-c <file>\tSets configuration file to use" << std::endl; std::cout << "-c <file>\tSets configuration file to use" << std::endl;
std::cout << "-h\tShow help (this text)." << std::endl; std::cout << "-h\t\tShow help (this text)." << std::endl;
std::cout << std::endl; std::cout << std::endl;
} }
@ -41,10 +49,7 @@ int main(int argc, char* argv[])
{ {
std::cout.setf(std::ios::unitbuf); std::cout.setf(std::ios::unitbuf);
std::cout << "UsenetSearch - usenet search indexer" << std::endl; std::string configFile{"usenetsearch.conf"};
std::cout << "Copyright© 2021 John Sennesael" << std::endl << std::endl;
std::string configFile{"config.json"};
// Parse args. // Parse args.
for (int argn = 1; argn != argc; ++argn) for (int argn = 1; argn != argc; ++argn)
@ -77,11 +82,11 @@ int main(int argc, char* argv[])
db.Open(config.DatabasePath()); db.Open(config.DatabasePath());
// Start nntp client. // Start nntp client.
ThreadPool threads;
UsenetClient client; UsenetClient client;
std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> conv; std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> conv;
try try
{ {
client.Connect( client.Connect(
config.NNTPServerHost(), config.NNTPServerHost(),
config.NNTPServerPort(), config.NNTPServerPort(),
@ -93,33 +98,99 @@ int main(int argc, char* argv[])
); );
// BEGIN TEMPORARY TEST CODE // BEGIN TEMPORARY TEST CODE
/* std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> conv;
const auto list = client.List(); std::unique_ptr<std::vector<NntpListEntry>> list;
db.UpdateNewsgroupList(list); // try
std::cout << "Number of newsgroups in newsgroup list (saved): " // {
<< list.size() << std::endl; // list = db.LoadNewsgroupList();
// }
// catch (const DatabaseException& e)
// {
// // noop
// }
const auto listLoaded = db.LoadNewsgroupList(); NntpListEntry e{};
std::cout << "Number of newsgroups in newsgroup list (loaded): " e.count = 100;
<< listLoaded->size() << std::endl;
*/ // 1001 headers
const std::wstring newsgroup = L"comp.os.os2.comm"; // e.name = "comp.os.os2.comm";
client.Group(newsgroup);
const auto articles = client.ListGroup(newsgroup); // 2541 headers
std::vector<NntpHeader> headers; // e.name = "borland.public.cppbuilder.commandlinetools";
std::wcout << L"Reading headers in " << newsgroup << std::endl;
for (const auto& articleID: *articles) // 100026 headers (1859952 K) (1816.35 M)
e.name = "dk.videnskab";
// 1000437 headers
// e.name = "alt.bible";
// a million or so, but this one is very slow because all subjects look the
// same, so everything goes to the same token index, which means we're
// constantly waiting on a file lock.
// e.name = "usenetserver.test";
list = std::make_unique<std::vector<NntpListEntry>>();
list->emplace_back(e);
if ((list == nullptr) || (list->empty()))
{ {
NntpHeader header = client.Head(articleID); std::cout << "Getting newsgroup list...";
headers.emplace_back(header); std::cout.flush();
std::cout << "."; list = client.List();
db.UpdateNewsgroupList(*list);
std::cout << "DONE." << std::endl;
std::cout.flush(); std::cout.flush();
} }
std::cout << "DONE." << std::endl; std::cout << "Number of newsgroups in newsgroup: "
std::cout << "Updating database..." << std::endl; << list->size() << std::endl;
std::cout.flush(); std::cout.flush();
db.UpdateArticleList(L"comp.os.os2.comm", headers); const size_t batchSize = config.BatchSize();
threads.MaxThreads(config.MaxThreads());
for (const auto& group: *list)
{
const std::wstring newsgroup = conv.from_bytes(group.name);
std::cout << "Setting group to " << group.name << "...";
std::cout.flush();
client.Group(newsgroup);
std::cout << "DONE." << std::endl;
std::cout << "Reading headers in " << group.name << " "
<< "(.=" << batchSize << " headers)." << std::endl;
std::cout.flush();
std::atomic<std::uint64_t> headerCount{0};
std::reference_wrapper<Database> dbref = std::ref(db);
client.ProcessHeaders(0,
[&threads, &headerCount, dbref](std::shared_ptr<NntpHeaders> headers){
threads.Queue([headers, &headerCount, dbref](){
for (const auto& header: *headers)
{
const std::uint64_t id{header.articleID};
std::string subject = StringRemove(
StringToLower(header.subject), std::string{"re:"}
);
subject.erase(
std::remove_if(
subject.begin(), subject.end(),
[](char c){
if (std::isspace(c)) return false;
if ((c > 65) && (c < 90)) return false;
if ((c > 97) && (c < 122)) return false;
if (c == '\'') return false;
if ((c > 48) && (c < 57)) return false;
return true;
}), subject.end()
);
dbref.get().SaveSearchTokens(id, subject);
headerCount++;
}
std::cout << ".";
std::cout.flush();
});
},
batchSize
);
threads.JoinThreads();
std::cout << "DONE." << std::endl;
std::cout << "Saved " << headerCount << " headers." << std::endl;
std::cout.flush();
}
// END TEMPORARY TEST CODE // END TEMPORARY TEST CODE
} }

View File

@ -7,3 +7,7 @@ nntp_server_use_ssl: no
# Index database configuration details # Index database configuration details
database_path: ./db database_path: ./db
# Parallel processing settings
max_threads: 2
batch_size: 5