// THIS FILE IS TAILORED TO REVOLT PRODUCTION
// MIGRATING FROM A BACKUP & EXISTING CDN NODE
// INTO BACKBLAZE B2
//
// THIS IS ONLY INCLUDED FOR REFERENCE PURPOSES
// NODE_EXTRA_CA_CERTS=~/projects/revolt-admin-panel/revolt.crt node index.mjs
// NODE_EXTRA_CA_CERTS=/cwd/revolt.crt node /cwd/index.mjs
import { readdir, readFile, writeFile } from "node:fs/promises";
import { createCipheriv, createHash, randomBytes } from "node:crypto";
import { resolve } from "node:path";
import { MongoClient } from "mongodb";
import { config } from "dotenv";
import assert from "node:assert";
import bfj from "bfj";
config();
config({ path: "/cwd/.env" });
import BackBlazeB2 from "backblaze-b2";
import axiosRetry from "axios-retry";
import { decodeTime } from "ulid";
// .env:
// ENCRYPTION_KEY=
// MONGODB=
// B2_APP_KEYID=
// B2_APP_KEY=
/**
* @type {string | null}
*/
const USE_CACHE = "/cwd/cache.json";
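// IDs of files already handled in a previous run; persisted via dumpCache()
// so an interrupted migration can resume without redoing work.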
let processed_ids = new Set();
async function dumpCache() {
if (USE_CACHE) await bfj.write(USE_CACHE, [...processed_ids]);
}
if (USE_CACHE) {
try {
processed_ids = new Set(await bfj.read(USE_CACHE));
} catch (err) {
console.error(err);
}
}
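// Backblaze B2 client; axios-retry applies exponential backoff on transient failures.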
const b2 = new BackBlazeB2({
applicationKeyId: process.env.B2_APP_KEYID,
applicationKey: process.env.B2_APP_KEY,
retry: {
retryDelay: axiosRetry.exponentialDelay,
},
});
await b2.authorize();
// To generate a fresh key once:
// console.info(randomBytes(32).toString("base64"));
const encKey = Buffer.from(process.env.ENCRYPTION_KEY, "base64");
const mongo = new MongoClient(process.env.MONGODB);
await mongo.connect();
// TODO: set all existing files to current timestamp
const dirs = [
// "banners",
// "emojis", // TODO: timestamps
// "avatars",
// "backgrounds",
// "icons",
"attachments", // https://stackoverflow.com/a/18777877
];
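// Only one tag is migrated per run; the main loop below breaks after the
// first entry, so uncomment a different tag and re-run for the rest.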
// Encrypt a file with AES-256-GCM under a random 96-bit IV. The 16-byte GCM
// auth tag is appended to the ciphertext so a single blob can be stored;
// the IV is returned separately and persisted in Mongo.
async function encryptFile(data) {
const iv = randomBytes(12); // randomBytes already returns a Buffer
const cipher = createCipheriv("aes-256-gcm", encKey, iv);
const enc = Buffer.concat([cipher.update(data), cipher.final()]);
return {
iv,
data: Buffer.concat([enc, cipher.getAuthTag()]),
};
}
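// For reference only (not used by the migration): a minimal sketch of how a
// blob stored in this format could be decrypted. Assumes the IV comes from
// the attachment_hashes document and the last 16 bytes of the blob are the
// GCM auth tag; createDecipheriv would need to be imported from "node:crypto".
//
// function decryptFile(blob, ivBase64) {
//   const iv = Buffer.from(ivBase64, "base64");
//   const tag = blob.subarray(blob.length - 16);
//   const ciphertext = blob.subarray(0, blob.length - 16);
//   const decipher = createDecipheriv("aes-256-gcm", encKey, iv);
//   decipher.setAuthTag(tag);
//   return Buffer.concat([decipher.update(ciphertext), decipher.final()]);
// }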
// Cache of parent objects (messages, servers, users, ...) keyed by ID, so
// each is only fetched from Mongo once.
const objectLookup = {};
/**
* Determine who uploaded a file, when, and what it is attached to, based on
* its tag and the parent object in the database. Returns null if the parent
* no longer exists or the file is no longer in use.
* @param {any} f attachment document
* @param {string} v expected tag (the directory currently being migrated)
* @param {number} i progress counter, for logging only
*/
async function determineUploaderIdAndUse(f, v, i) {
if (f.tag === "attachments" && v === "attachments") {
if (typeof f.message_id !== "string") {
console.warn(i, "No message id specified.");
return null;
}
if (!objectLookup[f.message_id]) {
objectLookup[f.message_id] = await mongo
.db("revolt")
.collection("messages")
.findOne({
_id: f.message_id,
});
}
if (!objectLookup[f.message_id]) {
console.warn(i, "Message", f.message_id, "doesn't exist anymore!");
return null;
}
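// ULIDs embed a millisecond timestamp, so the upload time can be recovered
// from the message ID itself via decodeTime.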
return {
uploaded_at: new Date(decodeTime(f.message_id)),
uploader_id: objectLookup[f.message_id].author,
used_for: {
type: "message",
id: f.message_id,
},
};
} else if (f.tag === "banners" && v === "banners") {
if (typeof f.server_id !== "string") {
console.warn(i, "No server id specified.");
return null;
}
if (!objectLookup[f.server_id]) {
objectLookup[f.server_id] = await mongo
.db("revolt")
.collection("servers")
.findOne({
_id: f.server_id,
});
}
if (!objectLookup[f.server_id]) {
console.warn(i, "Server", f.server_id, "doesn't exist anymore!");
return null;
}
return {
uploaded_at: new Date(),
uploader_id: objectLookup[f.server_id].owner,
used_for: {
type: "serverBanner",
id: f.server_id,
},
};
} else if (f.tag === "emojis" && v === "emojis") {
if (typeof f.object_id !== "string") {
return null;
}
if (!objectLookup[f.object_id]) {
objectLookup[f.object_id] = await mongo
.db("revolt")
.collection("emojis")
.findOne({
_id: f.object_id,
});
}
if (!objectLookup[f.object_id]) {
console.warn(i, "Emoji", f.object_id, "doesn't exist anymore!");
return null;
}
return {
uploaded_at: new Date(decodeTime(f.object_id)),
uploader_id: objectLookup[f.object_id].creator_id,
used_for: {
type: "emoji",
id: f.object_id,
},
};
} else if (f.tag === "avatars" && v === "avatars") {
if (typeof f.user_id !== "string") {
return null;
}
if (!objectLookup[f.user_id]) {
objectLookup[f.user_id] = await mongo
.db("revolt")
.collection("users")
.findOne({
_id: f.user_id,
});
}
if (!objectLookup[f.user_id]) {
console.warn(i, "User", f.user_id, "doesn't exist anymore!");
return null;
}
if (objectLookup[f.user_id].avatar?._id !== f._id) {
console.warn(
i,
"Attachment no longer in use.",
f._id,
"for",
f.user_id,
"current:",
objectLookup[f.user_id].avatar?._id
);
return null;
}
return {
uploaded_at: new Date(),
uploader_id: f.user_id,
used_for: {
type: "userAvatar",
id: f.user_id,
},
};
} else if (f.tag === "backgrounds" && v === "backgrounds") {
if (typeof f.user_id !== "string") {
return null;
}
if (!objectLookup[f.user_id]) {
objectLookup[f.user_id] = await mongo
.db("revolt")
.collection("users")
.findOne({
_id: f.user_id,
});
}
if (!objectLookup[f.user_id]) {
console.warn(i, "User", f.user_id, "doesn't exist anymore!");
return null;
}
if (objectLookup[f.user_id].profile?.background?._id !== f._id) {
console.warn(
i,
"Attachment no longer in use.",
f._id,
"for",
f.user_id,
"current:",
objectLookup[f.user_id].profile?.background?._id
);
return null;
}
return {
uploaded_at: new Date(),
uploader_id: f.user_id,
used_for: {
type: "userProfileBackground",
id: f.user_id,
},
};
} else if (f.tag === "icons" && v === "icons") {
if (typeof f.object_id !== "string") {
return null;
}
// Some files from the early days are in a bugged state and need the legacy
// lookup below, which is expensive in the worst case. Keep it disabled until
// everything else has been processed, then re-run with it enabled for those.
if (false) {
objectLookup[f.object_id] = await mongo
.db("revolt")
.collection("users")
.findOne({
_id: f.object_id,
});
if (!objectLookup[f.object_id]) {
console.warn(i, "No legacy match!");
return null;
}
return {
uploaded_at: new Date(),
uploader_id: f.object_id,
used_for: {
type: "legacyGroupIcon",
id: f.object_id,
},
};
}
if (!objectLookup[f.object_id]) {
objectLookup[f.object_id] = await mongo
.db("revolt")
.collection("servers")
.findOne({
_id: f.object_id,
});
}
if (
!objectLookup[f.object_id] ||
// heuristic: a document without `channels` is not a server
!objectLookup[f.object_id].channels
) {
console.warn(i, "Server", f.object_id, "doesn't exist!");
if (!objectLookup[f.object_id]) {
objectLookup[f.object_id] = await mongo
.db("revolt")
.collection("channels")
.findOne({
_id: f.object_id,
});
}
if (!objectLookup[f.object_id]) {
console.warn(i, "Channel", f.object_id, "doesn't exist!");
return null;
}
let server;
const serverId = objectLookup[f.object_id].server;
if (serverId) {
server = objectLookup[serverId];
if (!server) {
server = await mongo.db("revolt").collection("servers").findOne({
_id: serverId,
});
if (!server) {
console.info(
i,
"Couldn't find matching server for channel " + f.object_id + "!"
);
return null;
}
objectLookup[serverId] = server;
}
}
return {
uploaded_at: new Date(),
uploader_id: (server ?? objectLookup[f.object_id]).owner,
used_for: {
type: "channelIcon",
id: f.object_id,
},
};
}
return {
uploaded_at: new Date(),
uploader_id: objectLookup[f.object_id].owner,
used_for: {
type: "serverIcon",
id: f.object_id,
},
};
} else {
throw new Error(
"couldn't find uploader id for " +
f._id +
", expected " +
v +
" but got " +
f.tag
);
}
}
const workerCount = 8;
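// Hashes currently being processed, so two workers never race on files
// with identical content.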
let workingOnHashes = [];
for (const dir of dirs) {
console.info(dir);
// const RESUME = 869000 + 283000 + 772000;
// UPLOAD FROM LOCAL FILE LISTING:
// const RESUME = 0;
// const files = (await readdir(dir)).slice(RESUME);
// const total = files.length;
// UPLOAD FROM DATABASE FILE LISTING:
const files = await mongo
.db("revolt")
.collection("attachments")
.find(
{
tag: dir,
// don't upload deleted files
deleted: {
$ne: true,
},
// don't upload already processed files
hash: {
$exists: false,
},
},
{
projection: { _id: 1 },
}
)
.toArray()
.then((arr) => arr.map((x) => x._id));
const total = files.length;
let i = 0;
let skipsA = 0,
skipsB = 0;
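// Worker pool: workerCount concurrent loops each pull file IDs off the
// shared `files` array until it is drained.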
await Promise.all(
new Array(workerCount).fill(0).map(async (_) => {
while (true) {
const file = files.shift();
if (!file) return;
i++;
console.info(i, files.length, file);
// if (i < 869000) continue; // TODO
// if (i > 3000) break;
if (USE_CACHE) {
if (processed_ids.has(file)) {
console.info(i, "Skip, known file.");
continue;
}
}
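// Re-fetch the document with the same filters as the listing: it may have
// been deleted or processed since the listing was taken.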
const doc = await mongo
.db("revolt")
.collection("attachments")
.findOne({
_id: file,
// don't upload deleted files
deleted: {
$ne: true,
},
// don't upload already processed files
hash: {
$exists: false,
},
});
if (!doc) {
console.info(
i,
"Skipping as it does not exist in DB, is queued for deletion, or has already been processed!"
);
skipsA += 1;
continue;
}
const metaUseInfo = await determineUploaderIdAndUse(doc, dir, i);
if (!metaUseInfo) {
if (USE_CACHE) {
processed_ids.add(file);
}
console.info(i, "Skipping as it hasn't been attached to anything!");
skipsB += 1;
continue;
}
const start = +new Date();
let buff;
try {
buff = await readFile(resolve(dir, file));
} catch (err) {
if (err.code === "ENOENT") {
if (USE_CACHE) {
processed_ids.add(file);
}
console.log(i, "File not found!");
await mongo.db("revolt").collection("logs").insertOne({
type: "missingFile",
desc: "File doesn't exist!",
file,
});
continue;
} else {
throw err;
}
}
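// Content-address the file by its SHA-256 digest so identical uploads are
// stored once in B2 and tracked via the attachment_hashes collection.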
const hash = createHash("sha256").update(buff).digest("hex");
while (workingOnHashes.includes(hash)) {
console.log(
"Waiting to avoid race condition... hash is already being processed..."
);
await new Promise((r) => setTimeout(r, 1000));
}
workingOnHashes.push(hash);
// merge existing
const existingHash = await mongo
.db("revolt")
.collection("attachment_hashes")
.findOne({
_id: hash,
});
if (existingHash) {
console.info(i, "Hash already uploaded, merging!");
await mongo
.db("revolt")
.collection("attachments")
.updateOne(
{
_id: file,
},
{
$set: {
size: existingHash.size,
hash,
...metaUseInfo,
},
}
);
await mongo.db("revolt").collection("logs").insertOne({
type: "mergeHash",
desc: "Merged an existing file!",
hash: existingHash._id,
size: existingHash.size,
});
workingOnHashes = workingOnHashes.filter((x) => x !== hash);
continue;
}
// encrypt
const { iv, data } = await encryptFile(buff);
const end = +new Date();
console.info(metaUseInfo); // + write hash
console.info(
file,
hash,
iv,
`${end - start}ms`,
buff.byteLength,
"bytes"
);
let retry = true;
while (retry) {
try {
const urlResp = await b2.getUploadUrl({
bucketId: "---", // revolt-uploads
});
await b2.uploadFile({
uploadUrl: urlResp.data.uploadUrl,
uploadAuthToken: urlResp.data.authorizationToken,
fileName: hash,
data,
onUploadProgress: (event) => console.info(event),
});
await mongo
.db("revolt")
.collection("attachment_hashes")
.insertOne({
_id: hash,
processed_hash: hash,
created_at: new Date(), // TODO on all
bucket_id: "revolt-uploads",
path: hash,
iv: iv.toString("base64"),
metadata: doc.metadata,
content_type: doc.content_type,
size: data.byteLength,
});
await mongo
.db("revolt")
.collection("attachments")
.updateOne(
{
_id: file,
},
{
$set: {
size: data.byteLength,
hash,
...metaUseInfo,
},
}
);
retry = false;
} catch (err) {
if (
(err.isAxiosError &&
(err.response?.status === 503 ||
err.response?.status === 500)) ||
(err?.code === "ENOTFOUND" && err?.syscall === "getaddrinfo") ||
(err?.code === "ETIMEDOUT" && err?.syscall === "connect") ||
(err?.code === "ECONNREFUSED" && err?.syscall === "connect")
) {
console.error(i, err.response?.status ?? err.code, "ERROR RETRYING");
await mongo
.db("revolt")
.collection("logs")
.insertOne({
type: "upload503",
desc:
"Hit " +
(err?.code === "ETIMEDOUT" && err?.syscall === "connect"
? "network issue (ETIMEDOUT connect)"
: err?.code === "ECONNREFUSED" &&
err?.syscall === "connect"
? "network issue (ECONNREFUSED connect)"
: err?.code === "ENOTFOUND" &&
err?.syscall === "getaddrinfo"
? "DNS issue (ENOTFOUND getaddrinfo)"
: "status " + err.response?.status) +
", trying a new URL!",
hash,
});
await new Promise((r) => setTimeout(() => r(), 1500));
} else {
await dumpCache().catch(console.error);
throw err;
}
}
}
console.info(i, "Successfully uploaded", file, "to S3!");
console.info(
"*** ➡️ Processed",
i,
"out of",
total,
"files",
((i / total) * 100).toFixed(2),
"%"
);
workingOnHashes = workingOnHashes.filter((x) => x !== hash);
}
})
);
console.info("Skips (A):", skipsA, "(B):", skipsB);
break;
}
await dumpCache().catch(console.error);
process.exit(0);