RabbitMQ tutorials на C++

от автора

На сайте rabbitmq.com в разделе tutorials приведены примеры реализации на различных языках, но среди них нет C++. Под катом собраны ссылки на переведенные руководства, материалы и код под спойлером.

Тем, кому удобнее просматривать код через интерфейс GitHub, можно сразу перейти в репозиторий.

Данный материал использует реализацию клиента AMQP-CPP и POCO C++ для работы с сокетом.

«RabbitMQ tutorial 1 — Hello World»

receive.cpp

#include <iostream> #include "SimplePocoHandler.h"  int main(void) {     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     channel.declareQueue("hello");     channel.consume("hello", AMQP::noack).onReceived(             [](const AMQP::Message &message,                        uint64_t deliveryTag,                        bool redelivered)             {                  std::cout <<" [x] Received "<<message.message() << std::endl;             });      std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";     handler.loop();     return 0; } 

send.cpp

#include <iostream>  #include "SimplePocoHandler.h"  int main(void) {     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");     AMQP::Channel channel(&connection);      channel.onReady([&]()     {         if(handler.connected())         {             channel.publish("", "hello", "Hello World!");             std::cout << " [x] Sent 'Hello World!'" << std::endl;             handler.quit();         }     });      handler.loop();     return 0; } 

«RabbitMQ tutorial 2 — Очередь задач»

worker.cpp

#include <iostream> #include <algorithm> #include <thread> #include <chrono>  #include "SimplePocoHandler.h"  int main(void) {     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     channel.declareQueue("task_queue", AMQP::durable);     channel.consume("task_queue", AMQP::noack).onReceived(             [&channel](const AMQP::Message &message,                        uint64_t deliveryTag,                        bool redelivered)             {                 const auto body = message.message();                 std::cout<<" [x] Received "<<body<<std::endl;                  size_t count = 0;                 std::for_each(body.cbegin(), body.cend(), [&](const char& ch)                         {                             if(ch =='.')                             {                                 ++count;                             }                         });                 std::this_thread::sleep_for (std::chrono::seconds(count));                  std::cout<<" [x] Done"<<std::endl;                 channel.ack(deliveryTag);             });      channel.setQos(1);     std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";     handler.loop();     return 0; } 

new_task.cpp

#include <iostream>  #include "SimplePocoHandler.h" #include "tools.h"  int main(int argc, const char* argv[]) {     const std::string msg =             argc > 1 ? join(&argv[1], &argv[argc], " ") : "Hello World!";      SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");     AMQP::Channel channel(&connection);      auto callback =             [&](const std::string &name, int msgcount, int consumercount)             {                 channel.publish("", "task_queue", msg);                 std::cout<<" [x] Sent '"<<msg<<"'\n";                 handler.quit();             };      channel.declareQueue("task_queue", AMQP::durable).onSuccess(callback);     handler.loop();     return 0; } 

«RabbitMQ tutorial 3 — Публикация/Подписка»

receive_logs.cpp

#include <iostream>  #include "SimplePocoHandler.h"  int main(void) {     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     auto receiveMessageCallback = [](const AMQP::Message &message,             uint64_t deliveryTag,             bool redelivered)     {          std::cout <<" [x] "<<message.message() << std::endl;     };      auto callback =             [&](const std::string &name, int msgcount, int consumercount)             {                 channel.bindQueue("logs", name,"");                 channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);             };      channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()     {         channel.declareQueue(AMQP::exclusive).onSuccess(callback);      });      std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";     handler.loop();     return 0; } 

emit_log.cpp

#include <iostream>  #include "SimplePocoHandler.h" #include "tools.h"  int main(int argc, const char* argv[]) {     const std::string msg =             argc > 1 ? join(&argv[1], &argv[argc], " ") : "info: Hello World!";      SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     channel.declareExchange("logs", AMQP::fanout).onSuccess([&]()     {         channel.publish("logs", "", msg);         std::cout << " [x] Sent "<<msg<< std::endl;         handler.quit();     });      handler.loop();     return 0; } 

«RabbitMQ tutorial 4 — Роутинг»

receive_logs_direct.cpp

#include <iostream> #include <algorithm>  #include "SimplePocoHandler.h"  int main(int argc, const char* argv[]) {     if(argc==1)     {         std::cout<<"Usage: "<<argv[0]<<" [info] [warning] [error]"<<std::endl;         return 1;     }     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);      channel.declareExchange("direct_logs", AMQP::direct);      auto receiveMessageCallback =             [](const AMQP::Message &message,                uint64_t deliveryTag,                bool redelivered)             {                 std::cout <<" [x] "                           <<message.routingKey()                           <<":"                           <<message.message()                           << std::endl;             };      auto callback = [&](const std::string &name,             int msgcount,             int consumercount)     {         std::for_each(&argv[1],                 &argv[argc],                 [&](const char* severity)                 {                     channel.bindQueue("direct_logs","", severity);                     channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);                 });      };     channel.declareQueue(AMQP::exclusive).onSuccess(callback);      std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";     handler.loop();     return 0; } 

emit_log_direct.cpp

#include <iostream>  #include "SimplePocoHandler.h" #include "tools.h"  int main(int argc, const char* argv[]) {     const std::string severity = argc > 2 ? argv[1] : "info";     const std::string msg =             argc > 2 ? join(&argv[2], &argv[argc], " ") : "Hello World!";      SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     channel.declareExchange("direct_logs", AMQP::direct).onSuccess([&]()     {         channel.publish("direct_logs", severity, msg);         std::cout << " [x] Sent "<<severity<<":"<<msg<< std::endl;         handler.quit();     });      handler.loop();     return 0; } 

«RabbitMQ tutorial 5 — Тематики»

receive_logs_topic.cpp

#include <iostream> #include <algorithm>  #include "SimplePocoHandler.h"  int main(int argc, const char* argv[]) {     if(argc==1)     {         std::cout<<"Usage: "<<argv[0]<<" [binding_key]..."<<std::endl;         return 1;     }     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);      channel.declareExchange("topic_logs", AMQP::topic);      auto receiveMessageCallback =             [](const AMQP::Message &message,                uint64_t deliveryTag,                bool redelivered)             {                 std::cout <<" [x] "                           <<message.routingKey()                           <<":"                           <<message.message()                           << std::endl;             };      auto callback = [&](const std::string &name,             int msgcount,             int consumercount)     {         std::for_each(&argv[1],                 &argv[argc],                 [&](const char* bindingKeys)                 {                     std::cout<<bindingKeys<<std::endl;                     channel.bindQueue("topic_logs",name, bindingKeys);                     channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback);                 });      };     channel.declareQueue(AMQP::exclusive).onSuccess(callback);      std::cout << " [*] Waiting for messages. To exit press CTRL-C\n";     handler.loop();     return 0; } 

emit_log_topic.cpp

#include <iostream>  #include "SimplePocoHandler.h" #include "tools.h"  int main(int argc, const char* argv[]) {     const std::string msg =             argc > 1 ? join(&argv[2], &argv[argc], " ") : "Hello World!";     const std::string routing_key = argc > 1 ? argv[1] : "anonymous.info";      SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     channel.declareExchange("topic_logs", AMQP::topic).onSuccess([&]()     {         channel.publish("topic_logs", routing_key, msg);         std::cout << " [x] Sent "<<routing_key<<":"<<msg<< std::endl;         handler.quit();     });      handler.loop();     return 0; } 

«RabbitMQ tutorial 6 — Удаленный вызов процедур»

rpc_server.cpp

#include <iostream> #include <algorithm> #include <thread> #include <chrono>  #include "SimplePocoHandler.h"  int fib(int n) {     switch (n)     {     case 0:         return 0;     case 1:         return 1;     default:         return fib(n - 1) + fib(n - 2);     } }  int main(void) {     SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     channel.declareQueue("rpc_queue");     channel.consume("").onReceived([&channel](const AMQP::Message &message,             uint64_t deliveryTag,             bool redelivered)     {         const auto body = message.message();         std::cout<<" [.] fib("<<body<<")"<<std::endl;          AMQP::Envelope env(std::to_string(fib(std::stoi(body))));         env.setCorrelationID(message.correlationID());          channel.publish("", message.replyTo(), env);         channel.ack(deliveryTag);     });      channel.setQos(1);     std::cout << " [x] Awaiting RPC requests" << std::endl;     handler.loop();     return 0; } 

rpc_client.cpp

#include <iostream>  #include "tools.h" #include "SimplePocoHandler.h"  int main(int argc, const char* argv[]) {     const std::string correlation(uuid());      SimplePocoHandler handler("localhost", 5672);      AMQP::Connection connection(&handler, AMQP::Login("guest", "guest"), "/");      AMQP::Channel channel(&connection);     auto callback = [&](const std::string &name,             int msgcount,             int consumercount)     {         AMQP::Envelope env("30");         env.setCorrelationID(correlation);         env.setReplyTo(name);         channel.publish("","rpc_queue",env);         std::cout<<" [x] Requesting fib(30)"<<std::endl;      };     channel.declareQueue(AMQP::exclusive).onSuccess(callback);      auto receiveCallback = [&](const AMQP::Message &message,             uint64_t deliveryTag,             bool redelivered)     {         if(message.correlationID() != correlation)             return;          std::cout<<" [.] Got "<<message.message()<<std::endl;         handler.quit();     };      channel.consume("", AMQP::noack).onReceived(receiveCallback);      handler.loop();     return 0; } 

ссылка на оригинал статьи http://habrahabr.ru/post/253317/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *