Работа с MongoDB Oplog: Как отслеживать изменения документов

от автора

Статья, в основном, ориентирована для специалистов, которые столкнулись с проблемой «исчезающих» документов в MongoDB и не понимающих, где найти историю этих документов, была ли она вообще сохранена в коллекции или же была обновлена и удалена, и в какой последовательности происходили все эти действия и в какое время.

MongoDB — это популярная NoSQL база данных, широко используемая для хранения больших объемов данных. Одной из ключевых возможностей MongoDB является механизм Oplog (операционный журнал), который позволяет отслеживать изменения в коллекциях. В этой статье мы рассмотрим, как работать с Oplog, искать документы, преобразовывать временные метки и выводить результаты в читаемом формате, что крайне удобно для аналитиков.

Введение в Oplog

Oplog представляет собой журнал, содержащий записи обо всех операциях, которые изменяют состояние базы данных. Это могут быть операции вставки [i], обновления [u] и удаления [d]. Oplog особенно полезен для репликации данных и отслеживания изменений в реальном времени.

Задача

Предположим, у нас есть коллекция документов в MongoDB, и мы хотим отслеживать изменения статусов этих документов. Наша цель — найти документы с определёнными начальными статусами, а затем найти последующие изменения этих документов на новые статусы. Мы также хотим преобразовать временные метки MongoDB в читаемый формат и вывести результаты.

Пример готового запроса в Oplog

function convertNumberLongToISOString(numberLong) {     var date = new Date(numberLong.toNumber());     return date.toISOString(); }  function convertTimestampToMoscowTime(timestamp) {     var date = new Date(timestamp.getHighBits() * 1000);     var moscowOffset = 3 * 60 * 60 * 1000;     var moscowTime = new Date(date.getTime() + moscowOffset);      var day = ('0' + moscowTime.getDate()).slice(-2);     var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2);     var year = moscowTime.getFullYear();     var hours = ('0' + moscowTime.getHours()).slice(-2);     var minutes = ('0' + moscowTime.getMinutes()).slice(-2);     var seconds = ('0' + moscowTime.getSeconds()).slice(-2);      return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds; }  var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую var targetStatuses = ["ERROR"]; // может быть множество значений, перечисленных через запятую  var startDate = new Date('2020-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве, начиная с которой будет осуществлятся поиск логов var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0);  var initialDocuments = db.getCollection("oplog.rs").find({     "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1     "o.statusCode": { $in: initialStatuses },     "ts": { $gte: startTimestamp } }, {      // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса      // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object     "o.statusCode": 1,     "o.cadId": 1,     "o.pupsId": 1,     "o.number": 1,     "o.date": 1,     "o.dateFast": 1,     "o.refactor": 1,     "o.rembo": 1,     "o._id": 1,     "op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным     "ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным }).toArray();  if (initialDocuments.length > 0) {     var documentIds = initialDocuments.map(function(doc) {         return doc.o._id; // берем конкретный _id документа и смотрим переход по нужным нам статусам     });      var targetDocuments = db.getCollection("oplog.rs").find({         "op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая), можно оставить только одну из них         "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1         "o._id": { $in: documentIds },         "o.statusCode": { $in: targetStatuses },         "ts": { $gte: startTimestamp }     }, {          // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса          // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object        "o.statusCode": 1,        "o.cadId": 1,        "o.pupsId": 1,        "o.number": 1,                 "o.date": 1,        "o.dateFast": 1,        "o.refactor": 1,        "o.rembo": 1,        "o._id": 1,        "op": 1, // тут не стоит o. , тк это поле относится уже к oplog-данным        "ts": 1 // тут не стоит o. , тк это поле относится уже к oplog-данным     }).toArray();      var initialDocMap = {};     initialDocuments.forEach(function(doc) {         initialDocMap[doc.o._id] = doc;     });      var matchCount = 0;     targetDocuments.forEach(function(doc) {         if (matchCount >= 100) return; //  остановка вывода результатов после 100 совпадений, чтобы не грохнуть БД) но вы можете снять это ограничение или уменьшить          var prevDoc = initialDocMap[doc.o._id];         if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) {             print("Document with pupsId" + prevDoc.o.pupsId + ":"); // тут пишем, что хотим видеть в заголовке найденных документов, в данном случае я хочу видеть в заголовке наименование компании             printjson({ // перечисляем поля для красивого вывода, где можно преобразовать наименование поля из коллекции в более удобочитаемое                 _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog                 refactor: prevDoc.o.refactor,                 rembo: prevDoc.o.rembo,                 cadId: prevDoc.o.cadId,                 pupsId: prevDoc.o.pupsId,                 statusCode: prevDoc.o.statusCode,                 number: prevDoc.o.number,                 dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),                 date: prevDoc.o.date,                 op: prevDoc.op,                 ts: convertTimestampToMoscowTime(prevDoc.ts)             });              print("Document with pupsId" + doc.o.pupsId + ":");             printjson({                 _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog                 refactor: prevDoc.o.refactor,                 rembo: prevDoc.o.rembo,                 cadId: prevDoc.o.cadId,                 pupsId: prevDoc.o.pupsId,                 statusCode: prevDoc.o.statusCode,                 number: prevDoc.o.number,                 dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),                 date: prevDoc.o.date,                 op: doc.op,                 ts: convertTimestampToMoscowTime(doc.ts)             });              matchCount++;         }     });      if (matchCount === 0) {         print("No matching documents found."); // пишем в свободной форме, какой текст вывести, если документов не нашлось     } } else {     print("No documents found with initial statuses."); // пишем в свободной форме, какой текст вывести, если документов, с изначальным (initial) статусом не нашлось }  // Поиск операций удаления var deleteOperations = db.getCollection("oplog.rs").find({     "op": "d" }).limit(100).toArray(); // Лимит на выдачу документов  // Вывод операций удаления deleteOperations.forEach(function(doc) {     print("Delete operation:");     printjson({         _id: doc.o._id,         ts: convertTimestampToMoscowTime(doc.ts),         op: doc.op,         ns: doc.ns,         ui: doc.ui     }); });

Пример ответа

Document with pupsId Ромашка: {   _id: '1234fgbf4d8d48aa8a2ca44565fds43j', // по этому _id мы нашли 3 записи обновления, которые произошли с этим документом после утсановленной нами даты в запросе 2020-03-01T00:00:00Z    refactor: 'sdgds435g',   rembo: null,   cadId: '444fff',   pupsId: 'UBP9JN',   statusCode: 'ERROR',   number: '692343',   dateFast: '2023-11-15T18:16:59.636Z',   date: '15.11.2023',   op: 'u', // также можно побаловаться запросом и при необходимости преобразовать "u" в "обновление"   ts: '16-11-2023 00:19:02' // наш преобразованный NumberLong в удобочитаемом формате } Document with pupsId Ромашка: {   _id: '1234fgbf4d8d48aa8a2ca44565fds43j',   refactor: 'sdfsdf454ff',   rembo: null,   cadId: '444fff',   pupsId: 'UBP9JN',   statusCode: 'WELL_DONE',   number: '75324324',   dateFast: '2023-11-16T15:02:04.655Z',   date: '16.11.2023',   op: 'u',   ts: '16-11-2023 21:02:05' } Document with pupsId Ромашка: {   _id: '1234fgbf4d8d48aa8a2ca44565fds43j',   refactor: 'sdfsdg4543fg',   rembo: null,   cadId: '444fff',   pupsId: 'UBP9JN',   statusCode: 'ERROR',   number: '75234234',   dateFast: '2023-11-16T15:02:05.655Z',   date: '16.11.2023',   op: 'u',   ts: '16-11-2023 21:04:35' }

Подробные шаги выполнения запроса выше

  1. Получение UUID коллекции для запроса в Oplog.

  2. Подготовка функций для преобразования временных меток.

  3. Поиск начальных документов с определёнными статусами.

  4. Поиск последующих изменений этих документов.

  5. Вывод результатов в читаемом формате.

  6. Поиск операций удаления [d].

Получение UUID коллекции для запроса в Oplog

Поскольку oplog хранится в БД local и используется сразу для всех коллекций внутри сета серверов, то нам необходимо получить конкретный UUID коллекции, из которой хотим получить хронологию данных по документам.

Запрос вводится в окне query-запроса выбранной коллекции

db.getCollectionInfos()

Пример ответа

Из этого ответа нам понадобится значение из поля «uuid», которое в дальнейшем будем вставлять в поле «ui» в окне query-запроса Oplog.

[     {         "name" : "your-collection-name",         "type" : "collection",         "options" : {          },         "info" : {             "readOnly" : false,             "uuid" : UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54") // необходимое значение         },         "idIndex" : {             "v" : 2.0,             "key" : {                 "_id" : 1.0             },             "name" : "_id_"         }     } ]

Подготовка функций для преобразования временных меток

MongoDB хранит временные метки в формате Timestamp и NumberLong. Для удобства чтения нам нужно преобразовать их в стандартные временные форматы.

Функция для преобразования NumberLong в ISO-строку

function convertNumberLongToISOString(numberLong) {     var date = new Date(numberLong.toNumber());     return date.toISOString(); }

Функция для преобразования Timestamp в московское время

function convertTimestampToMoscowTime(timestamp) {     var date = new Date(timestamp.getHighBits() * 1000);     var moscowOffset = 3 * 60 * 60 * 1000; // Москва на 3 часа впереди UTC     var moscowTime = new Date(date.getTime() + moscowOffset);      var day = ('0' + moscowTime.getDate()).slice(-2);     var month = ('0' + (moscowTime.getMonth() + 1)).slice(-2);     var year = moscowTime.getFullYear();     var hours = ('0' + moscowTime.getHours()).slice(-2);     var minutes = ('0' + moscowTime.getMinutes()).slice(-2);     var seconds = ('0' + moscowTime.getSeconds()).slice(-2);      return day + '-' + month + '-' + year + ' ' + hours + ':' + minutes + ':' + seconds; }

Поиск начальных документов с определёнными статусами

Наша задача — найти документы с начальными статусами ERROR, начиная с определённой даты.

В примере мы используем поле statusCode, которое имеется в каждом документе нашей коллекции.

Выполняем поиск по логике — initialStatuses — статус, который был записан первым и targetStatuses — статус, который обновил предыдущий статус.

Для этого используем следующий запрос:

var initialStatuses = ["PROCESS", "WELL_DONE"]; // может быть множество значений, перечисленных через запятую var startDate = new Date('2024-03-01T00:00:00Z'); // проставляем нужную нам дату по Москве var startTimestamp = Timestamp(Math.floor(startDate.getTime() / 1000), 0);  var initialDocuments = db.getCollection("oplog.rs").find({     "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1     "o.statusCode": { $in: initialStatuses },     "ts": { $gte: startTimestamp } }, { // тут перечисляем все поля из коллекции, которые хотим видеть в ответе запроса      // перед каждым наименованием поля стоит o. - технически необходимо, сокращение от object     "o.statusCode": 1,     "o.cadId": 1,     "o.pupsId": 1,     "o.number": 1,              "o.date": 1,     "o.dateFast": 1,     "o.refactor": 1,     "o.rembo": 1,     "o._id": 1,     "op": 1,     "ts": 1 }).toArray();

Поиск последующих изменений этих документов

После получения начальных документов ищем их изменения на целевые статусы DEBITED или DOCUMENT_DONE.

var targetStatuses = ["ERROR"];  if (initialDocuments.length > 0) {     var documentIds = initialDocuments.map(function(doc) {         return doc.o._id;     });      var targetDocuments = db.getCollection("oplog.rs").find({         "op": { $in: ["u", "i"] }, // ищем операции u и i (update - обновление и insert - новая)         "ui": UUID("1234e321-a6fr-4egv-b2bf-5aedfv5rgv54"), // пишем значение из выполненного запроса в п.1         "o._id": { $in: documentIds },         "o.statusCode": { $in: targetStatuses },         "ts": { $gte: startTimestamp }     }, {        "o.statusCode": 1,        "o.cadId": 1,        "o.pupsId": 1,        "o.number": 1,                 "o.date": 1,        "o.dateFast": 1,        "o.refactor": 1,        "o.rembo": 1,        "o._id": 1,        "op": 1,        "ts": 1     }).toArray();

Вывод результатов в читаемом формате

Для удобства создадим карту начальных документов, а затем выведем сопоставленные документы.

    var initialDocMap = {};     initialDocuments.forEach(function(doc) {         initialDocMap[doc.o._id] = doc;     });      var matchCount = 0;     targetDocuments.forEach(function(doc) {         if (matchCount >= 100) return;          var prevDoc = initialDocMap[doc.o._id];         if (prevDoc && doc.ts.getHighBits() > prevDoc.ts.getHighBits()) {             print("Document with pupsId" + prevDoc.o.pupsId + ":");             printjson({                 _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog                 refactor: prevDoc.o.refactor,                 rembo: prevDoc.o.rembo,                 cadId: prevDoc.o.cadId,                 pupsId: prevDoc.o.pupsId,                 statusCode: prevDoc.o.statusCode,                 number: prevDoc.o.number,                 dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),                 date: prevDoc.o.date,                 op: prevDoc.op,                 ts: convertTimestampToMoscowTime(prevDoc.ts)             });              print("Document with pupsId" + doc.o.pupsId + ":");             printjson({                 _id: prevDoc.o._id, // слева пишем наименование поля из коллекции, а справа как хотим, чтобы отображалось в выводе в oplog                 refactor: prevDoc.o.refactor,                 rembo: prevDoc.o.rembo,                 cadId: prevDoc.o.cadId,                 pupsId: prevDoc.o.pupsId,                 statusCode: prevDoc.o.statusCode,                 number: prevDoc.o.number,                 dateFast: convertNumberLongToISOString(prevDoc.o.dateFast),                 date: prevDoc.o.date,                 op: doc.op,                 ts: convertTimestampToMoscowTime(doc.ts)             });              matchCount++;         }     });      if (matchCount === 0) {         print("No matching documents found.");     } } else {     print("No documents found with initial statuses."); }

Поиск операций удаления [d]

Мы не можем вставить в начале нашего запроса операцию [d] удаления совместно с операциями [u] (обновление) и [i] (внесение), тк в выводе oplog показывает нужные нам _id и ts (timestamp — время) удаленного документа без полей, свойственных для документов в нашей коллекции.

// Поиск операций удаления var deleteOperations = db.getCollection("oplog.rs").find({     "op": "d" }).limit(100).toArray(); // Лимит на выдачу документов  // Вывод операций удаления deleteOperations.forEach(function(doc) {     print("Delete operation:");     printjson({         _id: doc.o._id,         ts: convertTimestampToMoscowTime(doc.ts),         op: doc.op,         ns: doc.ns,         ui: doc.ui     }); });

Этот скрипт выполняет следующие шаги:

  1. Создаёт карту начальных документов для быстрого доступа по идентификатору.

  2. Проходит по каждому документу из целевых изменений и проверяет, есть ли соответствующий начальный документ с той же ID и более поздней временной меткой.

  3. Выводит начальный и изменённый документ в читаемом формате, включая преобразованные временные метки.

Заключение

В этой статье мы рассмотрели, как работать с MongoDB Oplog для отслеживания изменений документов. Мы научились искать документы с определёнными начальными статусами, находить их последующие изменения и выводить результаты в читаемом формате.


ссылка на оригинал статьи https://habr.com/ru/articles/835236/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *