Как подружить Bagri и MongoDB

от автора

Примерно месяц назад, я рассказал Хабру о проекте Bagri: NoSQL базе данных с открытым кодом, построенной поверх распределенного кэша.

После достаточно неплохого отклика, решил написать статью о том как можно наращивать функционал Bagri путем написания расширений (extensions) используя встроенный API системы.

image

На данный момент Bagri публикует два API для подключения к внешним системам: DataFormat API и DataStore API.

Первый предназначен для парсинга документов в новом формате и преобразования их во внутренний формат системы, а также для обратного построения документов в новом формате из внутреннего представления системы.

Второй API служит для загрузки/сохранения/удаления документов из внешних систем хранения. Зачастую, для подключения к новому источнику документов необходимо реализовать оба интерфейса.

Я покажу, как реализовать DataStore connector к MongoDB и использовать его в качестве системы хранения документов. В данном случае реализации DataFormat API не требуется, поскольку Mongo предоставляет документы в JSON формате, который изначально поддерживается системой.

Сразу хочу сделать пару замечаний:

  1. Практическая польза от такого коннектора? Очевидно, Mongo можно просто использовать в качестве централизованного хранилища документов. Так же он может быть полезен в сценариях, описанных в данной статье, когда данные уже хранятся в Mongo, но ее возможностей стало не хватать для развития функционала системы;
  2. Я не являюсь знатоком MongoDB, если есть более оптимальные способы работы с ней, я буду рад их услышать;

Итак, начнем.

DataStore API предполагает реализацию интерфейса com.bagri.xdm.cache.api.DocumentStore:

public interface DocumentStore { 	         /**          * Lifecycle method. Invoked when the store initialized.           *           * @param context the environment context          */         void init(Map<String, Object> context); 	         /**          * Lifecycle method. Invoked when parent schema is closing          */         void close();          /**          * Load document from persistent store          *           * @param key the document key          * @return XDM Document instance if corresponding document found, null otherwise          */         Document loadDocument(DocumentKey key);          /**          * Load bunch of documents from persistent store          *           * @param keys the collection of document keys to load          * @return the map of loaded documents with their keys          */         Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys);          /**          * Load document keys. Can do it in synch or asynch way.          *           * @return iterator over found document keys          */         Iterable<DocumentKey> loadAllDocumentKeys();          /**          * Stores document to persistent store.          *           * @param key the document key          * @param value the XDM document instance          */         void storeDocument(DocumentKey key, Document value);          /**          * Stores bunch of documents to persistent store          *           * @param entries the map of document keys and corresponding document instances          */         void storeAllDocuments(Map<DocumentKey, Document> entries);          /**          * Deletes document from persistent store          *           * @param key the document key          */         void deleteDocument(DocumentKey key);          /**          * Deletes bunch o documents from persistent store           *           * @param keys the keys identifying documents to be deleted           */         void deleteAllDocuments(Collection<DocumentKey> keys); 		 } 

Проще всего будет наследовать наш класс, реализующий интерфейс DocumentStore, от абстрактного класса DocumentStoreBase, предоставляющего набор вспомогательных методов для доступа к контексту системы. Начнем с обработки параметров подключения к внешней системе и его инициализации.

Для подключения к Mongo нам понадобятся адрес сервера mongod, имя базы данных и имена коллекций, из которых мы хотим загружать документы. Определим имена для этих параметров: mongo.db.uri, mongo.db.database, mongo.db.collections. Тогда код инициализации подключения к серверу mongo может выглядеть таким образом:

public class MongoDBStore extends DocumentStoreBase implements DocumentStore { 	         private MongoClient client;         private MongoDatabase db;         private Map<String, MongoCollection<org.bson.Document>> clMap = new HashMap<>(); 	         @Override         public void init(Map<String, Object> context) { 	    super.setContext(context); 	    String uri = (String) context.get("mongo.db.uri");  	    MongoClientURI mcUri = new MongoClientURI(uri); 	    client = new MongoClient(mcUri);  	    String dbName = (String) context.get("mongo.db.database"); 	    db = client.getDatabase(dbName); 		 	    String clNames = (String) context.get("mongo.db.collections"); 	    boolean all = "*".equals(clNames); 	    List<String> clns = Arrays.asList(clNames.split(",")); 	    for (String clName: db.listCollectionNames()) { 		if (all || clns.contains(clName)) { 		    MongoCollection<org.bson.Document> cln = db.getCollection(clName);  		    clMap.put(clName, cln); 		} 	    }         }          @Override         public void close() {                 client.close();         } } 

Метод init принимает параметры подключения к серверу mongod из контекста, устанавливает соединение и кэширует объект MongoCollection для каждой коллекции, объявленной для загрузки. Теперь нам нужно реализовать метод для загрузки всех ключей документов.

        private String getMappingKey(String id, String cln) { 	      return id + "::" + cln;         } 	         private String[] getMappingParts(String keyMap) { 	      return keyMap.split("::");         }          @Override         public Iterable<DocumentKey> loadAllDocumentKeys() { 	    SchemaRepository repo = getRepository();             if (repo == null) { 		return null; 	    }  	    String id; 	    DocumentKey key; 	    Map<DocumentKey, String> keyMap = new HashMap<>(); 	    for (MongoCollection<org.bson.Document> cln: clMap.values()) { 		String clName = cln.getNamespace().getCollectionName(); 		// load _ids only 		MongoCursor<org.bson.Document> cursor =   cln.find().projection(include(“_id”)).iterator(); 		                 while (cursor.hasNext()) { 		      org.bson.Document doc = cursor.next(); 		      id = doc.get(“_id”).toString(); 		      // TODO: handle possible duplicates via revisions 		      key = repo.getFactory().newDocumentKey(id, 0, 1); 		      keyMap.put(key, getMappingKey(id, clName)); 		} 	    }    	    PopulationManagement popManager = repo.getPopulationManagement();    	    popManager.setKeyMappings(keyMap); 	    return keyMap.keySet();         } 

И методы загрузки документов, соответствующих загруженным ключам:

       @Override         public Map<DocumentKey, Document> loadAllDocuments(Collection<DocumentKey> keys) { 	    Map<DocumentKey, Document> entries = new HashMap<>(keys.size()); 	    for (DocumentKey key: keys) { 		Document doc = loadDocument(key); 		if (doc != null) { 		      entries.put(key, doc); 		} 	    } 	    return entries;         }          @Override         public Document loadDocument(DocumentKey key) {           SchemaRepository repo = getRepository();     	  Document doc = null; 	    	  PopulationManagement popManager = repo.getPopulationManagement();      	  String id = popManager.getKeyMapping(key);      	  if (id == null) { 	      return null;     	  }     	  String[] mParts = getMappingParts(id); 	  Document newDoc = null;      	  int[] clns = null;     	  com.bagri.xdm.system.Collection xcl = repo.getSchema().getCollection(mParts[1]);     	  if (xcl != null) {     	        clns = new int[] {xcl.getId()};     	  }     	  MongoCollection<org.bson.Document> cln = clMap.get(mParts[1]);  	  Object oid;     	  Date creDate;     	  try {     	        oid = new ObjectId(mParts[0]);     	        creDate = ((ObjectId) oid).getDate();     	  } catch (IllegalArgumentException ex) { 	        oid = mParts[0];     	        creDate = new Date();     	  } 	  org.bson.Document mongoDoc = cln.find(eq("_id", oid)).first(); 	  String content = mongoDoc.toJson(new JsonWriterSettings(true));     	  try {        	        DocumentManagementl docMgr = (DocumentManagement) repo.getDocumentManagement();  	        newDoc = docMgr.createDocument(key, mParts[0], content, “JSON”, creDate, "owner", 1, clns, true); 	  } catch (XDMException ex) { 	        // TODO: log error, but do not stop the whole loading 	  } 	  return doc;       } 

Для начала этого достаточно, методы storeDocument/storeAllDocuments и deleteDocument/deleteAllDocuments предлагаю реализовать читателю самостоятельно. Также прошу учесть, что приведенный выше код предназначен только в целях демонстрации процесса реализации коннектора и не обрабатывает различные исключительные ситуации и возможные дополнительные параметры конфигурации. Полный код коннектора можно посмотреть и собрать из репозитория bagri-extensions.

Теперь нам нужно зарегистрировать DataStore connector и объявить схему, которая будет его использовать. Для этого нам необходимо добавить конфигурацию коннектора в файл <BAGRI_HOME>/config/config.xml, в секцию dataStores:

<dataStore name="mongo">     <version>1</version>     <createdAt>2016-08-01T16:17:20.542+03:00</createdAt>     <createdBy>admin</createdBy>     <description>MongoDB data store</description>     <enabled>true</enabled>     <storeClass>com.bagri.samples.MongoDBStore</storeClass>     <properties>         <entry name="mongo.db.uri">mongodb://localhost:27017</entry>         <entry name="mongo.db.database">test</entry>         <entry name="mongo.db.collections">*</entry>      </properties> </dataStore> 

Мы протестируем работу коннектора на примере коллекции restaurants, рассматриваемой во многих примерах работы с MongoDB. Загрузите коллекцию тестовых документов в Mongo, как показано здесь: docs.mongodb.com/getting-started/shell/import-data. Теперь зарегистрируем схему для работы с MongoDB и настроим ее на загрузку данных из данной коллекции. В том же файле config.xml добавим новую схему в раздел schemas:

Добавляем следующие параметры:

<entry name="xdm.schema.store.enabled">true</entry> <entry name="xdm.schema.store.type">mongo</entry> <entry name="mongo.db.collections">restaurants</entry> 

Раздел schemas в config.xml с новыми параметрами

<schema name="Mongo" active="true">             <version>1</version>             <createdAt>2016-08-01T21:30:58.096+04:00</createdAt>             <createdBy>admin</createdBy>             <description>Schema for MongoDB</description>             <properties>                 <entry name="xdm.schema.store.tx.buffer.size">1024</entry>                 <entry name="xdm.schema.data.backup.read">false</entry>                 <entry name="xdm.schema.trans.backup.async">0</entry>                 <entry name="xdm.schema.store.enabled">true</entry>                 <entry name="xdm.schema.thread.pool">10</entry>                 <entry name="xdm.schema.data.stats.enabled">true</entry>                 <entry name="xdm.schema.query.cache">true</entry>                 <entry name="xdm.schema.store.type">mongo</entry>                 <entry name="mongo.db.collections">restaurants</entry>                 <entry name="xdm.schema.format.default">JSON</entry>                 <entry name="xdm.schema.ports.first">10300</entry>                 <entry name="xdm.schema.ports.last">10400</entry>                 <entry name="xdm.schema.population.size">1</entry>                 <entry name="xdm.schema.population.buffer.size">1000000</entry>                 <entry name="xdm.schema.data.backup.async">1</entry>                 <entry name="xdm.schema.store.data.path">../data/mongo</entry>                 <entry name="xdm.schema.dict.backup.sync">0</entry>                 <entry name="xdm.schema.trans.backup.sync">1</entry>                 <entry name="xdm.schema.query.backup.sync">0</entry>                 <entry name="xdm.schema.buffer.size">16</entry>                 <entry name="xdm.schema.dict.backup.async">1</entry>                 <entry name="xdm.schema.dict.backup.read">true</entry>                 <entry name="xdm.schema.trans.backup.read">false</entry>                 <entry name="xdm.schema.query.backup.async">0</entry>                 <entry name="xdm.schema.members">localhost</entry>                 <entry name="xdm.schema.data.backup.sync">0</entry>                 <entry name="xdm.schema.partition.count">157</entry>                 <entry name="xdm.schema.query.backup.read">true</entry> 	        <entry name="xdm.schema.transaction.timeout">0</entry>                 <entry name="xdm.schema.health.threshold.low">25</entry>                 <entry name="xdm.schema.health.threshold.high">0</entry>                 <entry name="xdm.schema.query.parallel">true</entry>                 <entry name="xdm.schema.partition.pool">32</entry>                 <entry name="xqj.schema.baseUri">file:///../data/mongo/</entry>                 <entry name="xqj.schema.orderingMode">2</entry>                 <entry name="xqj.schema.queryLanguageTypeAndVersion">1</entry>                 <entry name="xqj.schema.bindingMode">0</entry>                 <entry name="xqj.schema.boundarySpacePolicy">1</entry>                 <entry name="xqj.schema.scrollability">1</entry>                 <entry name="xqj.schema.holdability">2</entry>                 <entry name="xqj.schema.copyNamespacesModePreserve">1</entry>                 <entry name="xqj.schema.queryTimeout">0</entry>                 <entry name="xqj.schema.defaultFunctionNamespace"></entry>                 <entry name="xqj.schema.defaultElementTypeNamespace"></entry>                 <entry name="xqj.schema.copyNamespacesModeInherit">1</entry>                 <entry name="xqj.schema.defaultOrderForEmptySequences">2</entry>                 <entry name="xqj.schema.defaultCollationUri"></entry>                 <entry name="xqj.schema.constructionMode">1</entry>             </properties>             <collections>                 <collection id="1" name="restaurants">                     <version>1</version>                     <createdAt>2016-08-01T01:01:26.965+03:00</createdAt>                     <createdBy>admin</createdBy>                     <description>Mongo restaurants collection</description>                     <enabled>true</enabled>                 </collection>             </collections>             <fragments/>             <indexes/>             <triggers/>         </schema> 

Кроме того, нужно создать новый файл с профилем сервера. В каталоге <BAGRI_HOME>/config создайте файл mongo.properties и укажите в нем используемую сервером схему:

xdm.cluster.node.schemas=Mongo 

Удостоверьтесь, что сервер MongoDB запущен и ждет подключений по адресу, указанному в настройках коннектора. Теперь можно стартовать сервер Bagri. В каталоге <BAGRI_HOME>/bin выполните команду >bgcache.cmd mongo (on Windows) или >./bgcache.sh mongo (on Linux). Данный скрипт стартует одиночный сервер Bagri с настройками из профиля mongo.properties. По окончании загрузки серверный лог должен содержать следующие строчки:

image

показывающие, что в коннектор инициализировал схему Mongo и загрузил в нее 25359 документов из внешнего сервера MongoDB.

Теперь я покажу, как можно манипулировать документами JSON с помощью запросов XQuery.

Для интерактивного выполнения запросов XQuery нам понадобится клиент, позволяющий это делать. С дистрибутивом Bagri поставляется plugin к VisualVM, предоставляющий данную функциональность. Инструкцию по его установке смотрите здесь.

Запустите административный сервер Bagri <BAGRI_HOME>/bin/bgadmin. Откройте приложение VisualVM, подключитесь к административному серверу Bagri Manager и выберите схему Mongo. Закладка DocumentManagement позволяет работать с документами и коллекциями:

image

, а закладка QueryManagement с запросами XQuery. Выполните следующий простой запрос для выборки ресторана по его идентификатору:

declare namespace m="http://www.w3.org/2005/xpath-functions/map";  let $props := map{'method': 'json', 'indent': fn:true()}  for $uri in fn:uri-collection("restaurants") let $map := fn:json-doc($uri) where m:get($map, 'restaurant_id') = '40362098' return (fn:serialize($map, $props), '\&\#xa;') 

* Обратите внимание, что символ перевода строки в самой последней строке экранирован \, так как Хабр превращает его в действительный перевод строки, так что при выполнении запроса символ \ нужно убрать.

image

Или другой, для выборки ресторанов по типу кухни:

declare namespace m="http://www.w3.org/2005/xpath-functions/map";  let $props := map{'method': 'json'}  for $uri in fn:uri-collection("restaurants") let $map := fn:json-doc($uri) where m:get($map, 'cuisine') = 'Bakery' return (fn:serialize($map, $props), '\&\#xa;') 

image

XQuery легко позволяет делать любые выборки, доступные в Mongo (кроме запросов по гео-индексам, прямо из коробки они еще не поддерживаются).

Теперь я покажу запросы, которые не поддерживаются в MongoDB: JOIN. Для этого можно привести данные в коллекции restaurants к более нормализованному виду, например отделить отзывы о ресторанах от данных по самому ресторану и сохранить их в разных коллекциях.

Выполните данный запрос и сохраните результаты в файл, потом сделайте импорт полученных данных в MongoDB, в коллекцию rest-short.

declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'}  for $uri in fn:uri-collection("restaurants") let $rest := fn:json-doc($uri) let $rest := m:remove($rest, '_id') let $rest := m:remove($rest, 'grades') return (fn:serialize($rest, $props), '\&\#xa;') 

Следующий запрос выводит данные по отзывам. Так же сохраните их в отдельный файл и затем импортируйте в MongoDB в коллекцию grades.

declare namespace a="http://www.w3.org/2005/xpath-functions/array"; declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'}  for $uri in fn:uri-collection("restaurants") let $rest := fn:json-doc($uri) let $grades := m:get($rest, 'grades') return    for $i in (1 to a:size($grades))     let $grade := a:get($grades, $i)     let $date := m:get($grade, 'date')     return ('{"restaurant_id": "', m:get($rest, 'restaurant_id'),  	'", "date": ', fn:serialize($date, $props),  	 ', "grade": "', m:get($grade, 'grade'),  	'", "score": "', m:get($grade, 'score'), '"}', '\&\#xa;') 

Теперь поправьте настройки схемы, чтобы заявить новые коллекции для загрузки:

<schema name="Mongo" active="true"> 	     ………...             <properties>                 <entry name="xdm.schema.store.collections">rest-short, grades</entry> 	         ……...             </properties>             <collections>                 <collection id="2" name="rest-short">                     <version>1</version>                     <createdAt>2016-08-01T01:01:26.965+03:00</createdAt>                     <createdBy>admin</createdBy>                     <description>Restaurant headers collection</description>                     <enabled>true</enabled>                 </collection>                 <collection id="3" name="grades">                     <version>1</version>                     <createdAt>2016-08-01T01:01:26.965+03:00</createdAt>                     <createdBy>admin</createdBy>                     <description>Restaurant grades collection</description>                     <enabled>true</enabled>                 </collection>             </collections>             <fragments/>             <indexes/>             <triggers/> </schema> 

Рестартуйте сервер Bagri для загрузки новых коллекций с данными. Теперь можно проверить, как работают join’ы. Выполните следующий запрос для формирования полной структуры restaurants из двух коллекций:

declare namespace m="http://www.w3.org/2005/xpath-functions/map"; let $props := map{'method': 'json'}  for $ruri in fn:uri-collection("rest-short") let $rest := fn:json-doc($ruri) let $rid := m:get($rest, 'restaurant_id') let $addr := m:get($rest, 'address') let $txt := ('{"restaurant_id": "', $rid, 	'", "cuisine": "', m:get($rest, 'cuisine'),  	'", "name": "', m:get($rest, 'name'),  	'", "borough": "', m:get($rest, 'borough'),  	'", "address": ', fn:serialize($addr, $props), 	', "grades": [') return ($txt, fn:string-join(   for $guri in fn:uri-collection("grades")   let $grade := fn:json-doc($guri)   let $gid := m:get($grade, 'restaurant_id')   where $gid = $rid   return fn:serialize(m:remove(m:remove($grade, '_id'), 'restaurant_id'), $props), ', '), ']}\&\#xa;') 

Итак, мы с вами рассмотрели как можно реализовать DataStore connector к MongoDB и использовать его в качестве системы хранения документов. Надеюсь эта статья сможет стать для вас отправной точкой для написания других расширений Багри или просто побудит вас более подробно ознакомиться с этим интересным продуктом. Проекту всегда требуются Java разработчки заинтересованные в развитии Bagri, более подробно на код проекта можно посмотреть на Гитхабе.
ссылка на оригинал статьи https://habrahabr.ru/post/312626/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *