Лепим микросервис

от автора

Подкинули задачу сделать микросервис, который получает данные от RabbitMQ, обрабатывает, и отправляет данные дальше по этапу в RabbitMQ. После отправки задания, я посмотрел на то что поучилось. Оказалось, что этот набор компонентов можно использовать для быстрого прототипирования pipeline архитектуры

Используемые компоненты:

Для примера буду делать микросервис для выдачи рейтинга игроков. От ядра системы в микросервис приходят следующие сообщения:

  • player_registered(id,name);
  • player_renamed(id,name);
  • player_won(id, points).

Сервис раз в минуту должен отсылать сообщение с содержимым рейтинга.Рейтинг сортируется по набранным очкам за календарную неделю.

REACT-CPP

REACT-CPP — это обертка над libev на C++11. Эта библиотека нужна для организации цикла обработка событий(event loop).
Т.к. кроме работы с сокетом потребуются таймеры и обработчики unix сигналов.

class Application { public:      Application();     ~Application();      using IntervalWatcherPtr = std::shared_ptr<React::IntervalWatcher>;      void run();     void shutdown();     //...  private:      bool onMinute();     //...      private:      React::MainLoop m_loop;     IntervalWatcherPtr m_minuteTimer;     //... };

void Application::run() {     m_minuteTimer = m_loop.onInterval(5.0, 60.0, std::bind(&Application::onMinute, this));      m_loop.onSignal(SIGTERM, [this]() -> bool     {         shutdown();         return false;     });      m_loop.onSignal(SIGUSR1, [this]()->bool{         cleanRating();         return true;     });      //...     m_loop.run(); }  bool Application::onMinute() {     calculateRating();     sendRating();     return true; }

Тут создаю таймер который стартует через 5 секунд и который будет вызывать обработчик каждые 60 секунд.
Любой приличный демон/сервис должен иметь обработчик SIGTERM, что бы из вне попросить его корректно завершится.
Что касается обработчика SIGUSR1 тут можно самостоятельно вычислять начало/конец недели через Boost.Date_Time, но мне тупо лень, когда в GNU/Linux есть cron+pkill.

AMQP-CPP

С тех пор как опубликовал RabbitMQ tutorials на C++ AMQP-CPP обзавелась реализацией обработчика на libev и libuv.

Подключение и обработка сообщения:

void Application::createChannel(AMQP::TcpConnection &connection) {     m_channel = std::make_unique<AMQP::TcpChannel>(&connection);      m_channel->declareQueue(m_cfg.source().name, AMQP::durable)         .onSuccess([&](const std::string &name, uint32_t messagecount, uint32_t consumercount)                    {                        LOG(INFO) << "Declared queue "                                  << name                                  << ", message count: "                                  << messagecount;                         m_channel->consume(m_cfg.source().name)                            .onReceived([&](const AMQP::Message &message,                                            uint64_t deliveryTag,                                            bool redelivered)                                        {                                            onMessage(message, deliveryTag, redelivered);                                        })                            .onError([](const char *message)                                     {                                         LOG(ERROR) << "Error consume:" << message;                                         APP->shutdown();                                     });                    })         .onError([&](const char *message)                  {                      LOG(ERROR) << "Error declare queue:" << message;                      shutdown();                  }); }  void Application::onMessage(const AMQP::Message &message,                             uint64_t deliveryTag,                             bool redelivered) {     parseMessage(message);     m_channel->ack(deliveryTag); }

Публикация сообщения:

AMQP::Envelope env(s.GetString());  m_channel->publish("", m_cfg.destination().name, env);

LevelDB

Может потребоваться локальное хранилище данных. Взял LelevDB, я о нем писал в Использование LevelDB. Сделал лишь небольшую RAII обертку:

Код обертки

class DataBase { public:      DataBase();      bool open(const std::string &path2base, bool compression = true);      bool put(const std::string &key, const ByteArray &value, bool sync = false);     ByteArray get(const std::string &key);      Snapshot snapshot();      Iterator iterator();  private:      std::shared_ptr<leveldb::DB> m_backend; };  class Snapshot { public:      Snapshot();      ~Snapshot();      ByteArray get(const std::string &key);      Iterator iterator();  private:      Snapshot(const std::weak_ptr<leveldb::DB> &backend, const leveldb::Snapshot *snapshot);  private:      friend class DataBase;      std::weak_ptr<leveldb::DB> m_backend;     const leveldb::Snapshot *m_shapshot; };  class Iterator { public:      Iterator(std::unique_ptr<leveldb::Iterator> rawIterator);     Iterator(Iterator &&iter);      /*!      * Create empty iterator      */     Iterator() = default;      ~Iterator();      bool isValid() const noexcept;      void next();      void prev();      std::string key();     ByteArray value();      /*!      * Seek to first      */     void toFirst();      /*!      * Seek to last      */     void toLast();      Iterator(const Iterator &) = delete;     Iterator &operator=(const Iterator &) = delete;  private:      std::unique_ptr<leveldb::Iterator> m_iterator; };

LevelDB используется для сохранения/востановления состояния.

void Application::loadFromLocalStorage() {     auto snapshot = m_localStorage->snapshot();     auto iter = snapshot.iterator();     iter.toFirst();     while (iter.isValid()) {         auto player = new Player(iter.value());         m_id2player[player->id] = player;         m_players.push_back(player);         iter.next();     } }  void Application::updatePlayerInBD(const Player *player) {     if (!m_localStorage->put(std::to_string(player->id), player->serialize())) {         LOG(ERROR) << "[" << player->id << ", "                    << player->name                    << "] is not updated in the database";     } }

Логика сервиса

Данные приходят в формате JSON.

Разбирает json используя RapidJSON, ищу подходящий метод, вызываю нужный обработчик:

void Application::parseMessage(const AMQP::Message &message) {     /*      * Схемка имеет вид      * {      *   "method":"player_registered",      *   "params":{      *   ...      *   }      * }      */     rapidjson::Document doc;     doc.Parse(message.body(), message.bodySize());      const std::string method = doc["method"].GetString();     auto iter = m_handlers.find(method);     if (iter != m_handlers.end()) {         iter->second(*this, doc["params"]);     }     else {         LOG(WARNING) << "Unknown method:" << method;     } }

Сами методы простые:

void Application::onPlayerRegistered(const JValue &params) {     auto obj = params.GetObject();     const uint64_t playerId = obj["id"].GetUint64();     if (!isRegistred(playerId)) {         auto player = new Player;         player->id = playerId;         player->name = obj["name"].GetString();         m_players.push_back(player);         m_id2player[playerId] = player;         updatePlayerInBD(player);     } }  void Application::onPlayerRenamed(const JValue &params) {     auto obj = params.GetObject();     const uint64_t playerId = obj["id"].GetUint64();     if (isRegistred(playerId)) {         auto player = m_id2player[playerId];         player->name = obj["name"].GetString();         updatePlayerInBD(player);     }     else {         LOG(WARNING) << "Renaming an unknown user[" << playerId << "]";     } }  void Application::onPlayerWon(const JValue &params) {     auto obj = params.GetObject();     const uint64_t playerId = obj["id"].GetUint64();     if (isRegistred(playerId)) {         auto player = m_id2player[playerId];         player->points += obj["points"].GetInt64();         updatePlayerInBD(player);     }     else {         LOG(WARNING) << "Unknown player[" << playerId << "]";     } }

Раз в минуту сортируем игроков и отправляем рейтинг:

bool Application::onMinute() {     calculateRating();     sendRating();     return true; }  void Application::calculateRating() {     std::sort(m_players.begin(), m_players.end(), [](const Player *a, const Player *b)     {         return a->points > b->points;     }); }  void Application::sendRating() {     using namespace rapidjson;      StringBuffer s;     Writer<StringBuffer> writer(s);     writer.StartArray();      const size_t count = std::min(m_players.size(), size_t(10));     for (size_t i = 0;          i < count;          ++i) {         writer.StartObject();          writer.Key("id");         writer.Uint64(m_players[i]->id);          writer.Key("name");         writer.String(m_players[i]->name.c_str());          writer.Key("points");         writer.Int64(m_players[i]->points);          writer.EndObject();     }      writer.EndArray();     AMQP::Envelope env(s.GetString());      m_channel->publish("", m_cfg.destination().name, env); }

Весь код доступен на GitHub’e. Исходники библиотек поставляются вместе с сервисом и собираются автоматически на GNU/Linux с gcc.

Подведем итоги, что имеем:

  • event loop с таймерами, обработчиками сигналов и всеми остальными плюшками libev;
  • работа с RabbitMQ;
  • встроенное key-value хранилище;
  • поддержка json.

ссылка на оригинал статьи https://habrahabr.ru/post/315268/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *