«ZeroMQ».Глава 2: Знакомство с сокетами

от автора

Всем привет!
Продолжаю вольный перевод книги «ZeroMQ.Use ZeroMQ and learn how to apply different message patterns». Заранее прошу прощения, что так долго не публиковал продолжение, но как говорится: «Лень вперед нас родилась…». Ну что же, лирику в сторону, продолжим.

Содержание

После того, как мы рассмотрели основные структуры ZeroMQ в предыдущей главе, в этой мы рассмотрим сокеты, а именно:

  • паттерн клиент-сервер(publish-subscribe)
  • паттерн pipeline

Паттерн publish-subscribe

Во-первых, давайте введем классический паттерн, паттерн клиент-сервер (publish-subscribe), который является односторонним по характеру распределения, когда сервер отправляет сообщения определенному списку клиентов. Это модель один ко многим. Основная идея этого паттерна заключается в том, что сервер посылает сообщение и подключенные клиенты получают это сообщение, в то время как отключенные просто пропустят его. Сервер слабо связан с клиентами, его вообще не волнует, существуют ли какие-либо клиенты вообще. Это подобно тому, как работают телевизионные каналы или радиостанции. Телеканалы всегда транслируют телепередачи и только зрители принимают решение принимать данное вещание или нет. Если вы пропустите нужное время, то не сможете посмотреть любимую телепередачу (если у вас нет TiVo или нечто подобного, но давайте предположим, что наш сценарий происходит в мире, где записи не были изобретены). Преимущество шаблона publish-subscribe в том, что он обеспечивает динамичную топологию сети.
Модель клиент-сервер можно рассматривать со следующих основных сторон:

  • опубликовать: сообщение публикуется создателем
  • оповестить: клиент уведомляется о опубликовании сообщения
  • подписаться: клиенту выдается новая подпись
  • отписаться: клиент удаляет существующую подпись

Давайте рассмотрим пример, чтобы прояснить ситуацию. Рассмотрим сценарий, где мы хотели бы создать биржевую программу. Есть брокеры, и они хотят знать, какие действия происходят на рынке. Нашим сервером будет фондовый рынок, а брокеры будут клиентами.
Вместо реальных цен с фондовых рынков мы просто сгенерируем несколько чисел.
Прежде чем перейти к какому-либо коду, сначала давайте посмотрим, как выглядит модель клиент-сервер.

Ниже приведен код создателя(сервера):

/* * Stock Market Server * Binds PUB socket to tcp://*:4040 * Publishes random stock values of random companies */ #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* publisher = zmq_socket(context, ZMQ_PUB); 	printf("Starting server...\n"); 	int conn = zmq_bind(publisher, "tcp://*:4040"); 	const char* companies[2] = {"Company1", "Company2"}; 	int count = 0; 	for(;;)  	{ 		int price = count % 2; 		int which_company = count % 2; 		int index = strlen(companies[0]); 		char update[12]; 		snprintf(update, sizeof update, "%s",  			companies[which_company]); 		zmq_msg_t message; 		zmq_msg_init_size(&message, index); 		memcpy(zmq_msg_data(&message), update, index); 		zmq_msg_send(&message, publisher, 0); 		zmq_msg_close(&message); 		count++; 	} 	zmq_close(publisher); 	zmq_ctx_destroy(context); 	return 0; } 

Так же ниже приведем код клиента:

/* * Stock Market Client * Connects SUB socket to tcp://localhost:4040 * Collects stock exchange values */ #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* subscriber = zmq_socket(context, ZMQ_SUB); 	printf("Collecting stock information from the server.\n"); 	int conn = zmq_connect(subscriber, "tcp://localhost:4040"); 	conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0); 	int i; 	for(i = 0; i < 10; i++)  	{ 		zmq_msg_t reply; 		zmq_msg_init(&reply); 		zmq_msg_recv(&reply, subscriber, 0); 		int length = zmq_msg_size(&reply); 		char* value = malloc(length); 		memcpy(value, zmq_msg_data(&reply), length); 		zmq_msg_close(&reply); 		printf("%s\n", value);  		free(value); 	} 	zmq_close(subscriber); 	zmq_ctx_destroy(context); 	return 0; } 

Установка подписи с помощью zmq_setsockopt () и subscribeis является обязательным всякий раз, когда вы используете SUB сокет, в противном случае вы не будете получать никаких сообщений. Это очень распространенная ошибка.
Клиент может установить множество подписей, к любому из сообщений, которые он получает, если обновление соответствует любой из подписок. Так же он может отказаться от определенных подписок. Подписки имеют фиксированную длину.
Клиент получает сообщение с помощью zmq_msg_recv (). zmq_msg_recv () получает и сохраняет сообщение. Предыдущее сообщение, если таковое имеется, выгружается.

int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags); 

Флаг опции может принимать только одно значение — ZMQ_DONTWAIT. Если флаг ZMQ_DONTWAIT, то операция выполняется в режиме без блокирования. Если сообщение получено, то zmq_msg_recv() возвращает размер сообщения в байтах, в противном случае возвращается -1 и флаг сообщения об ошибке.
Модель клиент-сервер является асинхронной и отправление сообщения в SUB сокет вызывает ошибку. Вы могли бы вызвать zmq_msg_send() для отправки сообщений, но вы никогда не должны вызывать zmq_msg_recv() для PUB сокета.
Ниже приводится пример вывода на клиентской стороне:

Company2 570 Company2 878 Company2 981 Company2 783 Company1 855 Company1 524 Company2 639 Company1 984 Company1 158 Company2 145 

Сервер всегда будет отправлять сообщения, даже если нет клиентов. Вы можете испытать его и посмотреть на результат. Если вы это сделаете, то увидите что-то вроде следующего:

Sending... Company2 36 Sending... Company2 215 Sending... Company2 712 Sending... Company2 924 Sending... Company2 721 Sending... Company1 668 Sending... Company2 83 Sending... Company2 209 Sending... Company1 450 Sending... Company1 940 Sending... Company1 57 Sending... Company2 3 Sending... Company1 100 Sending... Company2 947 

Скажем, мы хотим получить результаты для Company1, или другой компании, имя которой вы укажете в качестве аргумента. В этом случае нам надо было бы изменить нашу клиентскую программу следующим образом:

// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* subscriber = zmq_socket(context, ZMQ_SUB); 	const char* filter; 	if(argc > 1)  	{ 		filter = argv[1]; 	}  	else  	{ 		filter = "Company1"; 	} 	printf("Collecting stock information from the server.\n"); 	int conn = zmq_connect(subscriber, "tcp://localhost:4040"); 	conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter,  	strlen(filter)); 	int i = 0; 	for(i = 0; i < 10; i++)  	{ 		zmq_msg_t reply; 		zmq_msg_init(&reply); 		zmq_msg_recv(&reply, subscriber, 0); 		int length = zmq_msg_size(&reply); 		char* value = malloc(length + 1); 		memcpy(value, zmq_msg_data(&reply), length); 		zmq_msg_close(&reply); 		printf("%s\n", value);  		free(value); 	} 	zmq_close(subscriber); 	zmq_ctx_destroy(context); 	return 0; } 

Вывод будет содержать что-то вроде следующего:

Company1 575 Company1 504 Company1 513 Company1 584 Company1 444 Company1 1010 Company1 524 Company1 963 Company1 929 Company1 718 

Фильтрация сообщений

Наше основное приложение фондовая биржа отправляет сообщения клиентам. Кажется, все сообщения доставлены, как и ожидалось, не так ли? К сожалению, нет.
Давайте изменим наш код сервера на следующий:

// // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* publisher = zmq_socket(context, ZMQ_PUB); 	int conn = zmq_bind(publisher, "tcp://*:4040"); 	const char* companies[3] = {"Company1", "Company10",  		"Company101"}; 	int count = 0; 	for(;;)  	{ 		int price = count % 17; 		int which_company = count % 3; 		int index = strlen(companies[which_company]); 		char update[64]; 		snprintf(update, sizeof update, "%s",  			companies[which_company]); 		zmq_msg_t message; 		zmq_msg_init_size(&message, index); 		memcpy(zmq_msg_data(&message), update, index); 		zmq_msg_send(&message, publisher, 0); 		zmq_msg_close(&message); 		count++; 	} 	zmq_close(publisher); 	zmq_ctx_destroy(context); 	return 0; } 

Теперь давайте изменим наш клиентский код на следующий:

// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* subscriber = zmq_socket(context, ZMQ_SUB); 	const char* filter; 	if(argc > 1)  	{ 		filter = argv[1]; 	}  	else  	{ 		filter = "Company1"; 	} 	printf("Collecting stock information from the server.\n"); 	int conn = zmq_connect(subscriber, "tcp://localhost:4040"); 	conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter,  		strlen(filter)); 	int i = 0; 	for(i = 0; i < 10; i++)  	{ 		zmq_msg_t reply; 		zmq_msg_init(&reply); 		zmq_msg_recv(&reply, subscriber, 0); 		int length = zmq_msg_size(&reply); 		char* value = malloc(length + 1); 		memcpy(value, zmq_msg_data(&reply), length); 		zmq_msg_close(&reply); 		printf("%s\n", value);  		free(value); 	} 	zmq_close(subscriber); 	zmq_ctx_destroy(context); 	return 0; } 

В этом случае на выходе будет что-то похожее на следующее:

Collecting stock information from the server. Company101 950 Company10 707 Company101 55 Company101 343 Company10 111 Company1 651 Company10 287 Company101 8 Company1 889 Company101 536 

Наш клиентский код явно говорит, что мы хотим видеть результат для Company1. Однако, сервер опять посылает нам результаты для Company10 и Company101. Это, конечно, не то, что мы хотим. Мы должны решить эту маленькую проблему.
Мы можем сделать небольшой хак, чтобы получить то, что хотим, но использование разделителя является более простым вариантом.
Нам нужно внести некоторые изменения, как в клиентский код, так и в код сервера, мы будем фильтровать названия компаний, используя разделитель.
Ниже приведен обновленный код сервера, который исправляет прежние проблемы. Обратите внимание на выделенные линии, они показывают, как мы можем использовать разделитель, чтобы отправить сообщение для клиентов:

// // Stock Market Server // Binds PUB socket to tcp://*:4040 // Publishes random stock values of random companies // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* publisher = zmq_socket(context, ZMQ_PUB); 	int conn = zmq_bind(publisher, "tcp://*:4040"); 	conn = zmq_bind(publisher, "ipc://stock.ipc"); 	const char* companies[3] = {"Company1", "Company10",  		"Company101"}; 	for(;;)  	{ 		int price = count % 17; 		int which_company = count % 3; 		int index = strlen(companies[which_company]); 		char update[64]; 		sprintf(update, "%s| %d", companies[which_company], price); 		zmq_msg_t message; 		zmq_msg_init_size(&message, index); 		memcpy(zmq_msg_data(&message), update, index); 		zmq_msg_send(&message, publisher, 0); 		zmq_msg_close(&message); 		count++; 	} 	zmq_close(publisher); 	zmq_ctx_destroy(context); 	return 0; } 

Взглянем на обновленный код клиента для фильтрации результатов:

// // Stock Market Client // Connects SUB socket to tcp://localhost:4040 // Collects stock exchange values // #include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* subscriber = zmq_socket(context, ZMQ_SUB); 	const char* filter; 	filter = "Company1|"; 	printf("Collecting stock information from the server.\n"); 	int conn = zmq_connect(subscriber, "tcp://localhost:4040"); 	conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter,  		strlen(filter)); 	int i = 0; 	for(i = 0; i < 10; i++)  	{ 		zmq_msg_t reply; 		zmq_msg_init(&reply); 		zmq_msg_recv(&reply, subscriber, 0); 		int length = zmq_msg_size(&reply); 		char* value = malloc(length + 1); 		memcpy(value, zmq_msg_data(&reply), length); 		zmq_msg_close(&reply); 		printf("%s\n", value);  		free(value); 	} 	zmq_close(subscriber); 	zmq_ctx_destroy(context); 	return 0; } 

После изменений, которые были внесены в клиентский и серверный код, мы можем видеть именно те результаты, которые и ожидались.

Опции сокета

Так как мы используем модель клиент-сервер, то пользуемся параметром с именем ZMQ_SUBSCRIBE.

int conn = zmq_connect(subscriber, "tcp://localhost:4040"); conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, option_value,  	strlen(option_value)); 

Опции сокета устанавливаются в функции zmq_setsockopt(). Она принимает четыре параметра:

  • сокет
  • имя опции
  • значение опции
  • размер опции

Это можно увидеть из следующей строки:

int zmq_setsockopt (void *socket, int option_name, const void *option_ 	value, size_t option_len); 
Подписаться

ZMQ_SUBSCRIBE создает новое сообщение в ZMQ_SUB сокете. Если аргумент option_value не пустой, то мы подписываемся на все сообщения, которые начинаются с option_value. Вы можете настроить несколько фильтров для одного ZMQ_SUB сокета.

Отписаться

ZMQ_UNSUBSCRIBE удаляет сообщение из ZMQ_SUB сокета. Он удаляет только одно сообщение, даже если настроено несколько фильтров.
Главное, что мы должны усвоить о клиент-серверном сокете это то, что мы никогда не узнаем когда клиент начинает получать сообщения. В этом случае хорошей идеей будет запускать сначала клиента, а потом уже сервер. Потому что клиент всегда воспринимает первое сообщение как соединение с сервером, которое занимает много времени, а сервер может уже слать сообщения в это время.
Однако, мы будем говорить о том, как синхронизировать сервер с клиентом, так как мы не должны посылать сообщения, если клиент не на связи.

Замечания по модели клиент-сервер

В модели клиент-сервер следует обратить внимание на следующие моменты:

  • Сообщения ставятся в очередь на стороне сервера, если вы используете TCP, и клиент слишком медленно принимает сообщения. Мы покажем, как можно защитить приложение от этого.
  • Клиент может подключиться к нескольким серверам. Данные будут передаваться по стратегии справедливой очереди (fair-queue)
  • Сервер посылает все сообщения всем клиентам, и фильтрация выполняется на стороне клиента, это было продемонстрировано выше в программе биржевого рынка

В главе 4 мы еще вернемся к модели клиент-сервер, рассмотрим более сложные примеры и покажем, как бороться с «медленными» клиентами.

Паттерн pipeline

Давайте продолжим, рассмотрим модель pipeline. Паттерн pipeline передает данные между упорядоченными узлами в pipeline. Данные передаются непрерывно и на каждом шаге pipe присоединен к одному из нескольких узлов. Между узлами используется стратегия циклической передачи данных. Это немного похоже не модель запрос-ответ.

Стратегия разделяй и властвуй

От этой стратегии разделяй и властвуй нет никакого спасения, когда вы программируете. Помните, когда вы только начинали изучать программирование, и ваш преподаватель применял ее чуть ли не в сортировке слиянием и через неделю уже половина группы перестало посещать занятия. Я уверен, что все это отлично помнят. И вот опять, разделяй и властвуй!
Давайте напишем что-нибудь параллельное на ZeroMQ. Рассмотрим сценарий, где у нас есть генератор, который генерирует случайные числа. У нас есть рабочие, которые находят квадратный корень из этих чисел методом Ньютона. Так же мы располагаем сборщиком, который собирает результаты у рабочих.
Ниже приведен код сервера:

#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <sys/time.h> #include <time.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	// This is the socket that we send messages. 	void* socket = zmq_socket(context, ZMQ_PUSH); 	zmq_bind(socket, "tcp://*:4040"); 	// This is the socket that we send batch message. 	void* connector = zmq_socket(context, ZMQ_PUSH); 	zmq_connect(connector, "tcp://localhost:5050"); 	printf("Please press enter when workers are ready..."); 	getchar(); 	printf("Sending tasks to workers...\n"); 	// The first message. It's also the signal start of batch. 	int length = strlen("-1"); 	zmq_msg_t message; 	zmq_msg_init_size(&message, length); 	memcpy(zmq_msg_data(&message), "-1", length); 	zmq_msg_send(&message, connector, 0); 	zmq_msg_close(&message); 	// Generate some random numbers. 	srandom((unsigned) time(NULL)); 	// Send the tasks. 	int count; 	int msec = 0; 	for(count = 0; count < 100; count++)  	{ 		int load = (int) ((double) (100) * random () / RAND_MAX); 		msec += load; 		char string[10]; 		sprintf(string, "%d", load); 	} 	printf("Total: %d msec\n", msec); 	sleep(1); 	zmq_close(connector); 	zmq_close(socket); 	zmq_ctx_destroy(context); 	return 0; } 

Так же взглянем на код работника, где мы делаем некоторые расчеты по вычислению квадратного корня из числа методом Ньютона:

#include <stdlib.h> #include <string.h> #include <unistd.h> #include "zmq.h" double square(double x)  { 	return x * x; } double average(double x, double y)  { 	return (x + y) / 2.0; } double good_enough(double guess, double x)  { 	return abs(square(guess) - x) < 0.000001; } double improve(double guess, double x)  { 	return average(guess, x / guess); } double sqrt_inner(double guess, double x)  { 	if(good_enough(guess, x)) 		return guess; 	else 		return sqrt_inner(improve(guess, x), x); } double newton_sqrt(double x)  { 	return sqrt_inner(1.0, x); } int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	// Let's initialize a socket to receive messages. 	void* receiver = zmq_socket(context, ZMQ_PULL); 	zmq_connect(receiver, "tcp://localhost:4040"); 	// Let's initialize a socket to send the messages. 	void* sender = zmq_socket(context, ZMQ_PUSH); 	zmq_connect(sender, "tcp://localhost:5050"); 	for(;;)  	{ 		zmq_msg_t reply; 		zmq_msg_init(&reply); 		zmq_msg_recv(&reply, receiver, 0); 		int length = zmq_msg_size(&reply); 		char* msg = malloc(length + 1); 		memcpy(msg, zmq_msg_data(&reply), length); 		zmq_msg_close(&reply); 		fflush(stdout); 		double val = atof(msg); 		printf("%.1f: %.1f\n", val, newton_sqrt(val)); 		sleep(1); 		free(msg); 		zmq_msg_t message; 		char* ssend = "T"; 		int t_length = strlen(ssend); 		zmq_msg_init_size(&message, t_length); 		memcpy(zmq_msg_data(&message), ssend, t_length); 		zmq_msg_send(&message, receiver, 0); 		zmq_msg_close(&message); 	} 	zmq_close(receiver); 	zmq_close(sender); 	zmq_ctx_destroy(context); 	return 0; } 

Приведем код и сборщика:

#include <stdlib.h> #include <string.h> #include "zmq.h" int main (int argc, char const *argv[])  { 	void* context = zmq_ctx_new(); 	void* receiver = zmq_socket(context, ZMQ_PULL); 	zmq_bind(receiver, "tcp://*:5050"); 	// We receive the first message and discard it since it's the 	// signal start of batch which is -1. 	zmq_msg_t reply; 	zmq_msg_init(&reply); 	zmq_msg_recv(&reply, receiver, 0); 	int length = zmq_msg_size(&reply); 	char* msg = malloc(length + 1); 	memcpy(msg, zmq_msg_data(&reply), length); 	zmq_msg_close(&reply); 	free(msg); 	int count; 	for(count = 0; count < 100; count++)  	{ 		zmq_msg_t reply; 		zmq_msg_init(&reply); 		zmq_msg_recv(&reply, receiver, 0); 		int length = zmq_msg_size(&reply); 		char* value = malloc(length + 1); 		memcpy(value, zmq_msg_data(&reply), length); 		zmq_msg_close(&reply); 		free(value); 		if(count / 10 == 0) 			printf("10 Tasks have been processed."); 		fflush(stdout); 	} 	zmq_close(receiver); 	zmq_ctx_destroy(context); 	return 0; } 

Следующая диаграмма, представляет код написанный выше:

Что же мы имеем:

  • Для начала нужно синхронизировать время, когда работники начинают работать. Как говорилось ранее, процесс подключения занимает некоторое время. Если мы не сделаем синхронизацию, то первый работник будет получать сообщения, в то время как другие находятся на стадии подключения. Для того, чтобы предотвратить это, мы должны синхронизировать начало работы, чтобы все работало параллельно.
  • PULL сокет сборщика получает результаты, используя разработанную справедливую очередь (более подробно мы говорили о ней в первой части)
  • PUSH сокет сервера равномерно отправляет задачи работникам
  • Рабочие соединены как с сервером, так и со сборщиком. При желании вы можете задействовать и большее число работников.

Мы упоминали, что работники соединены как с сервером, так и со сборщиком. Давайте рассмотрим эти связи более подробно.
Давайте посмотрим на следующие строки из кода нашего работника:

// Let's initialize a socket to receive messages. void* receiver = zmq_socket(context, ZMQ_PULL); zmq_connect(receiver, "tcp://localhost:4040"); 
ZMQ_PULL сокет

Когда мы хотим получить данные от входа к узлам, мы используем ZMQ_PULL. Тип сокета ZMQ_PULL используется для получения сообщений из вышестоящих узлов в pipeline. Как мы уже говорили ранее, этот процесс осуществляется с помощью планирования справедливой очереди.

ZMQ_PUSH сокет

Когда мы хотим общаться с нижними узлами, мы используем ZMQ_PUSH. Тип сокета ZMQ_PUSH используется для отправки сообщений следующим узлам в pipeline.
ZMQ_PUSH никогда не отбрасывает сообщения. Если вышестоящий узел готов отправить сообщение нижестоящему узлу, но последний не готов принять послание и обработать его, то все сообщения, отправленные с помощью zmq_send() блокируются до тех пор, пока не станет доступен хотя бы один узел для приема сообщения.

Получение контекста ZeroMQ

Скорее всего, вы обратили внимание, что все примеры, которые были до этого приведены, начинались с zmq_ctx_new(). Приложения ZeroMQ всегда начинаются с создания контекста. Все сокеты создаются внутри одного процесса с помощью контекста, который участвует в процессе создания сокетов, так как они являются самым быстрым способом подключения потоков в одном процессе. Контекст ZeroMQ потокобезопасен, поэтому его можно легко передавать между потоками.
Если контекст ZeroMQ не может быть создан, то возвращается NULL.
Несмотря на то, что можно создать несколько контекстов, которые будут рассматриваться как отдельные приложения ZeroMQ, лучшей идеей будет создание одного контекста и передача его другим потокам.

Деструктор контекста ZeroMQ

В конце каждого приложения вам необходимо уничтожить контекст, который вы создали, вызвав zmq_ctx_destroy(). После вызова zmq_ctx_destroy() все процессы возвращают код ошибки(ETERM), zmq_ctx_destroy() блокирует вызовы открытых сокетов и закрывает их, вызывая zmq_close().

Очистка

Когда вы программируете на таких языках программирования как Pyhton или Java, то вам не нужно беспокоиться об управлении памятью, так как эти языки имеют встроенные сборщики мусора.
Наппример, Pyhton использует подсчет ссылок, когда счетчик становится равным нулю, то память освобождается автоматически. Таким образом, вы н должны явно закрывать соединение при написании приложение ZeroMQ на Pyhton, поскольку оно будет автоматически закрыто, как только счетчик ссылок объекта станет равным нулю. Однако, следует отметить, что это не будет работать в Jython, PyPy, или IronPython. Во всяком случае, можно найти достаточно информации в документации Python. Давайте вернемся к нашей главное задаче.
Когда вы пишете на С, то управление памятью полностью лежит на вашей ответственности. В противном случае вы будете иметь нестабильное приложение, которое будет иметь утечки памяти.
Вы должны сами позаботиться о закрытии сокетов, уничтожении сообщений и контекста ZeroMQ. Есть несколько вещей, которые нужно рассмотреть, чтобы успешно завершить приложение:

  • Как говорилось ранее, чтобы закрыть приложение вам нужно уничтожить контекст ZeroMQ, вызвав zmq_ctx_destroy(). Однако, если имеются открытые сокеты, то zmq_ctx_destroy может бесконечно долго ждать их закрытия. Таким образом, необходимо сначала закрыть все сокеты, а затем вызвать zmq_ctx_destroy(), чтобы уничтожить контекст.
  • zmq_ctx_destroy() будет ждать бесконечно долго, если имеется открытое соединение или имеются сообщения в очереди для отправки.
  • всякий раз, когда вы заканчиваете обработку сообщения, необходимо сразу его закрыть, вызвав zmq_msg_close(), в противном случае ваше приложение будет иметь утечки памяти.
  • не открывайте много сокетов. Если вы это делаете, то это в значительной степени означает, что вы делаете что-то не так и лучше спроектируйте приложение с нуля.

