From d794e0b48fc9e98c0ccbefac410fb2bfc610af5a Mon Sep 17 00:00:00 2001 From: ansuz Date: Thu, 24 Sep 2020 19:02:22 +0530 Subject: [PATCH] include inactive accounts in the archival script --- config/config.example.js | 10 ++ lib/pins.js | 29 ++-- lib/storage/file.js | 21 +-- package-lock.json | 5 + package.json | 1 + scripts/evict-inactive.js | 269 ++++++++++++++++++++++++++++++++++---- 6 files changed, 290 insertions(+), 45 deletions(-) diff --git a/config/config.example.js b/config/config.example.js index 3826a7291..a71175a04 100644 --- a/config/config.example.js +++ b/config/config.example.js @@ -192,6 +192,16 @@ module.exports = { */ //archiveRetentionTime: 15, + /* Account Retention Time (days) + * scripts/evict-inactive.js reads this value to decide which accounts are inactive. + * An account whose pin log has changed within this many days is always kept, + * older accounts are examined further and their pin log may be archived. + * If this is unset, or is not a number greater than zero, the script retains + * all accounts and only uses their pin lists to protect pinned documents. + */ + //accountRetentionTime: 365, + + /* Max Upload Size (bytes) * this sets the maximum size of any one file uploaded to the server. * anything larger than this size will be rejected diff --git a/lib/pins.js b/lib/pins.js index d840991a3..b0d524944 100644 --- a/lib/pins.js +++ b/lib/pins.js @@ -74,6 +74,14 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) { }; }; + +var processPinFile = function (pinFile, fileName) { + var ref = {}; + var handler = createLineHandler(ref, fileName); + pinFile.split('\n').forEach(handler); + return ref; +}; + /* takes contents of a pinFile (UTF8 string) and the pin file's name @@ -82,10 +90,7 @@ var createLineHandler = Pins.createLineHandler = function (ref, errorHandler) { throw errors on pin logs with invalid pin data */ Pins.calculateFromLog = function (pinFile, fileName) { - var ref = {}; - var handler = createLineHandler(ref, fileName); - - pinFile.split('\n').forEach(handler); + var ref = processPinFile(pinFile, fileName); return Object.keys(ref.pins); }; @@ -207,6 +212,7 @@ Pins.load = function (cb, config) { var pinPath = config.pinPath || './pins'; var done = Util.once(cb); + var handler = config.handler; nThen((waitFor) => { // recurse over the configured pinPath, or the default @@ -240,16 +246,23 @@ 
Pins.load = function (cb, config) { }).nThen((waitFor) => { fileList.forEach((f) => { sema.take((returnAfter) => { - Fs.readFile(f, waitFor(returnAfter((err, content) => { + var next = waitFor(returnAfter()); + Fs.readFile(f, (err, content) => { if (err) { waitFor.abort(); return void done(err); } - const hashes = Pins.calculateFromLog(content.toString('utf8'), f); + var id = f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y); + var contentString = content.toString('utf8'); + if (handler) { + return void handler(processPinFile(contentString, f), id, next); + } + const hashes = Pins.calculateFromLog(contentString, f); hashes.forEach((x) => { - (pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1; + (pinned[x] = pinned[x] || {})[id] = 1; }); - }))); + next(); + }); }); }); }).nThen(() => { diff --git a/lib/storage/file.js b/lib/storage/file.js index c6506427d..599c7fecc 100644 --- a/lib/storage/file.js +++ b/lib/storage/file.js @@ -422,7 +422,7 @@ var removeArchivedChannel = function (env, channelName, cb) { }; // TODO use ../plan.js for a smaller memory footprint -var listChannels = function (root, handler, cb) { +var listChannels = function (root, handler, cb, fast) { // do twenty things at a time var sema = Semaphore.create(20); @@ -478,15 +478,20 @@ var listChannels = function (root, handler, cb) { metadataName = channelName.replace(/\.ndjson$/, '.metadata.ndjson'); } - var filePath = Path.join(nestedDirPath, channelName); - var metadataPath = Path.join(nestedDirPath, metadataName); var channel = metadataName.replace(/\.metadata.ndjson$/, ''); - if ([32, 34].indexOf(channel.length) === -1) { return; } + if ([32, 34, 44].indexOf(channel.length) === -1) { return; } // otherwise throw it on the pile sema.take(function (give) { var next = w(give()); + if (fast) { + return void handler(void 0, { channel: channel, }, next); + } + + var filePath = Path.join(nestedDirPath, channelName); + var metadataPath = Path.join(nestedDirPath, metadataName); + var 
metaStat, channelStat; var metaErr, channelErr; nThen(function (ww) { @@ -1186,11 +1191,11 @@ module.exports.create = function (conf, _cb) { }, // CHANNEL ITERATION - listChannels: function (handler, cb) { - listChannels(env.root, handler, cb); + listChannels: function (handler, cb, fastMode) { + listChannels(env.root, handler, cb, fastMode); }, - listArchivedChannels: function (handler, cb) { - listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb); + listArchivedChannels: function (handler, cb, fastMode) { + listChannels(Path.join(env.archiveRoot, 'datastore'), handler, cb, fastMode); }, getChannelSize: function (channelName, cb) { diff --git a/package-lock.json b/package-lock.json index cb59a4dac..ce127b9f9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4,6 +4,11 @@ "lockfileVersion": 1, "requires": true, "dependencies": { + "@mcrowe/minibloom": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/@mcrowe/minibloom/-/minibloom-0.2.0.tgz", + "integrity": "sha1-G+2WrsGDiBmNo3RDiZssP/WUgFM=" + }, "accepts": { "version": "1.3.7", "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz", diff --git a/package.json b/package.json index 866058611..0f79bc8af 100644 --- a/package.json +++ b/package.json @@ -12,6 +12,7 @@ "url": "https://opencollective.com/cryptpad" }, "dependencies": { + "@mcrowe/minibloom": "^0.2.0", "chainpad-crypto": "^0.2.2", "chainpad-server": "^4.0.9", "express": "~4.16.0", diff --git a/scripts/evict-inactive.js b/scripts/evict-inactive.js index 4455d7c35..63d46917b 100644 --- a/scripts/evict-inactive.js +++ b/scripts/evict-inactive.js @@ -1,8 +1,11 @@ +/* global process */ + var nThen = require("nthen"); var Store = require("../lib/storage/file"); var BlobStore = require("../lib/storage/blob"); var Pins = require("../lib/pins"); +var Bloom = require("@mcrowe/minibloom"); var config = require("../lib/load-config"); // the administrator should have set an 'inactiveTime' in their config @@ -22,16 +25,42 
@@ var getNewestTime = function (stats) { }; var store; -var pins; +var pinStore; var Log; var blobs; +/* It's fairly easy to know if a channel or blob is active + but knowing whether it is pinned requires that we + keep the set of pinned documents in memory. + + Some users will share the same set of documents in their pin lists, + so the representation of pinned documents should scale sub-linearly + with the number of users and pinned documents. + + That said, sub-linear isn't great... + A Bloom filter is "a space-efficient probabilistic data structure" + which lets us check whether an item is _probably_ or _definitely not_ + in a set. This is good enough for our purposes since we just want to + know whether something can safely be removed and false negatives + (not safe to remove when it actually is) are acceptable. + + We set our capacity to some large number, and the error rate to whatever + we think is acceptable. +*/ +var BLOOM_CAPACITY = (1 << 20) - 1; // over a million items +var BLOOM_ERROR = 1 / 1000; // an error rate of one in a thousand + +// we'll use one filter for the set of active documents +var activeDocs = Bloom.optimalFilter(BLOOM_CAPACITY, BLOOM_ERROR); +// and another one for the set of pinned documents +var pinnedDocs = Bloom. 
optimalFilter(BLOOM_CAPACITY, BLOOM_ERROR); + var startTime = +new Date(); var msSinceStart = function () { return (+new Date()) - startTime; }; -nThen(function (w) { +var loadStorage = function (w) { // load the store which will be used for iterating over channels // and performing operations like archival and deletion Store.create(config, w(function (err, _) { @@ -40,17 +69,17 @@ nThen(function (w) { throw err; } store = _; - })); // load the list of pinned files so you know which files - // should not be archived or deleted - Pins.load(w(function (err, _) { + })); + + Store.create({ + filePath: config.pinPath, + }, w(function (err, _) { if (err) { w.abort(); - return void console.error(err); + throw err; } - pins = _; - }), { - pinPath: config.pinPath, - }); + pinStore = _; + })); // load the logging module so that you have a record of which // files were archived or deleted at what time @@ -67,9 +96,9 @@ nThen(function (w) { } blobs = _; })); -}).nThen(function () { - Log.info("EVICT_TIME_TO_LOAD_PINS", msSinceStart()); -}).nThen(function (w) { +}; + +var removeArchivedChannels = function (w) { // this block will iterate over archived channels and removes them // if they've been in cold storage for longer than your configured archive time @@ -79,6 +108,7 @@ nThen(function (w) { // count the number of files which have been removed in this run var removed = 0; + var accounts = 0; var handler = function (err, item, cb) { if (err) { @@ -102,7 +132,13 @@ nThen(function (w) { return void cb(); } Log.info('EVICT_ARCHIVED_CHANNEL_REMOVAL', item.channel); - removed++; + + if (item.channel.length === 32) { + removed++; + } else if (item.channel.length === 44) { + accounts++; + } + cb(); })); }; @@ -115,10 +151,13 @@ nThen(function (w) { return Log.error('EVICT_ARCHIVED_FINAL_ERROR', err); } Log.info('EVICT_ARCHIVED_CHANNELS_REMOVED', removed); + Log.info('EVICT_ARCHIVED_ACCOUNTS_REMOVED', accounts); }; store.listArchivedChannels(handler, w(done)); -}).nThen(function (w) 
{ +}; + +var removeArchivedBlobProofs = function (w) { if (typeof(config.archiveRetentionTime) !== "number") { return; } // Iterate over archive blob ownership proofs and remove them // if they are older than the specified retention time @@ -128,7 +167,6 @@ nThen(function (w) { Log.error("EVICT_BLOB_LIST_ARCHIVED_PROOF_ERROR", err); return void next(); } - if (pins[item.blobId]) { return void next(); } if (item && getNewestTime(item) > retentionTime) { return void next(); } blobs.remove.archived.proof(item.safeKey, item.blobId, (function (err) { if (err) { @@ -142,7 +180,9 @@ nThen(function (w) { }, w(function () { Log.info('EVICT_ARCHIVED_BLOB_PROOFS_REMOVED', removed); })); -}).nThen(function (w) { +}; + +var removeArchivedBlobs = function (w) { if (typeof(config.archiveRetentionTime) !== "number") { return; } // Iterate over archived blobs and remove them // if they are older than the specified retention time @@ -152,7 +192,6 @@ nThen(function (w) { Log.error("EVICT_BLOB_LIST_ARCHIVED_BLOBS_ERROR", err); return void next(); } - if (pins[item.blobId]) { return void next(); } if (item && getNewestTime(item) > retentionTime) { return void next(); } blobs.remove.archived.blob(item.blobId, function (err) { if (err) { @@ -166,7 +205,149 @@ nThen(function (w) { }, w(function () { Log.info('EVICT_ARCHIVED_BLOBS_REMOVED', removed); })); -}).nThen(function (w) { +}; + +var categorizeChannelsByActivity = function (w) { + var channels = 0; + var active = 0; + var handler = function (err, item, cb) { + channels++; + if (err) { + Log.error('EVICT_CHANNEL_CATEGORIZATION', err); + return void cb(); + } + + // if the channel has been modified recently + // we use mtime (presumably rather than atime) because we don't want to count access to the file, just modifications + if (+new Date(item.mtime) > inactiveTime) { + // add it to the set of activeDocs + activeDocs.add(item.channel); + active++; + return void cb(); + } + + return void cb(); + }; + + var done = function () { + 
Log.info('EVICT_CHANNELS_CATEGORIZED', { + active: active, + channels: channels, + }); + }; + + store.listChannels(handler, w(done)); +}; + +var categorizeBlobsByActivity = function (w) { + var n_blobs = 0; + var active = 0; + + blobs.list.blobs(function (err, item, next) { + n_blobs++; + if (err) { + Log.error("EVICT_BLOB_CATEGORIZATION", err); + return void next(); + } + if (!item) { + next(); + return void Log.error("EVICT_BLOB_CATEGORIZATION_INVALID", item); + } + if (getNewestTime(item) > inactiveTime) { + activeDocs.add(item.blobId); + active++; + return void next(); + } + next(); + }, w(function () { + Log.info('EVICT_BLOBS_CATEGORIZED', { + active: active, + blobs: n_blobs, + }); + })); +}; + +var categorizeAccountsByActivity = function (w) { +// iterate over all accounts + var accounts = 0; + var inactive = 0; + + var accountRetentionTime; + + if (typeof(config.accountRetentionTime) === 'number' && config.accountRetentionTime > 0) { + accountRetentionTime = +new Date() - (24 * 3600 * 1000 * config.accountRetentionTime); + } else { + accountRetentionTime = -1; + } + + var pinAll = function (pinList) { + pinList.forEach(function (docId) { + pinnedDocs.add(docId); + }); + }; + + var handler; + + if (accountRetentionTime < 0) { + // this means we'll retain all accounts + // so the pin log handler can be very simple + handler = function (content, id, next) { + pinAll(Object.keys(content.pins)); + next(); + }; + } else { + // otherwise, we'll only retain data from active accounts + // so we need more heuristics + handler = function (content, id, next) { + accounts++; + //console.log(content, id); + + var mtime = content.latest; + + var pinList = Object.keys(content.pins); + // if their pin log has changed recently then consider them active + if (mtime && mtime > accountRetentionTime) { + // the account is active + pinAll(pinList); + return void next(); + } + + // otherwise iterate over their pinned documents until you find one that has been active + if 
(Object.keys(content.pins).some(function (docId) { + return activeDocs.test(docId); // true as soon as one pinned document is (probably) active + })) { + // add active accounts' pinned documents to a second bloom filter + pinAll(pinList); + return void next(); + } + + // if none are active then archive the pin log + pinStore.archiveChannel(id, function (err) { + inactive++; // counted whether or not archival succeeded; reported by EVICT_INACTIVE_ACCOUNTS + if (err) { + Log.error('EVICT_INACTIVE_ACCOUNT_PIN_LOG', err); + return void next(); + } + Log.info('EVICT_INACTIVE_ACCOUNT_LOG', id); + next(); + }); + }; + } + + var done = function () { + Log.info('EVICT_INACTIVE_ACCOUNTS', { + accounts: accounts, + inactive: inactive, + }); + }; + + Pins.load(w(done), { + pinPath: config.pinPath, + handler: handler, + }); +}; + +var archiveInactiveBlobs = function (w) { // iterate over blobs and remove them // if they have not been accessed within the specified retention time var removed = 0; @@ -179,8 +360,8 @@ nThen(function (w) { next(); return void Log.error('EVICT_BLOB_LIST_BLOBS_NO_ITEM', item); } - if (pins[item.blobId]) { return void next(); } - if (getNewestTime(item) > inactiveTime) { return void next(); } + if (pinnedDocs.test(item.blobId)) { return void next(); } + if (activeDocs.test(item.blobId)) { return void next(); } blobs.archive.blob(item.blobId, function (err) { if (err) { @@ -199,7 +380,9 @@ nThen(function (w) { }, w(function () { Log.info('EVICT_BLOBS_REMOVED', removed); })); -}).nThen(function (w) { +}; + +var archiveInactiveBlobProofs = function (w) { // iterate over blob proofs and remove them // if they don't correspond to a pinned or active file var removed = 0; @@ -212,7 +395,7 @@ nThen(function (w) { next(); return void Log.error('EVICT_BLOB_LIST_PROOFS_NO_ITEM', item); } - if (pins[item.blobId]) { return void next(); } + if (pinnedDocs.test(item.blobId)) { return void next(); } if (getNewestTime(item) > inactiveTime) { return void next(); } nThen(function (w) { blobs.size(item.blobId, w(function (err, size) { @@ -239,7 +422,9 @@ nThen(function (w) { }, w(function () { 
Log.info("EVICT_BLOB_PROOFS_REMOVED", removed); })); -}).nThen(function (w) { +}; + +var archiveInactiveChannels = function (w) { var channels = 0; var archived = 0; @@ -265,11 +450,11 @@ nThen(function (w) { })); } - // bail out if the channel was modified recently - if (+new Date(item.mtime) > inactiveTime) { return void cb(); } + // bail out if the channel is in the set of activeDocs + if (activeDocs.test(item.channel)) { return void cb(); } // ignore the channel if it's pinned - if (pins[item.channel]) { return void cb(); } + if (pinnedDocs.test(item.channel)) { return void cb(); } return void store.archiveChannel(item.channel, w(function (err) { if (err) { @@ -289,12 +474,38 @@ nThen(function (w) { return void Log.info('EVICT_CHANNELS_ARCHIVED', archived); }; - store.listChannels(handler, w(done)); -}).nThen(function () { + store.listChannels(handler, w(done), true); // using a hacky "fast mode" since we only need the channel id +}; + +nThen(loadStorage) +.nThen(function () { + Log.info("EVICT_TIME_TO_LOAD_PINS", msSinceStart()); +}) +.nThen(removeArchivedChannels) +.nThen(removeArchivedBlobProofs) +.nThen(removeArchivedBlobs) + +// iterate over all documents and add them to a bloom filter if they have been active +.nThen(categorizeChannelsByActivity) +.nThen(categorizeBlobsByActivity) + +// iterate over all accounts and add them to a bloom filter if they are active +.nThen(categorizeAccountsByActivity) + +// iterate again and archive inactive unpinned documents + // (documents which are not in either bloom filter) + +.nThen(archiveInactiveBlobs) +.nThen(archiveInactiveBlobProofs) +.nThen(archiveInactiveChannels) +.nThen(function () { Log.info("EVICT_TIME_TO_RUN_SCRIPT", msSinceStart()); }).nThen(function () { // the store will keep this script running if you don't shut it down store.shutdown(); Log.shutdown(); + pinStore.shutdown(); + process.exit(); + });