Взаимодействие PHP и Erlang посредством RabbitMQ

от автора

Вступление
Чем больше программируешь на php, тем чаще попадаются задачи, для решения которых нужен демон на сервере. Да, конечно существует phpDaemon, cron или костыли, которые при каждом n-ом запуске скрипта вызывают какой-то определенный набор операций. Но когда мы говорим о проектах с нагрузкой больше, чем на обычном сайте, мы начинаем расстраиваться.

В одном из проектов для решения такой задачи мы решили использовать связку php+RabbitMQ+erlang. На php уже был написан необходимый функционал, нам надо было лишь разнести вызовы по времени и на разные машинки. Конкретно задача звучала так: написать парсер пользователей с внешнего хранилища данных и, самое главное, поддерживать актуальность данных, а в случае их изменения, посылать уведомления.

Исходные данные
— Rest-функция на php, добавляющая пользователя в избранное. Из этого списка в дальнейшем и будут формироваться пользователи, актуальность информации которых мы и будем поддерживать, а в случае изменения — посылать уведомления на клиента.
— Приложение на erlange, который должен координировать, каких пользователей мы сейчас обрабатывем и т.д. Обработка информации с внешнего источника осуществляется двумя путями — это парсинг html страниц или где это возможно — запросы api. Для обработки ответов используется rest-функция, написанная на php.
— Мы должны поддерживать возможность простого масштабирования на большое количество серверов. Список пользователей, которые надо парсить, находятся в очередях, формируемых RabbitMQ.

Задача номер 1 — настроить расширение php для работы с RabbitMQ
В первую очередь устанавливаем дополнительный программные пакеты:

apt-get install gcc apt-get install php5-dev
Далее сама установка, найденная на просторах интернета:

#download and install the rabbitmq c amqp lib wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.1/rabbitmq-c-0.5.1.tar.gz tar -zxvf rabbitmq-c-0.5.1.tar.gz cd rabbitmq-c-0.5.1/ ./configure make sudo make install cd .. #download and compile the amqp wget http://pecl.php.net/get/amqp-1.4.0.tgz tar -zxvf amqp-1.4.0.tgz cd amqp-1.4.0/ phpize && ./configure —with-amqp && make && sudo make install #Add amqp extension to php mods-availabile directory echo "extension=amqp.so" > /etc/php5/mods-available/amqp.ini #Enabled it in cli cd /etc/php5/cli/conf.d/ ln -s ../../mods-available/amqp.ini 20-amqp.ini php -m | grep amqp #Enabled it in cli cd /etc/php5/apache2/conf.d/ ln -s ../../mods-available/amqp.ini 20-amqp.ini #restart Apache and than check phpinfo on web service apache2 restart
Если вам повезло, то все установилось верно.

Задача номер 2 — установить RabbitMQ и web-панель управления им
sudo apt-get install rabbitmq-server rabbitmq-plugins enable rabbitmq_management rabbitmqctl stop rabbitmq-server -detached
Далее по данному адресу вы получаете доступ к управления очередями, по умолчанию логин(guest) и пароль(guest)
ip.addres:15672/

Задача номер 3 — через php влиять на очереди RabbitMQ
//создание точки обмена $rabbit = new AMQPConnection(array(‘host’ => ‘127.0.0.1’, ‘port’ => ‘5672’, ‘login’ => ‘guest’, ‘password’ => ‘guest’)); $rabbit->connect(); $testChannel = new AMQPChannel($rabbit); $exchange = new AMQPExchange($testChannel); $exchange->setName(‘logoooooo’); $exchange->setType(AMQP_EX_TYPE_DIRECT); $exchange->declare(); //создания очереди $testChannel = new AMQPChannel($rabbit); $queue = new AMQPQueue($testChannel); $queue->setName("yyyyyyy2"); $queue->declare(); //привязываем очередь к точки обмена $testChannel = new AMQPChannel($rabbit); $queue = new AMQPQueue($testChannel); $queue->setName("yyyyyyy2"); $queue->bind(‘logoooooo’); $queue->declare(); //отправить сообщение на точку $testChannel = new AMQPChannel($rabbit); $exchange = new AMQPExchange($testChannel); $exchange->setName(‘logoooooo’); $exchange->publish(‘pooooooooooooooooooooooooooooooo’);

Задача номер 4 — Создать приложение на erlange используя rebar
Устанавливаем rebar
apt-get install rebar mkdir 1 rebar create template=simpleapp srvid=my_server46 rebar create template=simplesrv srvid=my_server46
Задача номер 5 — Запускаем приложение на erlange
Сначала устанавливаем CMake:

apt-get install make
В Makefile прописываем следующее:

all: rebar compile run: ERL_LIBS=deps erl +K true -name myapp_app@127.0.0.1 -boot start_sasl -pa ebin -s myapp_app -sasl errlog_type error
Строчка -pa ebin -s myapp_app означает, что мы запускаем ebin/myapp_app.erl и в нем функцию myapp_app:start().
ERL_LIBS=deps означает, что мы подгружаем все библиотеки, которые расположены в папке deps.

Задача номер 6 — подключить необходимые библиотеки для связи RabbitMQ и Erlang
В rebar.config помещаем следующее:

{deps, [ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq-3.0.2"}}} ]}. {erl_opts, [ debug_info, compressed, report, warn_export_all, warn_export_vars, warn_shadow_vars, warn_unused_function, warn_deprecated_function, warn_obsolete_guard, warn_unused_import % warnings_as_errors ]}.
Выполняем rebar get-deps, подтягивающий зависимости. Далее возникли сложности с оставшимися библиотеками, поэтому пришлось использовать то, что написано на официальном сайте RabbitMQ. Но перед эти доустанавливаем необходимые пакеты:

apt-get install xsltproc apt-get install zip
После заходим в папочку deps, которую создал rebar и, используя git, все скачиваем, а после устанавливаем:

cd deps git clone https://github.com/rabbitmq/rabbitmq-erlang-client.git git clone https://github.com/rabbitmq/rabbitmq-server.git git clone https://github.com/rabbitmq/rabbitmq-codegen.git cd rabbitmq-erlang-client make
Задача номер 7 — из Erlangа получать сообщения из очередей RabbitMQ
Файл myapp_app.erl оставляем чуть чуть редактируем, чтобы можно было запускать из makefile, что мы написали:

-module(myapp_app). -behaviour(application). -export([start/0,start/2, stop/1]). start() -> myapp_sup:start_link(). start(_StartType, _StartArgs) -> myapp_sup:start_link(). stop(_State) -> ok.
Файл myapp_sup.erl, отвечающий за наблюдение процессами, дописываем вызов нашего модуля из init:

-module(myapp_sup). -behaviour(supervisor). -export([start_link/0]). -export([init/1]). -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> {ok, { {one_for_one, 5, 10}, [{my_server46_0, {my_server46, start_link, []},permanent, brutal_kill, worker, [my_server46]}]} }.
Модуль, отвечающий за связь с RabbitMQ:

-module(my_server46). -behaviour(gen_server). -include("deps/rabbitmq-erlang-client/include/amqp_client.hrl"). -define(SERVER, ?MODULE). -export([start_link/0,main/0,loop/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init(Args) -> main(), {ok, Args}. main() -> {ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}), {ok, Channel} = amqp_connection:open_channel(Connection), io:format(" [*] Waiting for messages. To exit press CTRL+C~n"), amqp_channel:call(Channel, #’basic.qos'{prefetch_count = 1}), amqp_channel:subscribe(Channel, #’basic.consume'{queue = <<"yyyyyyy2">>},self()), receive #’basic.consume_ok'{} -> io:fwrite(" _rec_main_ok_ "),ok end, loop(Channel), io:fwrite("begin~n", []). loop(Channel)-> receive {#’basic.deliver'{delivery_tag = Tag}, #amqp_msg{payload = Body}} -> Dots = length([C || C <- binary_to_list(Body), C == $.]), io:format(" [x] Received Body ~p~n", [Body]), receive after Dots*1000 -> io:format(" _loop_rec_after_ ~p",[0]), ok end, timer:sleep(3500), amqp_channel:cast(Channel, #’basic.ack'{delivery_tag = Tag}), loop(Channel), io:format(" [x] Done 3~n") end. handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}.
Тут все достаточно просто, мы подписываемся на очередь «yyyyyyy2»:

amqp_channel:subscribe(Channel, #’basic.consume'{queue = <<"yyyyyyy2">>},self())
Затем сообщаем RabbitMQ, что сообщение успешно обработано:

amqp_channel:cast(Channel, #’basic.ack'{delivery_tag = Tag}) http://habrahabr.ru/post/251927/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *