Статья, в основном, ориентирована для специалистов, которые столкнулись с проблемой «исчезающих» документов в MongoDB и не понимающих, где найти историю этих документов, была ли она вообще сохранена в коллекции или же была обновлена и удалена, и в какой последовательности происходили все эти действия и в какое время.
MongoDB — это популярная NoSQL база данных, широко используемая для хранения больших объемов данных. Одной из ключевых возможностей MongoDB является механизм Oplog (операционный журнал), который позволяет отслеживать изменения в коллекциях. В этой статье мы рассмотрим, как работать с Oplog, искать документы, преобразовывать временные метки и выводить результаты в читаемом формате, что крайне удобно для аналитиков.
Введение в Oplog
Oplog представляет собой журнал, содержащий записи обо всех операциях, которые изменяют состояние базы данных. Это могут быть операции вставки [i], обновления [u] и удаления [d]. Oplog особенно полезен для репликации данных и отслеживания изменений в реальном времени.
Задача
Предположим, у нас есть коллекция документов в MongoDB, и мы хотим отслеживать изменения статусов этих документов. Наша цель — найти документы с определёнными начальными статусами, а затем найти последующие изменения этих документов на новые статусы. Мы также хотим преобразовать временные метки MongoDB в читаемый формат и вывести результаты.
Пример готового запроса в Oplog
function convertNumberLongToISOString(numberLong) { var date = new Date(numberLong.toNumber()); return date.toISOString(); } function convertTimestampToMoscowTime(timestamp) { var date = new Date(timestamp.getHighBits() * 1000); var moscowOffset = 3 * 60 * 60 * 1000; var moscowTime = new Date(date.getTime() + moscowOffset); var day = ('0' + moscowTime.getDate()).slice(-2); var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2); var year = moscowTime.getFullYear(); var hours = ('0' + moscowTime.getHours()).slice(-2); var minutes = ('0' + moscowTime.getMinutes()).slice(-2); var seconds = ('0' + moscowTime.getSeconds()).slice(-2); return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds; } var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую var targetStatuses = ["ERROR"]; // может быть множество значений, перечисленных через запятую var startDate = new Date('2020-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве, начиная с которой будет осуществлятся поиск логов var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0); var initialDocuments = db.getCollection("oplog.rs").find({ "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1 "o.statusCode": { $in: initialStatuses }, "ts": { $gte: startTimestamp } }, { // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object "o.statusCode": 1, "o.cadId": 1, "o.pupsId": 1, "o.number": 1, "o.date": 1, "o.dateFast": 1, "o.refactor": 1, "o.rembo": 1, "o._id": 1, "op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным "ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным }).toArray(); if (initialDocuments.length > 0) { var documentIds = initialDocuments.map(function(doc) { return doc.o._id; // берем конкретный _id документа и смотрим переход по нужным нам статусам }); var targetDocuments = db.getCollection("oplog.rs").find({ "op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая), можно оставить только одну из них "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1 "o._id": { $in: documentIds }, "o.statusCode": { $in: targetStatuses }, "ts": { $gte: startTimestamp } }, { // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object "o.statusCode": 1, "o.cadId": 1, "o.pupsId": 1, "o.number": 1, "o.date": 1, "o.dateFast": 1, "o.refactor": 1, "o.rembo": 1, "o._id": 1, "op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным "ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным }).toArray(); var initialDocMap = {}; initialDocuments.forEach(function(doc) { initialDocMap[doc.o._id] = doc; }); var matchCount = 0; targetDocuments.forEach(function(doc) { if (matchCount >= 100) return; // остановка вывода результатов после 100 совпадений, чтобы не грохнуть БД) но вы можете снять это ограничение или уменьшить var prevDoc = initialDocMap[doc.o._id]; if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) { print("Document with pupsId" + prevDoc.o.pupsId + ":"); // тут пишем, что хотим видеть в заголовке найденных документов, в данном случае я хочу видеть в заголовке наименование компании printjson({ // перечисляем поля для красивого вывода, где можно преобразовать наименование поля из коллекции в более удобочитаемое _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog refactor: prevDoc.o.refactor, rembo: prevDoc.o.rembo, cadId: prevDoc.o.cadId, pupsId: prevDoc.o.pupsId, statusCode: prevDoc.o.statusCode, number: prevDoc.o.number, dateFast: convertNumberLongToISOString(prevDoc.o.dateFast), date: prevDoc.o.date, op: prevDoc.op, ts: convertTimestampToMoscowTime(prevDoc.ts) }); print("Document with pupsId" + doc.o.pupsId + ":"); printjson({ _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog refactor: prevDoc.o.refactor, rembo: prevDoc.o.rembo, cadId: prevDoc.o.cadId, pupsId: prevDoc.o.pupsId, statusCode: prevDoc.o.statusCode, number: prevDoc.o.number, dateFast: convertNumberLongToISOString(prevDoc.o.dateFast), date: prevDoc.o.date, op: doc.op, ts: convertTimestampToMoscowTime(doc.ts) }); matchCount++; } }); if (matchCount === 0) { print("No matching documents found."); // пишем в свободной форме, какой текст вывести, если документов не нашлось } } else { print("No documents found with initial statuses."); // пишем в свободной форме, какой текст вывести, если документов, с изначальным (initial) статусом не нашлось } // Поиск операций удаления var deleteOperations = db.getCollection("oplog.rs").find({ "op": "d" }).limit(100).toArray(); // Лимит на выдачу документов // Вывод операций удаления deleteOperations.forEach(function(doc) { print("Delete operation:"); printjson({ _id: doc.o._id, ts: convertTimestampToMoscowTime(doc.ts), op: doc.op, ns: doc.ns, ui: doc.ui }); });
Пример ответа
Document with pupsId Ромашка: { _id: '1234fgbf4d8d48aa8a2ca44565fds43j', // по этому _id мы нашли 3 записи обновления, которые произошли с этим документом после утсановленной нами даты в запросе 2020-03-01T00:00:00Z refactor: 'sdgds435g', rembo: null, cadId: '444fff', pupsId: 'UBP9JN', statusCode: 'ERROR', number: '692343', dateFast: '2023-11-15T18:16:59.636Z', date: '15.11.2023', op: 'u', // также можно побаловаться запросом и при необходимости преобразовать "u" в "обновление" ts: '16-11-2023 00:19:02' // наш преобразованный NumberLong в удобочитаемом формате } Document with pupsId Ромашка: { _id: '1234fgbf4d8d48aa8a2ca44565fds43j', refactor: 'sdfsdf454ff', rembo: null, cadId: '444fff', pupsId: 'UBP9JN', statusCode: 'WELL_DONE', number: '75324324', dateFast: '2023-11-16T15:02:04.655Z', date: '16.11.2023', op: 'u', ts: '16-11-2023 21:02:05' } Document with pupsId Ромашка: { _id: '1234fgbf4d8d48aa8a2ca44565fds43j', refactor: 'sdfsdg4543fg', rembo: null, cadId: '444fff', pupsId: 'UBP9JN', statusCode: 'ERROR', number: '75234234', dateFast: '2023-11-16T15:02:05.655Z', date: '16.11.2023', op: 'u', ts: '16-11-2023 21:04:35' }
Подробные шаги выполнения запроса выше
-
Получение UUID коллекции для запроса в Oplog.
-
Подготовка функций для преобразования временных меток.
-
Поиск начальных документов с определёнными статусами.
-
Поиск последующих изменений этих документов.
-
Вывод результатов в читаемом формате.
-
Поиск операций удаления [d].
Получение UUID коллекции для запроса в Oplog
Поскольку oplog хранится в БД local и используется сразу для всех коллекций внутри сета серверов, то нам необходимо получить конкретный UUID коллекции, из которой хотим получить хронологию данных по документам.
Запрос вводится в окне query-запроса выбранной коллекции
db.getCollectionInfos()
Пример ответа
Из этого ответа нам понадобится значение из поля «uuid», которое в дальнейшем будем вставлять в поле «ui» в окне query-запроса Oplog.
[ { "name" : "your-collection-name", "type" : "collection", "options" : { }, "info" : { "readOnly" : false, "uuid" : UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54") // необходимое значение }, "idIndex" : { "v" : 2.0, "key" : { "_id" : 1.0 }, "name" : "_id_" } } ]
Подготовка функций для преобразования временных меток
MongoDB хранит временные метки в формате Timestamp и NumberLong. Для удобства чтения нам нужно преобразовать их в стандартные временные форматы.
Функция для преобразования NumberLong в ISO-строку
function convertNumberLongToISOString(numberLong) { var date = new Date(numberLong.toNumber()); return date.toISOString(); }
Функция для преобразования Timestamp в московское время
function convertTimestampToMoscowTime(timestamp) { var date = new Date(timestamp.getHighBits() * 1000); var moscowOffset = 3 * 60 * 60 * 1000; // Москва на 3 часа впереди UTC var moscowTime = new Date(date.getTime() + moscowOffset); var day = ('0' + moscowTime.getDate()).slice(-2); var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2); var year = moscowTime.getFullYear(); var hours = ('0' + moscowTime.getHours()).slice(-2); var minutes = ('0' + moscowTime.getMinutes()).slice(-2); var seconds = ('0' + moscowTime.getSeconds()).slice(-2); return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds; }
Поиск начальных документов с определёнными статусами
Наша задача — найти документы с начальными статусами ERROR, начиная с определённой даты.
В примере мы используем поле statusCode, которое имеется в каждом документе нашей коллекции.
Выполняем поиск по логике — initialStatuses — статус, который был записан первым и targetStatuses — статус, который обновил предыдущий статус.
Для этого используем следующий запрос:
var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую var startDate = new Date('2024-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0); var initialDocuments = db.getCollection("oplog.rs").find({ "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1 "o.statusCode": { $in: initialStatuses }, "ts": { $gte: startTimestamp } }, { // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object "o.statusCode": 1, "o.cadId": 1, "o.pupsId": 1, "o.number": 1, "o.date": 1, "o.dateFast": 1, "o.refactor": 1, "o.rembo": 1, "o._id": 1, "op": 1, "ts": 1 }).toArray();
Поиск последующих изменений этих документов
После получения начальных документов ищем их изменения на целевые статусы DEBITED или DOCUMENT_DONE.
var targetStatuses = ["ERROR"]; if (initialDocuments.length > 0) { var documentIds = initialDocuments.map(function(doc) { return doc.o._id; }); var targetDocuments = db.getCollection("oplog.rs").find({ "op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая) "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1 "o._id": { $in: documentIds }, "o.statusCode": { $in: targetStatuses }, "ts": { $gte: startTimestamp } }, { "o.statusCode": 1, "o.cadId": 1, "o.pupsId": 1, "o.number": 1, "o.date": 1, "o.dateFast": 1, "o.refactor": 1, "o.rembo": 1, "o._id": 1, "op": 1, "ts": 1 }).toArray();
Вывод результатов в читаемом формате
Для удобства создадим карту начальных документов, а затем выведем сопоставленные документы.
var initialDocMap = {}; initialDocuments.forEach(function(doc) { initialDocMap[doc.o._id] = doc; }); var matchCount = 0; targetDocuments.forEach(function(doc) { if (matchCount >= 100) return; var prevDoc = initialDocMap[doc.o._id]; if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) { print("Document with pupsId" + prevDoc.o.pupsId + ":"); printjson({ _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog refactor: prevDoc.o.refactor, rembo: prevDoc.o.rembo, cadId: prevDoc.o.cadId, pupsId: prevDoc.o.pupsId, statusCode: prevDoc.o.statusCode, number: prevDoc.o.number, dateFast: convertNumberLongToISOString(prevDoc.o.dateFast), date: prevDoc.o.date, op: prevDoc.op, ts: convertTimestampToMoscowTime(prevDoc.ts) }); print("Document with pupsId" + doc.o.pupsId + ":"); printjson({ _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog refactor: prevDoc.o.refactor, rembo: prevDoc.o.rembo, cadId: prevDoc.o.cadId, pupsId: prevDoc.o.pupsId, statusCode: prevDoc.o.statusCode, number: prevDoc.o.number, dateFast: convertNumberLongToISOString(prevDoc.o.dateFast), date: prevDoc.o.date, op: doc.op, ts: convertTimestampToMoscowTime(doc.ts) }); matchCount++; } }); if (matchCount === 0) { print("No matching documents found."); } } else { print("No documents found with initial statuses."); }
Поиск операций удаления [d]
Мы не можем вставить в начале нашего запроса операцию [d] удаления совместно с операциями [u] (обновление) и [i] (внесение), тк в выводе oplog показывает нужные нам _id и ts (timestamp — время) удаленного документа без полей, свойственных для документов в нашей коллекции.
// Поиск операций удаления var deleteOperations = db.getCollection("oplog.rs").find({ "op": "d" }).limit(100).toArray(); // Лимит на выдачу документов // Вывод операций удаления deleteOperations.forEach(function(doc) { print("Delete operation:"); printjson({ _id: doc.o._id, ts: convertTimestampToMoscowTime(doc.ts), op: doc.op, ns: doc.ns, ui: doc.ui }); });
Этот скрипт выполняет следующие шаги:
-
Создаёт карту начальных документов для быстрого доступа по идентификатору.
-
Проходит по каждому документу из целевых изменений и проверяет, есть ли соответствующий начальный документ с той же ID и более поздней временной меткой.
-
Выводит начальный и изменённый документ в читаемом формате, включая преобразованные временные метки.
Заключение
В этой статье мы рассмотрели, как работать с MongoDB Oplog для отслеживания изменений документов. Мы научились искать документы с определёнными начальными статусами, находить их последующие изменения и выводить результаты в читаемом формате.
ссылка на оригинал статьи https://habr.com/ru/articles/835236/
Добавить комментарий