|
|
|
|
@ -138,6 +138,14 @@ function sanitizeMongoUpdateSetObject(update_obj) {
|
|
|
|
|
return sanitized;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
 * Normalizes a MongoDB driver write result into a boolean success flag.
 *
 * Supports the result shapes produced by multiple driver versions:
 *   - driver v4+: `{ acknowledged: boolean }`
 *   - driver v3:  `{ ok: number }` or `{ result: { ok: number } }`
 * A falsy result is treated as failure; an unrecognized (but truthy) shape
 * is assumed to be an acknowledged write.
 *
 * @param {object|null|undefined} result - raw result returned by a driver write call
 * @returns {boolean} true when the write was acknowledged as successful
 */
function isMongoWriteAck(result) {
    if (!result) {
        return false;
    }
    const { acknowledged, ok } = result;
    if (typeof acknowledged === 'boolean') {
        return acknowledged;
    }
    if (typeof ok === 'number') {
        return ok === 1;
    }
    const nested_ok = result.result ? result.result.ok : undefined;
    if (typeof nested_ok === 'number') {
        return nested_ok === 1;
    }
    // Unknown result shape from the driver: assume the write succeeded.
    return true;
}
|
|
|
|
|
|
|
|
|
|
function setDB(input_db, input_users_db) {
|
|
|
|
|
db = input_db; users_db = input_users_db;
|
|
|
|
|
exports.db = input_db;
|
|
|
|
|
@ -193,10 +201,7 @@ exports.connectToDB = async (retries = 5, no_fallback = false, custom_connection
|
|
|
|
|
|
|
|
|
|
exports._connectToDB = async (custom_connection_string = null) => {
|
|
|
|
|
const uri = !custom_connection_string ? config_api.getConfigItem('ytdl_mongodb_connection_string') : custom_connection_string; // "mongodb://127.0.0.1:27017/?compressors=zlib&gssapiServiceName=mongodb";
|
|
|
|
|
const client = new MongoClient(uri, {
|
|
|
|
|
useNewUrlParser: true,
|
|
|
|
|
useUnifiedTopology: true,
|
|
|
|
|
});
|
|
|
|
|
const client = new MongoClient(uri);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
await client.connect();
|
|
|
|
|
@ -336,12 +341,12 @@ exports.insertRecordIntoTable = async (table, doc, replaceFilter = null) => {
|
|
|
|
|
}
|
|
|
|
|
]);
|
|
|
|
|
logger.debug(`Inserted doc into ${table} with filter: ${JSON.stringify(replaceFilter)}`);
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).insertOne(doc);
|
|
|
|
|
logger.debug(`Inserted doc into ${table}`);
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.insertRecordsIntoTable = async (table, docs, ignore_errors = false) => {
|
|
|
|
|
@ -360,7 +365,7 @@ exports.insertRecordsIntoTable = async (table, docs, ignore_errors = false) => {
|
|
|
|
|
}
|
|
|
|
|
const output = await database.collection(table).insertMany(docs, {ordered: !ignore_errors});
|
|
|
|
|
logger.debug(`Inserted ${output.insertedCount} docs into ${table}`);
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.bulkInsertRecordsIntoTable = async (table, docs) => {
|
|
|
|
|
@ -368,18 +373,18 @@ exports.bulkInsertRecordsIntoTable = async (table, docs) => {
|
|
|
|
|
if (using_local_db) {
|
|
|
|
|
return await exports.insertRecordsIntoTable(table, docs);
|
|
|
|
|
}
|
|
|
|
|
if (!docs || docs.length === 0) return true;
|
|
|
|
|
|
|
|
|
|
// not a necessary function as insertRecords does the same thing but gives us more control on batch size if needed
|
|
|
|
|
const table_collection = database.collection(table);
|
|
|
|
|
|
|
|
|
|
let bulk = table_collection.initializeOrderedBulkOp(); // Initialize the Ordered Batch
|
|
|
|
|
|
|
|
|
|
for (let i = 0; i < docs.length; i++) {
|
|
|
|
|
bulk.insert(docs[i]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await bulk.execute();
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
const output = await database.collection(table).bulkWrite(
|
|
|
|
|
docs.map(doc => ({
|
|
|
|
|
insertOne: {
|
|
|
|
|
document: doc
|
|
|
|
|
}
|
|
|
|
|
})),
|
|
|
|
|
{ ordered: true }
|
|
|
|
|
);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -407,7 +412,12 @@ exports.getRecords = async (table, filter_obj = null, return_count = false, sort
|
|
|
|
|
return !return_count ? cursor : cursor.length;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const cursor = filter_obj ? database.collection(table).find(filter_obj) : database.collection(table).find();
|
|
|
|
|
const collection = database.collection(table);
|
|
|
|
|
if (return_count) {
|
|
|
|
|
return await collection.countDocuments(filter_obj || {});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const cursor = filter_obj ? collection.find(filter_obj) : collection.find();
|
|
|
|
|
if (sort) {
|
|
|
|
|
cursor.sort({[sort['by']]: sort['order']});
|
|
|
|
|
}
|
|
|
|
|
@ -415,7 +425,7 @@ exports.getRecords = async (table, filter_obj = null, return_count = false, sort
|
|
|
|
|
cursor.skip(range[0]).limit(range[1] - range[0]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return !return_count ? await cursor.toArray() : await cursor.count();
|
|
|
|
|
return await cursor.toArray();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update
|
|
|
|
|
@ -455,7 +465,7 @@ exports.updateRecord = async (table, filter_obj, update_obj, nested_mode = false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).updateOne(sanitized_filter_obj, {$set: sanitized_update_obj});
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.updateRecords = async (table, filter_obj, update_obj) => {
|
|
|
|
|
@ -494,7 +504,7 @@ exports.updateRecords = async (table, filter_obj, update_obj) => {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).updateMany(sanitized_filter_obj, {$set: sanitized_update_obj});
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.removePropertyFromRecord = async (table, filter_obj, remove_obj) => {
|
|
|
|
|
@ -506,7 +516,7 @@ exports.removePropertyFromRecord = async (table, filter_obj, remove_obj) => {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).updateOne(filter_obj, {$unset: remove_obj});
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.bulkUpdateRecordsByKey = async (table, key_label, update_obj) => {
|
|
|
|
|
@ -526,21 +536,19 @@ exports.bulkUpdateRecordsByKey = async (table, key_label, update_obj) => {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const table_collection = database.collection(table);
|
|
|
|
|
|
|
|
|
|
let bulk = table_collection.initializeOrderedBulkOp(); // Initialize the Ordered Batch
|
|
|
|
|
|
|
|
|
|
const item_ids_to_update = Object.keys(update_obj);
|
|
|
|
|
if (item_ids_to_update.length === 0) return true;
|
|
|
|
|
|
|
|
|
|
for (let i = 0; i < item_ids_to_update.length; i++) {
|
|
|
|
|
const item_id_to_update = item_ids_to_update[i];
|
|
|
|
|
bulk.find({[key_label]: item_id_to_update }).updateOne({
|
|
|
|
|
"$set": update_obj[item_id_to_update]
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await bulk.execute();
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
const output = await database.collection(table).bulkWrite(
|
|
|
|
|
item_ids_to_update.map(item_id_to_update => ({
|
|
|
|
|
updateOne: {
|
|
|
|
|
filter: {[key_label]: item_id_to_update},
|
|
|
|
|
update: { "$set": update_obj[item_id_to_update] }
|
|
|
|
|
}
|
|
|
|
|
})),
|
|
|
|
|
{ ordered: true }
|
|
|
|
|
);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.pushToRecordsArray = async (table, filter_obj, key, value) => {
|
|
|
|
|
@ -551,7 +559,7 @@ exports.pushToRecordsArray = async (table, filter_obj, key, value) => {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).updateOne(filter_obj, {$push: {[key]: value}});
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
exports.pullFromRecordsArray = async (table, filter_obj, key, value) => {
|
|
|
|
|
@ -562,7 +570,7 @@ exports.pullFromRecordsArray = async (table, filter_obj, key, value) => {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).updateOne(filter_obj, {$pull: {[key]: value}});
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Delete
|
|
|
|
|
@ -575,7 +583,7 @@ exports.removeRecord = async (table, filter_obj) => {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table).deleteOne(filter_obj);
|
|
|
|
|
return !!(output['result']['ok']);
|
|
|
|
|
return isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// exports.removeRecordsByUIDBulk = async (table, uids) => {
|
|
|
|
|
@ -658,7 +666,7 @@ exports.removeAllRecords = async (table = null, filter_obj = null) => {
|
|
|
|
|
|
|
|
|
|
const output = await database.collection(table_to_remove).deleteMany(filter_obj ? filter_obj : {});
|
|
|
|
|
logger.debug(`Successfully removed records from ${table_to_remove}`);
|
|
|
|
|
success &= !!(output['result']['ok']);
|
|
|
|
|
success &= isMongoWriteAck(output);
|
|
|
|
|
}
|
|
|
|
|
return success;
|
|
|
|
|
}
|
|
|
|
|
@ -682,8 +690,7 @@ const getDBTableStats = async (table) => {
|
|
|
|
|
if (using_local_db) {
|
|
|
|
|
table_stats['records_count'] = local_db.get(table).value().length;
|
|
|
|
|
} else {
|
|
|
|
|
const stats = await database.collection(table).stats();
|
|
|
|
|
table_stats['records_count'] = stats.count;
|
|
|
|
|
table_stats['records_count'] = await database.collection(table).countDocuments({});
|
|
|
|
|
}
|
|
|
|
|
return table_stats;
|
|
|
|
|
}
|
|
|
|
|
|