Implement reading headers

This commit is contained in:
John Sennesael 2021-09-21 19:19:13 -05:00
parent 60271b8de3
commit ed2e3a9824
8 changed files with 188 additions and 68 deletions

View File

@ -54,14 +54,14 @@ class Database
public:
~Database();
std::unique_ptr<std::vector<std::wstring>> LoadArticleList(
std::unique_ptr<std::vector<NntpHeader>> LoadArticleList(
const std::wstring& newsgroup
);
std::unique_ptr<std::vector<NntpListEntry>> LoadNewsgroupList();
void Open(std::filesystem::path dbPath);
void UpdateArticleList(
const std::wstring& newsgroup,
const std::vector<std::wstring>& articleIds
const std::vector<NntpHeader>& headers
);
void UpdateNewsgroupList(const std::vector<NntpListEntry>& list);

View File

@ -42,7 +42,7 @@ public:
protected:
std::chrono::milliseconds m_connectionTimeout{10000};
std::chrono::milliseconds m_ioTimeout{5000};
std::chrono::milliseconds m_ioTimeout{0};
};

View File

@ -40,8 +40,17 @@ struct UsenetClientException: public UsenetSearchException
struct NntpHeader
{
std::wstring articleID;
std::wstring from;
std::wstring subject;
/*
There's a lot more headers, but trying to limit it to the fields we care
to search to save on the storage space per article (there's going to be
a LOT of them).
*/
};
std::ostream& operator<<(std::ofstream& out, const NntpHeader& obj);
std::ifstream& operator>>(std::ifstream& in, NntpHeader& obj);
struct NntpMessage
{
@ -74,13 +83,28 @@ class UsenetClient
public:
/* Expected flow:
* Connect
* Authenticate
* List() to get a list of newsgroups
* for every newsgroup:
* ListGroup() to get list of article ID's
* for every article ID:
* Head to get article headers.
*/
void Authenticate(const std::wstring& user, const std::wstring& password);
void Connect(
const std::string& host,
std::uint16_t port,
bool useSSL = false
);
void Group(const std::wstring& groupName);
NntpHeader Head(const std::wstring& articleID);
std::unique_ptr<std::vector<NntpListEntry>> List();
/* whilst message id's are typically numbers, the rfc states they are unique
@ -89,9 +113,6 @@ public:
const std::wstring& newsGroup
);
// use the ListGroup <newsgroup> command to get a list of article id's in a newsgroup
// then use the HEAD <article id> command to get the headers for each article
};
} // namespace usenetsearch

View File

@ -46,7 +46,7 @@ std::filesystem::path Database::GetArticleFilePath(
return m_databasePath / groupFile;
}
std::unique_ptr<std::vector<std::wstring>> Database::LoadArticleList(
std::unique_ptr<std::vector<NntpHeader>> Database::LoadArticleList(
const std::wstring& newsgroup)
{
const auto articleFile = GetArticleFilePath(newsgroup);
@ -63,12 +63,12 @@ std::unique_ptr<std::vector<std::wstring>> Database::LoadArticleList(
reinterpret_cast<char*>(&articleCount),
sizeof(articleCount)
);
auto result = std::make_unique<std::vector<std::wstring>>();
auto result = std::make_unique<std::vector<NntpHeader>>();
for (std::uint64_t i = 0; i != articleCount; ++i)
{
std::wstring articleId;
io >> articleId;
result->emplace_back(articleId);
NntpHeader header;
io >> header;
result->emplace_back(header);
}
io.close();
return result;
@ -137,19 +137,19 @@ void Database::OpenNewsGroupFile()
void Database::UpdateArticleList(
const std::wstring& newsgroup,
const std::vector<std::wstring>& articleIds)
const std::vector<NntpHeader>& headers)
{
const auto articleFile = GetArticleFilePath(newsgroup);
std::ofstream io;
io.open(articleFile, std::ios::binary);
const std::uint64_t articleCount = articleIds.size();
const std::uint64_t articleCount = headers.size();
io.write(
reinterpret_cast<const char*>(&articleCount),
sizeof(articleCount)
);
for (const auto& id: articleIds)
for (const auto& header: headers)
{
io << id;
io << header;
}
io.close();
}

View File

@ -125,14 +125,18 @@ std::string SSLConnection::Read(size_t amount)
{
std::string buffer;
buffer.resize(amount - result.size());
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (timeDelta > m_ioTimeout) return result;
ERR_clear_error();
int bytesRead = SSL_read(m_ssl.get(), &buffer[0], buffer.size());
if (bytesRead == 0)
{
if (m_ioTimeout == std::chrono::seconds{0}) return result;
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (timeDelta > m_ioTimeout) return result;
}
if (bytesRead > 0)
{
buffer.resize(bytesRead);
@ -159,18 +163,6 @@ void SSLConnection::Write(const std::string& data)
const auto startTime = std::chrono::system_clock::now();
while(true)
{
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (timeDelta > m_ioTimeout)
{
throw SSLException(
ETIMEDOUT,
"Timed out while trying to write to SSL connection."
);
}
ERR_clear_error();
int bytesWritten = SSL_write(
m_ssl.get(),
@ -181,6 +173,23 @@ void SSLConnection::Write(const std::string& data)
{
return; // If we wrote the entire buffer, we're done.
}
else if (bytesWritten == 0)
{
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (
(timeDelta > m_ioTimeout)
|| (m_ioTimeout == std::chrono::milliseconds{0}) )
{
throw SSLException(
ETIMEDOUT,
"Timed out while trying to write to SSL connection."
);
}
}
else if (bytesWritten > 0)
{
// If we wrote a partial buffer, pop off what we wrote, try again.

View File

@ -133,14 +133,18 @@ std::string TcpConnection::Read(size_t amount)
{
std::string buffer;
buffer.resize(amount);
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (timeDelta > m_ioTimeout) break;
const auto bytesRead = read(m_fd, &buffer[0], buffer.size());
if (bytesRead >= 0)
if (bytesRead == 0)
{
if (m_ioTimeout == std::chrono::milliseconds{0}) break;
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (timeDelta > m_ioTimeout) break;
}
else if (bytesRead >= 0)
{
buffer.resize(bytesRead);
result += buffer;
@ -163,19 +167,23 @@ void TcpConnection::Write(const std::string& data)
std::string buffer(data);
while(true)
{
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if (timeDelta > m_ioTimeout)
{
throw SocketException(ETIMEDOUT,
"Timed out writing to TCP socket."
);
}
auto bytesWritten = write(m_fd, &buffer[0], buffer.size());
if (bytesWritten >= 0)
if (bytesWritten == 0)
{
const auto currentTime = std::chrono::system_clock::now();
const auto timeDelta =
std::chrono::duration_cast<std::chrono::milliseconds>(
currentTime - startTime
);
if ((timeDelta > m_ioTimeout)
|| m_ioTimeout == std::chrono::milliseconds{0})
{
throw SocketException(ETIMEDOUT,
"Timed out writing to TCP socket."
);
}
}
else if (bytesWritten >= 0)
{
if (bytesWritten > buffer.size()) bytesWritten = buffer.size();
buffer.erase(0, bytesWritten);

View File

@ -28,6 +28,24 @@
namespace usenetsearch {
// NntpHeader serialization ----------------------------------------------------
std::ostream& operator<<(std::ofstream& out, const NntpHeader& obj)
{
out << obj.articleID;
out << obj.from;
out << obj.subject;
return out;
}
std::ifstream& operator>>(std::ifstream& in, NntpHeader& obj)
{
in >> obj.articleID;
in >> obj.from;
in >> obj.subject;
return in;
}
// NntpListEntry serialization -------------------------------------------------
std::ostream& operator<<(std::ofstream& out, const NntpListEntry& obj)
@ -132,6 +150,44 @@ void UsenetClient::Group(const std::wstring& groupName)
}
}
NntpHeader UsenetClient::Head(const std::wstring& articleID)
{
Write(L"HEAD " + articleID + L"\r\n");
/* Typical response is a code 221 followed by the headers ending with a
line containing a period by itself. */
auto response = ReadLine();
if (IsError(response))
{
throw UsenetClientException(
response.code,
"Error getting headers for article id "
+ m_conv.to_bytes(articleID)
+ " : " + m_conv.to_bytes(response.message)
);
}
const auto headerLinesStr = ReadUntil(L"\r\n.\r\n");
// parse the headers.
const auto lines = StringSplit(headerLinesStr, std::wstring{L"\r\n"});
NntpHeader result;
result.articleID = articleID;
for (const auto& line: lines)
{
const auto kvp = StringSplit(line, std::wstring{L":"}, 2);
if (kvp.size() != 2) continue; // Bad header line?
const auto key = StringToLower(StringTrim(kvp[0]));
const auto value = StringTrim(kvp[1]);
if (key == L"from")
{
result.from = value;
}
else if (key == L"subject")
{
result.subject = value;
}
}
return result;
}
bool UsenetClient::IsError(const NntpMessage& msg) const
{
if (msg.code >= 400) return true;
@ -155,8 +211,12 @@ std::unique_ptr<std::vector<NntpListEntry>> UsenetClient::List()
}
const auto listStr = ReadUntil(L"\r\n.\r\n");
// parse the list.
const auto lines = StringSplit(listStr, std::wstring{L"\r\n"});
auto lines = StringSplit(listStr, std::wstring{L"\r\n"});
auto result = std::make_unique<std::vector<NntpListEntry>>();
if (lines.empty()) return result;
// remove the last line (the ending period).
lines.pop_back();
lines.pop_back();
for (const auto& line: lines)
{
NntpListEntry entry;
@ -191,8 +251,12 @@ std::unique_ptr<std::vector<std::wstring>> UsenetClient::ListGroup(const std::ws
}
const auto listStr = ReadUntil(L"\r\n.\r\n");
// parse the list.
const auto lines = StringSplit(listStr, std::wstring{L"\r\n"});
auto lines = StringSplit(listStr, std::wstring{L"\r\n"});
auto result = std::make_unique<std::vector<std::wstring>>();
if (lines.empty()) return result;
// Discard the last 2 line (the ending period + blank line)
lines.pop_back();
lines.pop_back();
for (const auto& line: lines)
{
result->emplace_back(StringTrim(line));
@ -230,7 +294,14 @@ std::wstring UsenetClient::ReadUntil(const std::wstring& deliminator)
{
resultStr = m_tcp->ReadUntil(deliminatorStr);
}
result = m_conv.from_bytes(resultStr);
try
{
result = m_conv.from_bytes(resultStr);
}
catch (const std::range_error& e)
{
return result;
}
return result;
}

View File

@ -24,7 +24,7 @@
#include "usenetsearch/StringUtils.h"
#include "usenetsearch/UsenetClient.h"
using usenetsearch::StringStartsWith;
using namespace usenetsearch;
void Usage(const std::string& programName)
{
@ -39,6 +39,7 @@ void Usage(const std::string& programName)
int main(int argc, char* argv[])
{
std::cout.setf(std::ios::unitbuf);
std::cout << "UsenetSearch - usenet search indexer" << std::endl;
std::cout << "Copyright© 2021 John Sennesael" << std::endl << std::endl;
@ -70,17 +71,17 @@ int main(int argc, char* argv[])
}
// Read config, setup db
usenetsearch::Configuration config;
Configuration config;
config.Open(configFile);
usenetsearch::Database db;
Database db;
db.Open(config.DatabasePath());
// Start nntp client.
usenetsearch::UsenetClient client;
UsenetClient client;
std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>> conv;
try
{
client.Connect(
config.NNTPServerHost(),
config.NNTPServerPort(),
@ -102,17 +103,27 @@ int main(int argc, char* argv[])
std::cout << "Number of newsgroups in newsgroup list (loaded): "
<< listLoaded->size() << std::endl;
*/
const auto articles = client.ListGroup(L"comp.os.os2.comm");
std::cout << "Saving " << articles->size() << " articles." << std::endl;
db.UpdateArticleList(L"comp.os.os2.comm", *articles);
const auto loadedArticles = db.LoadArticleList(L"comp.os.os2.comm");
std::cout << "Loaded " << loadedArticles->size() << " artices." << std::endl;
const std::wstring newsgroup = L"comp.os.os2.comm";
client.Group(newsgroup);
const auto articles = client.ListGroup(newsgroup);
std::vector<NntpHeader> headers;
std::wcout << L"Reading headers in " << newsgroup << std::endl;
for (const auto& articleID: *articles)
{
NntpHeader header = client.Head(articleID);
headers.emplace_back(header);
std::cout << ".";
std::cout.flush();
}
std::cout << "DONE." << std::endl;
std::cout << "Updating database..." << std::endl;
std::cout.flush();
db.UpdateArticleList(L"comp.os.os2.comm", headers);
// END TEMPORARY TEST CODE
}
catch (const usenetsearch::UsenetSearchException& e)
catch (const UsenetSearchException& e)
{
std::cerr << e.what() << std::endl;;
return 1;