Использование LevelDB

Столкнулся с ситуацией, когда мои коллеги для организации локального персистентного key-value хранилища используют SQLite, MemcacheDB, Redis игнорируя встраиваемые хранилища такие как LevelDB, Sophia, HamsterDB и т.д.

Я разбил статью на две части:

  1. небольшое введение в api LevelDB;
  2. использование LevelDB, для хранения временных рядов.

LevelDB и его API

Некоторые свойства LevelDB:

  • хранилище типа ключ-значение;
  • ключ и значение это произвольный массив байт;
  • данные хранятся упорядоченно, порядок можно задавать;
  • прямой и обратный итератор для обхода данных;
  • множественное атомарное обновление;
  • поддержка снимков;
  • сжатие данных через Snappy.
Открытие и закрытие

Открытие:

#include <assert> #include "leveldb/db.h"  leveldb::DB* db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db); assert(status.ok()); 

Закрытие:

delete db; 

Опции:

Имя Описание Значение по умолчанию
comparator компаратор задающий порядок ключей BytewiseComparator, использует внутри себя memcmp
create_if_missing создать базу, если отсутствует false
error_if_exists выкинуть ошибку, если база существует false
paranoid_checks агрессивная проверка целостности базы false
env окружение через которое будет производится операции ввода/вывода Env::Default()
write_buffer_size размер буфера на запись 4MB
max_open_files количество открытых файлов 1000
block_cache использовать специальный кеш для блоков NULL, создает и использует внутренний кеш объемом 8MB
block_size приблизительный объем пользовательских данных в блоке 4K
compression сжатие блока kSnappyCompression
filter_policy фильтр(Блума) для уменьшения операций чтения с диска. NULL
Slice

Slice это структура представляющая ключ и значение в ряде методов. Slice содержит в себе указатель на данные и размер данных, при этом Slice не содержит в себе буфера для данных, поэтому нельзя допускать таких ситуаций:

leveldb::Slice slice; if (...) {   std::string str = ...;   slice = str; } Use(slice); 

leveldb::Slice имеет ряд конструкторов для удобства использования:

Slice(const char* d, size_t n) : data_(d), size_(n) { } Slice(const std::string& s) : data_(s.data()), size_(s.size()) { } Slice(const char* s) : data_(s), size_(strlen(s)) { } 

И методы для получения данных

const char* leveldb::Slice::data() const; char leveldb::Slice::operator[](size_t n) const; std::string leveldb::Slice::ToString() const; 
Статус

Для информирования о возможных ошибках большинство функций в LevelDB возвращают статус.
По статусу можно проверить выполнилась функция успешно и получить текстовое описание ошибки.

leveldb::Status s = ...; if (!s.ok()) cerr << s.ToString() << endl; 
Чтение, запись, удаление

Сигнатуры Put, Get, Delete:

Status leveldb::DB::Put(const WriteOptions& options, const Slice& key, const Slice& value); Status leveldb::DB::Get(const ReadOptions& options, const Slice& key, std::string* value); Status leveldb::DB::Delete(const WriteOptions& options, const Slice& key); 

Пример использования:

std::string value; leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value); if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value); if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1); 
Итератор

Итератор представлен классом leveldb::Iterator и имеет следующий интерфейс:

bool leveldb::Iterator::Valid() const; void leveldb::Iterator::SeekToFirst(); void leveldb::Iterator::SeekToLast(); void leveldb::Iterator::Seek(const Slice& target); void leveldb::Iterator::Next(); void leveldb::Iterator::Prev(); Slice leveldb::Iterator::key() const; Slice leveldb::Iterator::value() const; Status leveldb::Iterator::status() const; 

Интерфейс итератора предоставляет методы для последовательного и произвольного
доступа. Последовательный доступ может осуществлятся как в прямом, так и в обратном
направлении.

leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) {   cout << it->key().ToString() << ": "  << it->value().ToString() << endl; } assert(it->status().ok());  // Check for any errors found during the scan delete it; 

for (it->Seek(start);      it->Valid() && it->key().ToString() < limit;      it->Next()) {   ... }  

for (it->SeekToLast(); it->Valid(); it->Prev()) {   ... } 

Использование LevelDB, для хранения временных рядов.

Моя работа связана с системами мониторинга, и поэтому появились временные ряды и time series database.

Ограничимся двумя операциями:

  • добавление данных;
  • последовательное чтение интервала.
Схема данных

Для хранение метрик в хранилище типа ключ-значение используют следующую схему: ключ=метрика+временная метка+теги(теги опцианальны).
Подобным образом устроена OpenTSDB, работающая поверх HBase.
Внутри OpenTSDB есть схема uid’ов метрик и схема данных. Такой же принцып будет задействован и здесь.

Одна база будет использоваться для хранения идентификаторов метрик. Ключем тут будет число в size_t, значением строка в стиле Си.

Втроая база будет исопльзоваться под данные, ключем тут будет структура вида:

struct Key {     size_t muid;     time_t timestamp; }; 

значение будет хранится в виде double. Тут на полную катушку используется тот факт, что ключ и значение в LevelDB массив байт,
а значит можем использвать простые структуры данных без всякой сериализации.

Интерфейс хранилища
#pragma once  #include <ctime> #include <cstdint>  #include <memory> #include <unordered_map>  namespace leveldb {     class DB;     class Iterator;     class Slice;     class Comparator;     class Cache; }  /*!  * Хранилище метрик  */ class Storage { public:     class Iterator;     typedef size_t MetricUid;      /*!      * Представление ключа      */     struct Key     {         MetricUid muid;   //!< uid метрики         time_t timestamp; //!< время     };      /*!      * Конструктор      * @param dir каталог для размещения базы данных и базы метрик      * @param cacheSizeMb размер блока кеша      */     Storage(const std::string& dir, size_t cacheSizeMb = 16);      /*!      * @brief Добавление метрики.      * @param name имя метрики      * @return уникальный идентификатор метрики      *      * Добавляет метрику в базу UID'ов и возвращает UID метрики.      * Если метрика уже была добавлена, то возращает UID метрики      */     MetricUid addMetric(const std::string& name);      /*!      * Записать значение      * @param muid идентификатор метрики      * @param timestamp временная точка      * @param value значение      * @return true если нет ошибок      */     bool put(MetricUid muid, time_t timestamp, double value);      /*!      * Получить итератор для интервала значений метрики      * @param muid идентификатор метрики      * @param from начало интервала      * @param to конец интервала      * @return итератор      */     Iterator get(MetricUid muid, time_t from, time_t to);      Storage(const Storage&) = delete;     Storage& operator=(const Storage&) = delete;  private:      /*!      * Инициализация базы uid метрик      */     void initUID();      /*!      * Инициализация данных      */     void initData();  private:      /*!      * Текущий индекс для UID      */     MetricUid m_currentIndx;      /*!      * Базавый каталог      */     std::string m_dir;      /*!      * Размер блока кеша      */     size_t m_cacheSizeMb;      /*!      * Кеш для данных      */     std::shared_ptr<leveldb::Cache> m_dataCache;      /*!      * База UID'ов      */     std::shared_ptr<leveldb::DB> m_uid;      /*!      * База измерений      */     std::shared_ptr<leveldb::DB> m_data;      /*!      * Мэп метрика -> uid      */     std::unordered_map<std::string, MetricUid> m_metric2uid; };  /*!  * Итератор для обхода последовательности данных  */ class Storage::Iterator { public:     typedef std::tuple<time_t, double> Value;     typedef std::shared_ptr<leveldb::Iterator> IteratorPrivate;      Iterator();      Iterator(const IteratorPrivate& iter, const Key& limit);      /*!      * Проверка итератора на валидность      * @return true если итератор валиден      */     bool valid() const;      /*!      * Получить значение      * @return кортеж <время, значение>      */     Value value() const;      /*!      * Переход к следующему элементу      */     void next();  private:      IteratorPrivate m_iter; //!< итератор LevelDB     Key m_limit; //!< ключ для ограничения последовательности справа }; 

Конструктор Storage принимает путь к каталогу, где будут размещатся база с uid и база с данными, размер блока кеша.

Реализация

Начнем с компаратора, т.к. memcmp не подходит для сравнения чисел. Благодаря использования структуры в качестве ключа, код прост и читаем:

namespace {     class TimeMeasurementComporator: public leveldb::Comparator     {     public:         int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const         {             const char* dataA = a.data();             const char* dataB = b.data();             const Storage::Key* keyA =                     reinterpret_cast<const Storage::Key*>(dataA);             const Storage::Key* keyB =                     reinterpret_cast<const Storage::Key*>(dataB);             if (keyA->muid < keyB->muid)             {                 return -1;             }             else if (keyA->muid > keyB->muid)             {                 return 1;             }              if (keyA->timestamp < keyB->timestamp)             {                 return -1;             }             else if (keyA->timestamp > keyB->timestamp)             {                 return 1;             }              return 0;         }          // Ignore the following methods for now:         const char* Name() const         {             return "TimeMeasurementComporator";         }         void FindShortestSeparator(std::string*, const leveldb::Slice&) const         {         }         void FindShortSuccessor(std::string*) const         {         }     };     TimeMeasurementComporator GLOBAL_COMPORATOR; } 

Дальше инициализация/создание базы под данные:

void Storage::initData() {     DB* data;     Options options;     options.create_if_missing = true;     options.compression = kNoCompression;     options.comparator = &GLOBAL_COMPORATOR;      if (m_cacheSizeMb)     {         options.block_cache = leveldb::NewLRUCache(m_cacheSizeMb * 1048576);         m_dataCache.reset(options.block_cache);     }      Status status = DB::Open(options, m_dir + "/data", &data);     if (!status.ok())     {         LOG(ERROR)<<"Error opening database "<<status.ToString();         exit(1);     }     m_data.reset(data); } 

В опциях передается глобальный компаратор, и отключено сжатие т.к. LelelDB собирается без Snappy.

Инициализация базы с идентификаторами метрик:

void Storage::initUID() {     Options options;     options.create_if_missing = true;     options.compression = kNoCompression;      DB* cfg;     Status status = DB::Open(options, m_dir + "/conf", &cfg);     if (!status.ok())     {         LOG(ERROR)<<"Error opening database "<<status.ToString();         exit(1);     }     m_uid.reset(cfg);      std::unique_ptr<leveldb::Iterator> it(             m_uid->NewIterator(leveldb::ReadOptions()));      for (it->SeekToFirst(); it->Valid(); it->Next())     {         const size_t* index = reinterpret_cast<const size_t*>(it->key().data());         m_metric2uid[it->value().ToString()] = *index;         m_currentIndx = *index;     } } 

Тут происходит инициализация базы и заполнения отображения метрики в UID.

Добавление данных довольно простое:

Storage::MetricUid Storage::addMetric(const std::string& name) {     auto result = m_metric2uid.find(name);     if (result != m_metric2uid.end())     {         return result->second;     }     ++m_currentIndx;     m_metric2uid[name] = m_currentIndx;     const auto s = m_uid->Put(WriteOptions(),             Slice(reinterpret_cast<const char*>(&m_currentIndx), sizeof(m_currentIndx)),             name);      if (!s.ok())     {         LOG(ERROR)<<"Error put "<<s.ToString();     }      return m_currentIndx; }  bool Storage::put(MetricUid muid, time_t timestamp, double value) {     const Key key = {muid, timestamp};      const auto s = m_data->Put(WriteOptions(),             Slice(reinterpret_cast<const char*>(&key), sizeof(key)),             Slice(reinterpret_cast<char*>(&value), sizeof(value)));      if (!s.ok())     {         LOG(ERROR)<<"Error put "<<s.ToString();     }      return s.ok(); } 

Получение данных реализовано посредством создание обертки над итератором LevelDB:

Storage::Iterator Storage::get(MetricUid muid, time_t from, time_t to) {     const Key begin = {muid, from};     const Key end = { muid, to };      Storage::Iterator::IteratorPrivate iter(m_data->NewIterator(ReadOptions()));     iter->Seek(Slice(reinterpret_cast<const char*>(&begin),                      sizeof(begin)));     return Storage::Iterator(iter, end); }  Storage::Iterator::Iterator():         m_iter(nullptr) {     memset(&m_limit, 0, sizeof(m_limit)); }   Storage::Iterator::Iterator(const IteratorPrivate& iter, const Key& limit) :         m_iter(iter),         m_limit(limit) { }  bool Storage::Iterator::valid() const {     if(!m_iter)     {         return false;     }      const Slice right(reinterpret_cast<const char*>(&m_limit),                       sizeof(m_limit));      return m_iter->Valid() &&            (GLOBAL_COMPORATOR.Compare(m_iter->key(),right) < 0); }  Storage::Iterator::Value Storage::Iterator::value() const {     if(!m_iter)     {         return Value(0,0);     }      const Key* data =reinterpret_cast<const Key*>(m_iter->key().data());     double val = *reinterpret_cast<const double*>(m_iter->value().data());     return Value(data->timestamp, val); }  void Storage::Iterator::next() {     if(m_iter && m_iter->Valid())     {         m_iter->Next();     } } 

Исходные коды прототипа находится на GitHub.

Из интересного:
Обзор популярных современных алгоритмов хранения данных на диске: LevelDB, TokuDB, LMDB, Sophia
Глубокое погружение в дисковые структуры данных, B-деревья, LSM-деревья и фрактальные деревья

ссылка на оригинал статьи http://habrahabr.ru/post/256207/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *