Столкнулся с ситуацией, когда мои коллеги для организации локального персистентного key-value хранилища используют SQLite, MemcacheDB, Redis игнорируя встраиваемые хранилища такие как LevelDB, Sophia, HamsterDB и т.д.
Я разбил статью на две части:
LevelDB и его API
Некоторые свойства LevelDB:
- хранилище типа ключ-значение;
- ключ и значение это произвольный массив байт;
- данные хранятся упорядоченно, порядок можно задавать;
- прямой и обратный итератор для обхода данных;
- множественное атомарное обновление;
- поддержка снимков;
- сжатие данных через Snappy.
Открытие и закрытие
Открытие:
#include <assert> #include "leveldb/db.h" leveldb::DB* db; leveldb::Options options; options.create_if_missing = true; leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db); assert(status.ok());
Закрытие:
delete db;
Опции:
Имя | Описание | Значение по умолчанию |
---|---|---|
comparator | компаратор задающий порядок ключей | BytewiseComparator, использует внутри себя memcmp |
create_if_missing | создать базу, если отсутствует | false |
error_if_exists | выкинуть ошибку, если база существует | false |
paranoid_checks | агрессивная проверка целостности базы | false |
env | окружение через которое будет производится операции ввода/вывода | Env::Default() |
write_buffer_size | размер буфера на запись | 4MB |
max_open_files | количество открытых файлов | 1000 |
block_cache | использовать специальный кеш для блоков | NULL, создает и использует внутренний кеш объемом 8MB |
block_size | приблизительный объем пользовательских данных в блоке | 4K |
compression | сжатие блока | kSnappyCompression |
filter_policy | фильтр(Блума) для уменьшения операций чтения с диска. | NULL |
Slice
Slice это структура представляющая ключ и значение в ряде методов. Slice содержит в себе указатель на данные и размер данных, при этом Slice не содержит в себе буфера для данных, поэтому нельзя допускать таких ситуаций:
leveldb::Slice slice; if (...) { std::string str = ...; slice = str; } Use(slice);
leveldb::Slice имеет ряд конструкторов для удобства использования:
Slice(const char* d, size_t n) : data_(d), size_(n) { } Slice(const std::string& s) : data_(s.data()), size_(s.size()) { } Slice(const char* s) : data_(s), size_(strlen(s)) { }
И методы для получения данных
const char* leveldb::Slice::data() const; char leveldb::Slice::operator[](size_t n) const; std::string leveldb::Slice::ToString() const;
Статус
Для информирования о возможных ошибках большинство функций в LevelDB возвращают статус.
По статусу можно проверить выполнилась функция успешно и получить текстовое описание ошибки.
leveldb::Status s = ...; if (!s.ok()) cerr << s.ToString() << endl;
Чтение, запись, удаление
Сигнатуры Put, Get, Delete:
Status leveldb::DB::Put(const WriteOptions& options, const Slice& key, const Slice& value); Status leveldb::DB::Get(const ReadOptions& options, const Slice& key, std::string* value); Status leveldb::DB::Delete(const WriteOptions& options, const Slice& key);
Пример использования:
std::string value; leveldb::Status s = db->Get(leveldb::ReadOptions(), key1, &value); if (s.ok()) s = db->Put(leveldb::WriteOptions(), key2, value); if (s.ok()) s = db->Delete(leveldb::WriteOptions(), key1);
Итератор
Итератор представлен классом leveldb::Iterator и имеет следующий интерфейс:
bool leveldb::Iterator::Valid() const; void leveldb::Iterator::SeekToFirst(); void leveldb::Iterator::SeekToLast(); void leveldb::Iterator::Seek(const Slice& target); void leveldb::Iterator::Next(); void leveldb::Iterator::Prev(); Slice leveldb::Iterator::key() const; Slice leveldb::Iterator::value() const; Status leveldb::Iterator::status() const;
Интерфейс итератора предоставляет методы для последовательного и произвольного
доступа. Последовательный доступ может осуществлятся как в прямом, так и в обратном
направлении.
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { cout << it->key().ToString() << ": " << it->value().ToString() << endl; } assert(it->status().ok()); // Check for any errors found during the scan delete it;
for (it->Seek(start); it->Valid() && it->key().ToString() < limit; it->Next()) { ... }
for (it->SeekToLast(); it->Valid(); it->Prev()) { ... }
Использование LevelDB, для хранения временных рядов.
Моя работа связана с системами мониторинга, и поэтому появились временные ряды и time series database.
Ограничимся двумя операциями:
- добавление данных;
- последовательное чтение интервала.
Схема данных
Для хранение метрик в хранилище типа ключ-значение используют следующую схему: ключ=метрика+временная метка+теги(теги опцианальны).
Подобным образом устроена OpenTSDB, работающая поверх HBase.
Внутри OpenTSDB есть схема uid’ов метрик и схема данных. Такой же принцып будет задействован и здесь.
Одна база будет использоваться для хранения идентификаторов метрик. Ключем тут будет число в size_t, значением строка в стиле Си.
Втроая база будет исопльзоваться под данные, ключем тут будет структура вида:
struct Key { size_t muid; time_t timestamp; };
значение будет хранится в виде double. Тут на полную катушку используется тот факт, что ключ и значение в LevelDB массив байт,
а значит можем использвать простые структуры данных без всякой сериализации.
Интерфейс хранилища
#pragma once #include <ctime> #include <cstdint> #include <memory> #include <unordered_map> namespace leveldb { class DB; class Iterator; class Slice; class Comparator; class Cache; } /*! * Хранилище метрик */ class Storage { public: class Iterator; typedef size_t MetricUid; /*! * Представление ключа */ struct Key { MetricUid muid; //!< uid метрики time_t timestamp; //!< время }; /*! * Конструктор * @param dir каталог для размещения базы данных и базы метрик * @param cacheSizeMb размер блока кеша */ Storage(const std::string& dir, size_t cacheSizeMb = 16); /*! * @brief Добавление метрики. * @param name имя метрики * @return уникальный идентификатор метрики * * Добавляет метрику в базу UID'ов и возвращает UID метрики. * Если метрика уже была добавлена, то возращает UID метрики */ MetricUid addMetric(const std::string& name); /*! * Записать значение * @param muid идентификатор метрики * @param timestamp временная точка * @param value значение * @return true если нет ошибок */ bool put(MetricUid muid, time_t timestamp, double value); /*! * Получить итератор для интервала значений метрики * @param muid идентификатор метрики * @param from начало интервала * @param to конец интервала * @return итератор */ Iterator get(MetricUid muid, time_t from, time_t to); Storage(const Storage&) = delete; Storage& operator=(const Storage&) = delete; private: /*! * Инициализация базы uid метрик */ void initUID(); /*! * Инициализация данных */ void initData(); private: /*! * Текущий индекс для UID */ MetricUid m_currentIndx; /*! * Базавый каталог */ std::string m_dir; /*! * Размер блока кеша */ size_t m_cacheSizeMb; /*! * Кеш для данных */ std::shared_ptr<leveldb::Cache> m_dataCache; /*! * База UID'ов */ std::shared_ptr<leveldb::DB> m_uid; /*! * База измерений */ std::shared_ptr<leveldb::DB> m_data; /*! * Мэп метрика -> uid */ std::unordered_map<std::string, MetricUid> m_metric2uid; }; /*! * Итератор для обхода последовательности данных */ class Storage::Iterator { public: typedef std::tuple<time_t, double> Value; typedef std::shared_ptr<leveldb::Iterator> IteratorPrivate; Iterator(); Iterator(const IteratorPrivate& iter, const Key& limit); /*! * Проверка итератора на валидность * @return true если итератор валиден */ bool valid() const; /*! * Получить значение * @return кортеж <время, значение> */ Value value() const; /*! * Переход к следующему элементу */ void next(); private: IteratorPrivate m_iter; //!< итератор LevelDB Key m_limit; //!< ключ для ограничения последовательности справа };
Конструктор Storage принимает путь к каталогу, где будут размещатся база с uid и база с данными, размер блока кеша.
Реализация
Начнем с компаратора, т.к. memcmp не подходит для сравнения чисел. Благодаря использования структуры в качестве ключа, код прост и читаем:
namespace { class TimeMeasurementComporator: public leveldb::Comparator { public: int Compare(const leveldb::Slice& a, const leveldb::Slice& b) const { const char* dataA = a.data(); const char* dataB = b.data(); const Storage::Key* keyA = reinterpret_cast<const Storage::Key*>(dataA); const Storage::Key* keyB = reinterpret_cast<const Storage::Key*>(dataB); if (keyA->muid < keyB->muid) { return -1; } else if (keyA->muid > keyB->muid) { return 1; } if (keyA->timestamp < keyB->timestamp) { return -1; } else if (keyA->timestamp > keyB->timestamp) { return 1; } return 0; } // Ignore the following methods for now: const char* Name() const { return "TimeMeasurementComporator"; } void FindShortestSeparator(std::string*, const leveldb::Slice&) const { } void FindShortSuccessor(std::string*) const { } }; TimeMeasurementComporator GLOBAL_COMPORATOR; }
Дальше инициализация/создание базы под данные:
void Storage::initData() { DB* data; Options options; options.create_if_missing = true; options.compression = kNoCompression; options.comparator = &GLOBAL_COMPORATOR; if (m_cacheSizeMb) { options.block_cache = leveldb::NewLRUCache(m_cacheSizeMb * 1048576); m_dataCache.reset(options.block_cache); } Status status = DB::Open(options, m_dir + "/data", &data); if (!status.ok()) { LOG(ERROR)<<"Error opening database "<<status.ToString(); exit(1); } m_data.reset(data); }
В опциях передается глобальный компаратор, и отключено сжатие т.к. LelelDB собирается без Snappy.
Инициализация базы с идентификаторами метрик:
void Storage::initUID() { Options options; options.create_if_missing = true; options.compression = kNoCompression; DB* cfg; Status status = DB::Open(options, m_dir + "/conf", &cfg); if (!status.ok()) { LOG(ERROR)<<"Error opening database "<<status.ToString(); exit(1); } m_uid.reset(cfg); std::unique_ptr<leveldb::Iterator> it( m_uid->NewIterator(leveldb::ReadOptions())); for (it->SeekToFirst(); it->Valid(); it->Next()) { const size_t* index = reinterpret_cast<const size_t*>(it->key().data()); m_metric2uid[it->value().ToString()] = *index; m_currentIndx = *index; } }
Тут происходит инициализация базы и заполнения отображения метрики в UID.
Добавление данных довольно простое:
Storage::MetricUid Storage::addMetric(const std::string& name) { auto result = m_metric2uid.find(name); if (result != m_metric2uid.end()) { return result->second; } ++m_currentIndx; m_metric2uid[name] = m_currentIndx; const auto s = m_uid->Put(WriteOptions(), Slice(reinterpret_cast<const char*>(&m_currentIndx), sizeof(m_currentIndx)), name); if (!s.ok()) { LOG(ERROR)<<"Error put "<<s.ToString(); } return m_currentIndx; } bool Storage::put(MetricUid muid, time_t timestamp, double value) { const Key key = {muid, timestamp}; const auto s = m_data->Put(WriteOptions(), Slice(reinterpret_cast<const char*>(&key), sizeof(key)), Slice(reinterpret_cast<char*>(&value), sizeof(value))); if (!s.ok()) { LOG(ERROR)<<"Error put "<<s.ToString(); } return s.ok(); }
Получение данных реализовано посредством создание обертки над итератором LevelDB:
Storage::Iterator Storage::get(MetricUid muid, time_t from, time_t to) { const Key begin = {muid, from}; const Key end = { muid, to }; Storage::Iterator::IteratorPrivate iter(m_data->NewIterator(ReadOptions())); iter->Seek(Slice(reinterpret_cast<const char*>(&begin), sizeof(begin))); return Storage::Iterator(iter, end); } Storage::Iterator::Iterator(): m_iter(nullptr) { memset(&m_limit, 0, sizeof(m_limit)); } Storage::Iterator::Iterator(const IteratorPrivate& iter, const Key& limit) : m_iter(iter), m_limit(limit) { } bool Storage::Iterator::valid() const { if(!m_iter) { return false; } const Slice right(reinterpret_cast<const char*>(&m_limit), sizeof(m_limit)); return m_iter->Valid() && (GLOBAL_COMPORATOR.Compare(m_iter->key(),right) < 0); } Storage::Iterator::Value Storage::Iterator::value() const { if(!m_iter) { return Value(0,0); } const Key* data =reinterpret_cast<const Key*>(m_iter->key().data()); double val = *reinterpret_cast<const double*>(m_iter->value().data()); return Value(data->timestamp, val); } void Storage::Iterator::next() { if(m_iter && m_iter->Valid()) { m_iter->Next(); } }
Исходные коды прототипа находится на GitHub.
Из интересного:
Обзор популярных современных алгоритмов хранения данных на диске: LevelDB, TokuDB, LMDB, Sophia
Глубокое погружение в дисковые структуры данных, B-деревья, LSM-деревья и фрактальные деревья
ссылка на оригинал статьи http://habrahabr.ru/post/256207/