persistent database for incremental fetching

This commit is contained in:
AdamK2003 2023-07-02 13:51:44 +02:00
parent dcd6eeac51
commit 98b33378c7
No known key found for this signature in database
GPG Key ID: 656E9DB475B09927
11 changed files with 1789 additions and 63 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@
/*.json
/*.db
/*.log
# dependencies
node_modules

View File

@@ -1,9 +1,15 @@
global:
logLevel: info # pino log level
logFileLevel: debug # pino log level for the log files
contact: "https://your.instance.tld/you" # optional (but recommended), contact URL to be added to user-agent
# inputMaxRPS: 1 # optional, I HIGHLY recommend not increasing this, defaults to 1rps
fetchRetries: 3 # optional, defaults to 3, the amount of times the program will retry if one or more outputs have failed fetches
closeRetries: 3 # optional, defaults to 3, the amount of times the program will retry if closing one or more outputs fails
# inputMaxRPS: 1 # optional, I HIGHLY recommend not increasing this, defaults to 1rps
dbPath: ./posts.db # path to persistent sqlite3 database, defaults to ./posts.db, you can use `:memory:` if you don't want persistence
input:
skip: false # skip already fetched posts while fetching data from input instances? (explanation in readme)
skipInstances: # list of output instances to skip if the above is enabled - use database identifiers (output name, unless specified otherwise)
- your.instance.tld
outputs:
- type: masto # mastodon (or MastoAPI compatible) instance
enabled: true # optional, works for any output, output will be skipped if false, true if omitted; alternative to commenting out entries
@@ -27,6 +33,7 @@ outputs:
options:
instance: fakerelay.domain.tld # required, your FakeRelay domain
token: "yourTokenHere" # required, your FakeRelay API key for the desired instance
dbName: your.instance.tld # will be used as the instance id in the persistent database, set to your instance domain to prevent duplicate entries with `masto`
maxRPS: 7 # default for FakeRelay is 5, you can probably go higher than with a `masto` output on the same server
directives: # those are described in more detail in the readme
- instances:

View File

@@ -18,7 +18,7 @@
const getPosts = async (requests, eventEmitter, loggerInstance) => {
const getPosts = async (requests, eventEmitter, loggerInstance, db, options) => {
let logger = loggerInstance.child({function: 'getPosts'});
@@ -30,7 +30,7 @@ const getPosts = async (requests, eventEmitter, loggerInstance) => {
// logger.info(requests[instance].requests[request])
getTimelinePosts(instance, requests[instance].requests[request], requests[instance].client, eventEmitter, logger);
getTimelinePosts(instance, requests[instance].requests[request], requests[instance].client, eventEmitter, logger, db, options);
// add posts to posts array
@@ -50,7 +50,7 @@ const getPosts = async (requests, eventEmitter, loggerInstance) => {
const getTimelinePosts = async (instance, request, client, eventEmitter, loggerInstance) => {
const getTimelinePosts = async (instance, request, client, eventEmitter, loggerInstance, db, options) => {
let logger = loggerInstance.child({function: 'getTimelinePosts'});
@@ -92,7 +92,7 @@ const getTimelinePosts = async (instance, request, client, eventEmitter, loggerI
}
getNextPage(instance, path, preservedParamsStr, client, maxId, count, eventEmitter, logger);
getNextPage(instance, path, preservedParamsStr, client, maxId, count, eventEmitter, logger, db, options);
// logger.info(maxId);
@@ -101,7 +101,7 @@ const getTimelinePosts = async (instance, request, client, eventEmitter, loggerI
}
const getNextPage = async (instance, path, params, client, maxId, count, eventEmitter, loggerInstance) => {
const getNextPage = async (instance, path, params, client, maxId, count, eventEmitter, loggerInstance, db, options) => {
let logger;
@@ -137,7 +137,33 @@ const getNextPage = async (instance, path, params, client, maxId, count, eventEm
let posts = [];
let users = [];
let skipped = false;
for (let post of response.data) {
let dbPosts = [];
if(options?.skip) {
let skipInstances = options?.skipInstances
dbPosts = await db.all("SELECT * FROM fetched WHERE object = ? and instance in (?) and status = 'success'", [post.url, skipInstances.join(',')]);
logger.trace(dbPosts)
}
if(dbPosts.length > 0 && options?.skip) {
logger.debug(`Skipping ${post.url} on ${instance} as it already exists on ${dbPosts[0].instance}`);
skipped = true;
continue;
}
posts.push(post.url);
let user = post.account.acct;
if(!user.startsWith('@')) {
@@ -148,7 +174,9 @@ const getNextPage = async (instance, path, params, client, maxId, count, eventEm
}
}
if(posts.length === 0) {
// logger.warn(posts)
if(posts.length === 0 && !skipped) {
logger.info(`No more posts found on ${instance}${path}`);
return;
}
@@ -171,7 +199,7 @@ const getNextPage = async (instance, path, params, client, maxId, count, eventEm
if(newCount <= 0) return;
getNextPage(instance, path, params, client, lastId, newCount, eventEmitter, logger);
getNextPage(instance, path, params, client, lastId, newCount, eventEmitter, logger, db, options);
}

View File

@@ -23,19 +23,47 @@ const fs = require('fs');
const axiosRetry = require('axios-retry');
const rateLimit = require('axios-rate-limit');
const SQLite3 = require('node-sqlite3') // WHY do I need an extra wrapper for async/await support??
const config = YAML.parse(fs.readFileSync('./config.yml', 'utf8'));
logger = require('pino')({
level: config.global?.logLevel || 'info',
transport: {
target: 'pino-pretty',
options: {
colorize: true,
ignore: 'pid,hostname'
const dbPath = config.global?.dbPath || './posts.db';
global.runTimestamp = Date.now();
const db = new SQLite3(dbPath)
db.open()
db.run('create table if not exists fetched (object text, status text, instance text, type text, runTimestamp integer, constraint pk_obj_inst primary key (object, instance))')
const pino = require('pino');
const transports = pino.transport({
targets: [
{
level: config.global?.logLevel || 'info',
target: 'pino-pretty',
options: {
colorize: true,
ignore: 'pid,hostname'
}
},
{
level: config.global?.logFileLevel || 'info',
target: 'pino/file',
options: {
destination: './masto-backfill.log',
}
}
}
]
})
logger = pino(transports)
const EventEmitter = require('events');
const events = new EventEmitter();
@@ -87,7 +115,7 @@ for (let output of config.outputs) {
logger.info(`Adding output ${output.name}`)
let outputType = outputGenerators[output.type];
outputs[`${output.type}-${output.name}`] = outputType.init(output.name, logger, output.options, config.global);
outputs[`${output.type}-${output.name}`] = outputType.init(output.name, logger, output.options, config.global, db);
logger.trace(outputs[`${output.type}-${output.name}`])
}
@@ -105,7 +133,7 @@ events.on('fetchUserRoutesComplete', (requestsOutput) => {
getPosts(requests, events, logger);
getPosts(requests, events, logger, db, config.input);
})
@@ -118,7 +146,7 @@ events.on('newFetchables', (posts) => {
for (let post of posts) {
outputs[output].fetch(post)
outputs[output].fetch(post, db)
}
@@ -135,6 +163,7 @@ fetchUserRoutes(requests, events, logger);
let closed = false;
let closeFailed = false;
let errorCount = 0;
let fetchRetries = config.global?.fetchRetries || 3;
let closeRetries = config.global?.closeRetries || 3;
@@ -146,7 +175,9 @@ process.on('beforeExit', async () => {
if(fetchRetries <= 0) break;
let errorCount = outputs[output].retry();
errorCount = await outputs[output].retry(db);
if(errorCount > 0) {
logger.warn(`Output ${outputs[output].name} failed to fetch ${errorCount} objects`)
@@ -166,6 +197,7 @@ process.on('beforeExit', async () => {
closeFailed = false;
if(closed) return;
logger.info('Closing outputs')
for (let output in outputs) {
let success = outputs[output].close();
@@ -174,6 +206,8 @@ process.on('beforeExit', async () => {
logger.warn(`Failed to close output ${outputs[output].name}`)
}
}
// await db.close();
if(closeFailed) {
logger.warn(`Failed to close some outputs, ${closeRetries} retries remaining`)
@@ -190,5 +224,10 @@ process.on('beforeExit', async () => {
logger.info('Outputs closed')
closed = true
process.exit(0)
})
process.on('exit', () => {
db.close();
})

View File

@@ -30,6 +30,7 @@ const FakeRelayOutput = new OutputInterface(
function (name, logger, options, globalOptions) {
this.name = name;
this.dbName = options.dbName || name;
this.instanceName = options.instance;
this.logger = logger.child({ output: this.outputName, name: name });
if(options?.logLevel) this.logger.level = options.logLevel;
@@ -67,23 +68,25 @@ const FakeRelayOutput = new OutputInterface(
this.client = client;
this.fetched = new Set();
this.errors = new Set();
this.fetchedCount = 0;
this.errorsCount = 0;
return this;
},
async function (query, options) {
async function (query, db, options) {
if(this.fetched.has(query)) {
this.logger.debug(`Already fetched ${query} on ${this.name}`);
return true;
}
if(query.startsWith('@')) {
this.logger.debug(`User fetching not supported on ${this.name}`);
return false;
return true;
}
let dbResponse = await db.all("SELECT * FROM fetched WHERE object = ? and instance = ? and status = 'success'", [query, `${this.dbName}`]);
if(dbResponse.length > 0) {
this.logger.debug(`Already fetched ${query} on ${this.name}`);
return true;
}
@@ -99,13 +102,16 @@ const FakeRelayOutput = new OutputInterface(
this.logger.debug(`Fetched ${query} on ${this.name}`); // there's gonna be a LOT of that, so I'm making it debug
this.fetched.add(query);
await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'success', ?, ?, ?) ON CONFLICT(object,instance) DO UPDATE SET status = 'success'", [query, `${this.dbName}`, this.outputName, global.runTimestamp]);
this.fetchedCount++;
if(this.fetchedCount % 20 == 0) this.logger.info(`Progress: ${this.fetchedCount} posts on ${this.name}`);
return true;
} catch (e) {
this.logger.warn(`Error fetching ${query} on ${this.name}; error: ${e}`);
this.errors.add(query);
await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'failed', ?, ?, ?) ON CONFLICT(object,instance) DO UPDATE SET status = 'failed'",
[query, `${this.dbName}`, this.outputName, global.runTimestamp]); // if unsuccessful
this.errorsCount++;
return false;
}
@@ -114,7 +120,7 @@ const FakeRelayOutput = new OutputInterface(
// No cleanup needed
return true;
},
async function () {
async function (db) {
// will be called if error count > 0, should retry any failed fetches, return amount of failed fetches
if(this.errorsCount == 0) return 0;
@@ -123,14 +129,14 @@ const FakeRelayOutput = new OutputInterface(
let errors = [...this.errors];
this.errors.clear();
let errors = await db.all("SELECT * FROM fetched WHERE status = 'error' and instance = ? and type = ?", [`${this.dbName}`, this.outputName])
let errorsCount = this.errorsCount;
this.errorsCount = 0;
for (let item of errors) {
this.fetch(item);
this.fetch(item.object, db);
}
return errorsCount;

View File

@@ -28,6 +28,7 @@ const JSONOutput = new OutputInterface(
// will be called once when the output is initialized, should return a new instance of the output (`this`)
this.name = name;
this.dbName = options.dbName || name;
this.logger = logger.child({output: this.outputName, name: name});
if(options?.logLevel) this.logger.level = options.logLevel;
@@ -35,7 +36,7 @@ const JSONOutput = new OutputInterface(
this.file = options.file;
this.fetched = new Set();
this.fetched = new Set(); // we don't really need the db for this? maybe later
this.posts = new Set();
this.users = new Set();
// this.errors = new Set();
@@ -43,7 +44,7 @@ const JSONOutput = new OutputInterface(
return this;
},
async function (query, options) {
async function (query, db, options) {
// will be called for each post/user, should return true/false for whether the write was successful
if(this.fetched.has(query)) {

View File

@@ -27,6 +27,7 @@ const DummyLoggerOutput = new OutputInterface(
// will be called once when the output is initialized, should return a new instance of the output (`this`)
this.name = name;
this.dbName = options.dbName || `log-${name}`;
this.logger = logger.child({output: 'log', name: name});
if(options?.logLevel) this.logger.level = options.logLevel;
@@ -39,26 +40,32 @@
return this;
},
async function (query, options) {
async function (query, db, options) {
// will be called for each post/user, should return true/false for whether the write was successful
if (this.fetched.has(query)) {
let dbResponse = await db.all("SELECT * FROM fetched WHERE object = ? and instance = ? and status = 'success'", [query, `${this.dbName}`]);
logger.trace(dbResponse)
if(dbResponse.length > 0) {
this.logger.debug(`Dummy logger output ${this.name} already fetched query ${query}`);
} else {
this.logger.info(`Dummy logger output ${this.name} called with query ${query}`);
this.fetched.add(query);
return true;
}
this.logger.info(`Dummy logger output ${this.name} called with query ${query}`);
// this.fetched.add(query);
await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'success', ?, ?, ?) ON CONFLICT(object,instance) DO UPDATE SET status = 'success'", [query, `${this.dbName}`, this.outputName, global.runTimestamp]);
},
function () {
// will be called once when the program is exiting, should return true/false for whether the close was successful
this.logger.info(`Close called on dummy logger output ${this.name}`)
this.logger.info(`Close called on dummy logger output ${this.name}`);
return true;
},
async function () {
this.logger.info(`Retry called on dummy logger output ${this.name}`)
return true;
this.logger.info(`Retry called on dummy logger output ${this.name}`);
return 0;
}
);

View File

@@ -30,6 +30,7 @@ const MastoOutput = new OutputInterface(
function (name, logger, options, globalOptions) {
this.name = name;
this.dbName = options.dbName || name;
this.logger = logger.child({ output: 'masto', name: name });
if(options?.logLevel) this.logger.level = options.logLevel;
@@ -69,12 +70,8 @@ const MastoOutput = new OutputInterface(
return this;
},
async function (query, options) {
async function (query, db, options) {
if(this.fetched.has(query)) {
this.logger.debug(`Already fetched ${query} on ${this.name}`);
return true;
}
if(query.startsWith('@')) {
if(!this.usersEnabled) {
@@ -88,6 +85,13 @@ const MastoOutput = new OutputInterface(
}
}
let dbResponse = await db.all("SELECT * FROM fetched WHERE object = ? and instance = ? and status = 'success'", [query, `${this.dbName}`]);
if(dbResponse.length > 0) {
this.logger.debug(`Already fetched ${query} on ${this.name}`);
return true;
}
let params = new URLSearchParams();
@@ -96,24 +100,33 @@ const MastoOutput = new OutputInterface(
try {
await this.client.get('/api/v2/search' + `?${params.toString()}`)
this.logger.debug(`Fetched ${query} on ${this.name}`); // there's gonna be a LOT of that, so I'm making it debug
this.fetched.add(query);
await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'success', ?, ?, ?) ON CONFLICT(object,instance) DO UPDATE SET status = 'success'",
[query, `${this.dbName}`, this.outputName, global.runTimestamp]); // if successful
this.fetchedCount++;
if(this.fetchedCount % 20 == 0) this.logger.info(`Progress: ${this.fetchedCount} objects on ${this.name}`);
return true;
} catch (e) {
this.logger.warn(`Error fetching ${query} on ${this.name}; error: ${e}`);
this.errors.add(query);
await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'failed', ?, ?, ?) ON CONFLICT(object,instance) DO UPDATE SET status = 'failed'",
[query, `${this.dbName}`, this.outputName, global.runTimestamp]); // if unsuccessful
this.errorsCount++;
return false;
}
},
function () {
// No cleanup needed
logger.info(`Fetched ${this.fetchedCount} objects on ${this.name}, ${this.errorsCount} failed`)
return true;
},
async function () {
// will be called if error count > 0, should retry any failed fetches, return amount of failed fetches
// will be called before closing outputs, should retry any failed fetches, return amount of failed fetches
if(this.errorsCount == 0) return 0;

View File

@@ -38,28 +38,33 @@ const TemplateOutput = new OutputInterface(
return this;
},
async function (query, options) {
async function (query, db, options) {
// will be called for each post/user, should return true/false for whether the write was successful (only use false for retryable/unexpected errors, like network errors)
if(this.fetched.has(query)) {
// maybe do logging
let dbResponse = await db.all("SELECT * FROM fetched WHERE object = ? and instance = ? and status = 'success'", [query, `${this.dbName}`]);
if(dbResponse.length > 0) {
// already fetched
return true;
}
// do the actual fetching here
this.fetched.add(query); // if successful
// this.errors.add(query); // if unsuccessful
await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'success', ?, ?, ?) ON CONFLICT(pk_obj_inst) DO UPDATE SET status = 'success'",
[query, `${this.dbName}`, this.outputName, global.runTimestamp]); // if successful
// await db.all("INSERT INTO fetched (object, status, instance, type, runTimestamp) VALUES (?, 'failed', ?, ?, ?)",
// [query, `${this.dbName}`, this.outputName, global.runTimestamp]); // if unsuccessful
},
async function () {
function () {
// will be called once when the program is exiting, should return true/false for whether the close was successful
return true;
},
async function () {
function () { // needs to be synchronous
// will be called if error count > 0, should retry any failed fetches, return value doesn't matter
return true;
return 0;
}
);

1621
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,8 +3,10 @@
"axios": "^1.4.0",
"axios-rate-limit": "^1.3.0",
"axios-retry": "^3.5.1",
"node-sqlite3": "github:a-was/node-sqlite3",
"pino": "^8.14.1",
"pino-pretty": "^10.0.0",
"sqlite3": "^5.1.6",
"yaml": "^2.3.1"
},
"name": "masto-backfill",