Вы можете удивиться, увидев что будет происходит с вашим приложением, если оно написано неправильно, особенно если оно многопоточное. В этом случае отловить ошибку будет крайне сложно.

Обнаружение утечек памяти

Приложение, написанное на С или С++ нуждается в хорошо написанном менеджере памяти, так как управление памятью полностью ложится на плечи программиста. Для этого мы будем использовать замечательный Linux-инструмент под названием Valgrind. Среди многих других полезных функций для анализа исходного кода этот инструмент может быть использован для обнаружения утечек памяти.
Следующий раздел — это небольшой учебник по Valgrind, в нем мы более детально посмотрим, как использовать Valgrind при написании приложений на ZeroMQ.

Введение в Valgrind

Вы можете компилировать ваше приложение с использованием параметра –g для вывода отладочной информации. В этом случае сообщения об ошибках будут содержать точные номера строк.
Рассмотрим следующий пример:

#include <stdio.h> #include <stdlib.h> int main(int argc, char const *argv[])  { 	char* a = malloc(4); 	int b; 	printf("b = %d\n", b); 	return 0; } 

Давайте скомпилируем в gcc введя gcc –g –o test test.c. Теперь пришло время для запуска Valgrind для проверки утечек памяти. Давайте выполним следующую команду:

valgrind --leak-check=full --show-reachable=yes test 

После того, как мы ввели предыдущую команду, Valgrind начнет проверять код на наличие ошибок памяти с помощью инструмента memcheck. Его можно вызвать отдельно, выполнив tool=memcheck, но это было бы бессмысленно, так как memcheck является инструментом по умолчанию. Вывод будет похож на следующий:

==98190== Conditional jump or move depends on uninitialised value(s) ==98190== at 0x2D923: __vfprintf  ==98190== by 0x4AC5A: vfprintf_l ==98190== by 0x952BE: printf ==98190== by 0x1F5E: main (test.c:8) ==98190== 4 bytes in 1 blocks are definitely lost in loss record 1 of 5 ==98190== at 0xF656: malloc (vg_replace_malloc.c:195) ==98190== by 0x1F46: main (test.c:6) ==98190== LEAK SUMMARY: ==98190== definitely lost: 4 bytes in 1 blocks ==98190== indirectly lost: 0 bytes in 0 blocks ==98190== possibly lost: 0 bytes in 0 blocks 

Теперь давайте немного опишем предыдущий вывод:

  • мы не будем обсуждать 98190, так как это ID процесса
  • Conditional jump or move depends on uninitialised value(s) означает что инициализация в нашем коде прошла успешно
  • definitely lost означает, что есть утечка памяти, и мы должны ее исправить
  • indirectly lost указывает на блок, который может быть потерян
  • possibly lost означает, что есть возможная утечка памяти

По умолчанию Valgrind использует $PREFIX/lib/valgrind/default.supp. Тем не менее, мы должны создать свой файл для использования его в ZeroMQ, выглядеть он будет как то так:

{ 	<socketcall_sendto> 	Memcheck:Param 	socketcall.sendto(msg) 	fun:send 	... } { 	<socketcall_sendto> 	Memcheck:Param 	socketcall.send(msg) 	fun:send 	... } 

Тогда можно было бы запустить Valgrind с нужными нам аргументами следующим образом:

valgrind --leak-check=full --show-reachable=yes --suppressions=zeromq.supp server 

Заключение

В этой главе мы познакомились с сокетами и ввели две новые модели, а именно модель клиент-сервер и паттерн pipeline. Так же мы обсудили как решать определенные задачи используя данные шаблоны, и показали это на простых примерах. Так же мы рассмотрели отличный инструмент для нахождения утечек памяти Valgrind.

Большое спасибо за внимание, замечания по переводу прошу присылать в личку.

ссылка на оригинал статьи http://habrahabr.ru/post/216957/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *