first version of admin 'database' tab

This commit is contained in:
ansuz 2022-08-11 11:53:03 +05:30
parent 673420e3fe
commit 60e58e8f7a
8 changed files with 1436 additions and 50 deletions

View File

@ -6,6 +6,9 @@ const Util = require("../common-util");
const Ulimit = require("ulimit");
const Decrees = require("../decrees");
const Pinning = require("./pin-rpc");
const Core = require("./core");
const Channel = require("./channel");
const BlockStore = require("../storage/block");
var Fs = require("fs");
@ -170,13 +173,15 @@ var archiveDocument = function (Env, Server, cb, data) {
switch (id.length) {
case 32:
// TODO disconnect users from active sessions
return void Env.msgStore.archiveChannel(id, Util.both(cb, function (err) {
Env.Log.info("ARCHIVAL_CHANNEL_BY_ADMIN_RPC", {
channelId: id,
reason: reason,
status: err? String(err): "SUCCESS",
});
Channel.disconnectChannelMembers(Env, Server, id, 'EDELETED', err => {
if (err) { } // XXX
});
}));
case 48:
return void Env.blobStore.archive.blob(id, Util.both(cb, function (err) {
@ -368,11 +373,275 @@ var getLimits = function (Env, Server, cb) {
cb(void 0, Env.limits);
};
var isValidKey = key => {
return typeof(key) === 'string' && key.length === 44;
};
// CryptPad_AsyncStore.rpc.send('ADMIN', ['GET_USER_TOTAL_SIZE', "CrufexqXcY/z+eKJlEbNELVy5Sb7E/EAAEFI8GnEtZ0="], console.log)
var getUserTotalSize = function (Env, Server, cb, data) {
var signingKey = Array.isArray(data) && data[1];
if (typeof(signingKey) !== 'string' || signingKey.length < 44) { return void cb("EINVAL"); } // FIXME use a standard check for this
Pinning.getTotalSize(Env, signingKey, cb);
if (!isValidKey(signingKey)) { return void cb("EINVAL"); }
Pinning.getTotalSize(Env, signingKey, cb); // XXX frequently incorrect...
};
var getPinActivity = function (Env, Server, cb, data) {
var signingKey = Array.isArray(data) && data[1];
if (!isValidKey(signingKey)) { return void cb("EINVAL"); }
Env.getPinActivity(signingKey, cb);
};
var isUserOnline = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
key = Util.unescapeKeyCharacters(key);
var online = false;
try {
Object.keys(Env.netfluxUsers).some(function (netfluxId) {
if (!Env.netfluxUsers[netfluxId][key]) { return; }
online = true;
return true;
});
} catch (err) {
Env.Log.error('ADMIN_USER_ONLINE_CHECK', {
error: err,
key: key,
});
return void cb("SERVER_ERROR");
}
cb(void 0, online);
};
var getPinLogStatus = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
var safeKey = Util.escapeKeyCharacters(key);
var response = {};
nThen(function (w) {
Env.pinStore.isChannelAvailable(safeKey, w(function (err, result) {
if (err) {
return void Env.Log.error('PIN_LOG_STATUS_AVAILABLE', err);
}
response.live = result;
}));
Env.pinStore.isChannelArchived(safeKey, w(function (err, result) {
if (err) {
return void Env.Log.error('PIN_LOG_STATUS_ARCHIVED', err);
}
response.archived = result;
}));
}).nThen(function () {
cb(void 0, response);
});
};
var getDocumentStatus = function (Env, Server, cb, data) {
var id = Array.isArray(data) && data[1];
if (typeof(id) !== 'string') { return void cb("EINVAL"); }
var response = {};
if (id.length === 44) {
return void nThen(function (w) {
BlockStore.isAvailable(Env, id, w(function (err, result) {
if (err) {
return void Env.Log.error('BLOCK_STATUS_AVAILABLE', err);
}
response.live = result;
}));
BlockStore.isArchived(Env, id, w(function (err, result) {
if (err) {
return void Env.Log.error('BLOCK_STATUS_ARCHIVED', err);
}
response.archived = result;
}));
}).nThen(function () {
cb(void 0, response);
});
}
if (id.length === 48) {
return void nThen(function (w) {
Env.blobStore.isBlobAvailable(id, w(function (err, result) {
if (err) {
return void Env.Log.error('BLOB_STATUS_AVAILABLE', err);
}
response.live = result;
}));
Env.blobStore.isBlobArchived(id, w(function (err, result) {
if (err) {
return void Env.Log.error('BLOB_STATUS_ARCHIVED', err);
}
response.archived = result;
}));
}).nThen(function () {
cb(void 0, response);
});
}
if (id.length !== 32) { return void cb("EINVAL"); }
nThen(function (w) {
Env.store.isChannelAvailable(id, w(function (err, result) {
if (err) {
return void Env.Log.error('CHANNEL_STATUS_AVAILABLE', err);
}
response.live = result;
}));
Env.store.isChannelArchived(id, w(function (err, result) {
if (err) {
return void Env.Log.error('CHANNEL_STATUS_ARCHIVED', err);
}
response.archived = result;
}));
}).nThen(function () {
cb(void 0, response);
});
};
var getPinList = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
var safeKey = Util.escapeKeyCharacters(key);
Env.getPinState(safeKey, function (err, value) {
if (err) { return void cb(err); }
try {
return void cb(void 0, Object.keys(value).filter(k => value[k]));
} catch (err2) { }
cb("UNEXPECTED_SERVER_ERROR");
});
};
var getPinHistory = function (Env, Server, cb, data) {
Env.Log.debug('GET_PIN_HISTORY', data);
cb("NOT_IMPLEMENTED");
};
var archivePinLog = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
var safeKey = Util.escapeKeyCharacters(key);
Env.pinStore.archiveChannel(safeKey, function (err) {
if (err) {
Env.Log.error('ARCHIVE_PIN_LOG_BY_ADMIN', {
error: err,
safeKey: safeKey,
});
} else {
Env.Log.info('ARCHIVE_PIN_LOG_BY_ADMIN', {
safeKey: safeKey,
});
}
cb(err);
});
};
var archiveBlock = function (Env, Server, cb, data) {
var args = Array.isArray(data) && data[1];
if (!args) { return void cb("INVALID_ARGS"); }
var key = args.key;
var reason = args.reason;
if (!isValidKey(key)) { return void cb("EINVAL"); }
BlockStore.archive(Env, key, err => {
Env.Log.debug("ARCHIVE_BLOCK_BY_ADMIN", { // XXX
error: err,
key: key,
reason: reason || '',
});
cb(err);
});
};
var restoreArchivedBlock = function (Env, Server, cb, data) {
var args = Array.isArray(data) && data[1];
if (!args) { return void cb("INVALID_ARGS"); }
var key = args.key;
var reason = args.reason;
if (!isValidKey(key)) { return void cb("EINVAL"); }
BlockStore.restore(Env, key, err => {
Env.Log.debug("RESTORE_ARCHIVED_BLOCK_BY_ADMIN", { // XXX
error: err,
key: key,
reason: reason || '',
});
cb(err);
});
};
var restoreArchivedPinLog = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
var safeKey = Util.escapeKeyCharacters(key);
Env.pinStore.restoreArchivedChannel(safeKey, function (err) {
if (err) {
Env.Log.error("RESTORE_ARCHIVED_PIN_LOG_BY_ADMIN", {
error: err,
safeKey: safeKey,
});
} else {
Env.Log.info('RESTORE_ARCHIVED_PIN_LOG_BY_ADMIN', {
safeKey: safeKey,
});
}
cb(err);
});
};
var archiveOwnedDocuments = function (Env, Server, cb, data) {
Env.Log.debug('ARCHIVE_OWNED_DOCUMENTS', data);
cb("NOT_IMPLEMENTED");
};
// quotas...
var getUserQuota = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
Pinning.getLimit(Env, key, cb);
};
var getUserStorageStats = function (Env, Server, cb, data) {
var key = Array.isArray(data) && data[1];
if (!isValidKey(key)) { return void cb("EINVAL"); }
var safeKey = Util.escapeKeyCharacters(key);
Env.getPinState(safeKey, function (err, value) {
if (err) { return void cb(err); }
try {
var res = {
channels: 0,
files: 0,
};
Object.keys(value).forEach(k => {
switch (k.length) {
case 32: return void ((res.channels++));
case 48: return void ((res.files++));
}
});
return void cb(void 0, res);
} catch (err2) { }
cb("UNEXPECTED_SERVER_ERROR");
});
};
var getStoredMetadata = function (Env, Server, cb, data) {
var id = Array.isArray(data) && data[1];
if (!Core.isValidId(id)) { return void cb('INVALID_CHAN'); }
Env.computeMetadata(id, function (err, data) {
cb(err, data);
});
};
var getDocumentSize = function (Env, Server, cb, data) {
var id = Array.isArray(data) && data[1];
if (!Core.isValidId(id)) { return void cb('INVALID_CHAN'); }
Env.getFileSize(id, (err, size) => {
if (err) { return void cb(err); }
cb(err, size);
});
};
var getLastChannelTime = function (Env, Server, cb, data) {
var id = Array.isArray(data) && data[1];
if (!Core.isValidId(id)) { return void cb('INVALID_CHAN'); }
Env.getLastChannelTime(id, function (err, time) {
cb(err, time);
});
};
var commands = {
@ -386,6 +655,26 @@ var commands = {
GET_FILE_DESCRIPTOR_LIMIT: getFileDescriptorLimit,
GET_CACHE_STATS: getCacheStats,
GET_PIN_ACTIVITY: getPinActivity,
IS_USER_ONLINE: isUserOnline,
GET_USER_QUOTA: getUserQuota,
GET_USER_STORAGE_STATS: getUserStorageStats,
GET_PIN_LOG_STATUS: getPinLogStatus,
GET_STORED_METADATA: getStoredMetadata,
GET_DOCUMENT_SIZE: getDocumentSize,
GET_LAST_CHANNEL_TIME: getLastChannelTime,
GET_DOCUMENT_STATUS: getDocumentStatus,
GET_PIN_LIST: getPinList,
GET_PIN_HISTORY: getPinHistory,
ARCHIVE_PIN_LOG: archivePinLog,
ARCHIVE_OWNED_DOCUMENTS: archiveOwnedDocuments,
RESTORE_ARCHIVED_PIN_LOG: restoreArchivedPinLog,
ARCHIVE_BLOCK: archiveBlock,
RESTORE_ARCHIVED_BLOCK: restoreArchivedBlock,
ARCHIVE_DOCUMENT: archiveDocument,
RESTORE_ARCHIVED_DOCUMENT: restoreArchivedDocument,

View File

@ -7,6 +7,57 @@ const Core = require("./core");
const Metadata = require("./metadata");
const HK = require("../hk-util");
Channel.disconnectChannelMembers = function (Env, Server, channelId, code, cb) {
var done = Util.once(Util.mkAsync(cb));
if (!Core.isValidId(channelId)) { return done('INVALID_ID'); }
// XXX is valid channel?
const channel_cache = Env.channel_cache;
const metadata_cache = Env.metadata_cache;
const clear = function () {
delete channel_cache[channelId];
Server.clearChannel(channelId);
delete metadata_cache[channelId];
};
// an owner of a channel deleted it
nThen(function (w) {
// close the channel in the store
Env.msgStore.closeChannel(channelId, w());
}).nThen(function (w) {
// Server.channelBroadcast would be better
// but we can't trust it to track even one callback,
// let alone many in parallel.
// so we simulate it on this side to avoid race conditions
Server.getChannelUserList(channelId).forEach(function (userId) {
Server.send(userId, [
0,
Env.historyKeeper.id,
"MSG",
userId,
JSON.stringify({
error: code, //'EDELETED',
channel: channelId,
})
], w());
});
}).nThen(function () {
// clear the channel's data from memory
// once you've sent everyone a notice that the channel has been deleted
clear();
done();
}).orTimeout(function () {
Env.Log.warn('DISCONNECT_CHANNEL_MEMBERS_TIMEOUT', {
channelId,
code,
});
clear();
done();
}, 30000);
};
Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
if (typeof(channelId) !== 'string' || channelId.length !== 32) {
return cb('INVALID_ARGUMENTS');
@ -90,44 +141,11 @@ var archiveOwnedChannel = function (Env, safeKey, channelId, __cb, Server) {
}
cb(void 0, 'OK');
const channel_cache = Env.channel_cache;
const metadata_cache = Env.metadata_cache;
const clear = function () {
delete channel_cache[channelId];
Server.clearChannel(channelId);
delete metadata_cache[channelId];
};
// an owner of a channel deleted it
nThen(function (w) {
// close the channel in the store
Env.msgStore.closeChannel(channelId, w());
}).nThen(function (w) {
// Server.channelBroadcast would be better
// but we can't trust it to track even one callback,
// let alone many in parallel.
// so we simulate it on this side to avoid race conditions
Server.getChannelUserList(channelId).forEach(function (userId) {
Server.send(userId, [
0,
Env.historyKeeper.id,
"MSG",
userId,
JSON.stringify({
error: 'EDELETED',
channel: channelId,
})
], w());
});
}).nThen(function () {
// clear the channel's data from memory
// once you've sent everyone a notice that the channel has been deleted
clear();
}).orTimeout(function () {
Env.Log.warn('ON_CHANNEL_DELETED_TIMEOUT', channelId);
clear();
}, 30000);
Channel.disconnectChannelMembers(Env, Server, channelId, 'EDELETED', err => {
if (err) {
// XXX
}
});
});
});
};

View File

@ -585,6 +585,19 @@ BlobStore.create = function (config, _cb) {
},
},
isBlobAvailable: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
var path = makeBlobPath(Env, blobId);
isFile(path, cb);
},
isBlobArchived: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
var path = prependArchive(Env, makeBlobPath(Env, blobId));
isFile(path, cb);
},
closeBlobstage: function (safeKey) {
closeBlobstage(Env, safeKey);
},

View File

@ -55,6 +55,61 @@ Block.archive = function (Env, publicKey, _cb) {
}, cb);
};
Block.restore = function (Env, publicKey, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
// derive the filepath
var livePath = Block.mkPath(Env, publicKey);
// make sure the path is valid
if (typeof(livePath) !== 'string') {
return void cb('E_INVALID_BLOCK_PATH');
}
var archivePath = Block.mkArchivePath(Env, publicKey);
// make sure the path is valid
if (typeof(archivePath) !== 'string') {
return void cb('E_INVALID_BLOCK_ARCHIVAL_PATH');
}
// TODO Env.incrementBytesWritten
Fse.move(archivePath, livePath, {
//overwrite: true,
}, cb);
};
var isValidKey = function (publicKey) {
return typeof(publicKey) === 'string' && publicKey.length === 44;
};
var exists = function (path, cb) {
Fs.stat(path, function (err, stat) {
if (err) {
if (err.code === 'ENOENT') {
return void cb(void 0, false);
}
return void cb(err);
}
if (!stat.isFile()) { return void cb('E_NOT_FILE'); }
return void cb(void 0, true);
});
};
var checkPath = function (Env, publicKey, pathFunction, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidKey(publicKey)) { return void cb("INVALID_ARGS"); }
var path = pathFunction(Env, publicKey);
exists(path, cb);
};
Block.isAvailable = function (Env, publicKey, _cb) {
checkPath(Env, publicKey, Block.mkPath, _cb);
};
Block.isArchived = function (Env, publicKey, _cb) {
checkPath(Env, publicKey, Block.mkArchivePath, _cb);
};
Block.check = function (Env, publicKey, _cb) { // 'check' because 'exists' implies boolean
var cb = Util.once(Util.mkAsync(_cb));
var path = Block.mkPath(Env, publicKey);

View File

@ -591,6 +591,43 @@ const completeUpload = function (data, cb) {
});
};
const getPinActivity = function (data, cb) {
if (!data) { return void cb("INVALID_ARGS"); }
var first;
var latest;
pinStore.getMessages(data.key, line => {
if (!line || !line.trim()) { return; }
var parsed;
try {
parsed = JSON.parse(line);
latest = parsed[parsed.length - 1];
if (first) { return; }
first = latest;
} catch (err) { }
}, function (err) {
if (err) { return void cb(err); }
cb(void 0, {
first: first,
latest: latest,
});
});
};
const getLastChannelTime = function (data, cb) {
if (!data) { return void cb("INVALID_ARGS"); }
var latest;
store.getMessages(data.channel, function (line) {
var parsed;
try {
parsed = JSON.parse(line);
latest = parsed[parsed.length - 1];
} catch (err) { }
}, function (err) {
if (err) { return void cb(err); }
cb(void 0, latest);
});
};
const COMMANDS = {
COMPUTE_INDEX: computeIndex,
COMPUTE_METADATA: computeMetadata,
@ -606,6 +643,8 @@ const COMMANDS = {
WRITE_TASK: writeTask,
EVICT_INACTIVE: evictInactive,
COMPLETE_UPLOAD: completeUpload,
GET_PIN_ACTIVITY: getPinActivity,
GET_LAST_CHANNEL_TIME: getLastChannelTime,
};
COMMANDS.INLINE = function (data, cb) {

View File

@ -340,10 +340,26 @@ Workers.initialize = function (Env, config, _cb) {
});
};
Env.getPinActivity = function (safeKey, cb) {
Env.pinStore.getWeakLock(safeKey, function (next) {
sendCommand({
key: safeKey,
command: 'GET_PIN_ACTIVITY',
}, Util.both(next, cb));
});
};
Env.getLastChannelTime = function (channel, cb) {
sendCommand({
command: 'GET_LAST_CHANNEL_TIME',
channel: channel,
}, cb);
};
Env.getFileSize = function (channel, cb) {
sendCommand({
command: 'GET_FILE_SIZE',
channel: channel,
channel: channel,
}, cb);
};

View File

@ -327,5 +327,17 @@
.cp-charts-row.heading {
font-weight: bold;
}
table.cp-account-stats,
table.cp-block-stats,
table.cp-document-stats {
@color: #777;
border: 1px solid @color;
margin: 15px;
td {
max-width: 60vw; // XXX
border: 1px solid @color;
padding: 5px;
}
}
}

File diff suppressed because it is too large Load Diff