Проблема
Бывают ситуации, когда разбитые на различные потоки задачи имеют разное время выполнения. В этом случае один или несколько потоков будут «простаивать», не производя полезных действий. Очевидно, это повлияет на общее время выполнения всех задач в худшую сторону. Например, в магазине есть две кассы. В очереди на первой кассе стоят покупателями с полными корзинами покупок, тогда как в очереди второй кассы такое же количество покупателей, но все они с одной шоколадкой. Когда вторая касса обслужит всех своих клиентов, то очередь первой кассы все еще будет полна.
Другой проблемой обычного распределения задач на потоки может быть ситуация, когда результат работы одной задачи вычисляется быстро, но накладные расходы на «ненужные» для клиента (сущность, которая инициировала выполнение задачи) операции существенны. Примером может служить очередь перед турникетами на входе в университет. Когда студент подносит карту к считывателю, то проверка на существование номера его карты в базе данных происходит очень быстро, а вот запись и обработка статистической информации в другую базу данных занимает продолжительное время. В этом случае, вся очередь студентов будет ждать, пока эти операции завершатся, причем тот факт, что студент сможет пройти, известен уже давно. Зачем заставлять всех ждать, пока выполняются «ненужные» для студентов операции?
Решение указанных выше проблем при программировании параллельных систем возможно с применением шаблона проектирования ActiveObject.
Описание паттерна
Суть паттерна заключается в том, чтобы отделить поток, в котором был инициирован запуск задачи от потока, в котором она будет исполняться. Есть несколько разновидностей реализаций паттерна. Рассмотрим метод с применением паттерна «Команда» (читателю рекомендуется ознакомиться с этим шаблоном программирования). Общая схема представлена на рисунке.
Клиенту доступны классы-наследники класса Command, которые переопределяют виртуальный метод execute(), а также ActiveObjectProxy, с помощью которого будет происходить взаимодействие с создаваемой системой.
Класс GuardedQueue – это потоко-защищённая очередь, которая реализует методы push и pop. Очередь оперирует объектами подклассов класса Command.
Основным элементом паттерна ActiveObject является класс Sheduler или «планировщик». Именно логика его построения определяет эффективность всей системы. Отметим важные моменты его реализации:
- В конструкторе создаются «threadNumber» потоков, каждый из которых не заканчивает свою работу до завершения работы приложения.
- В каждом из созданных потоков запускается функция dispatch(), которая в бесконечном цикле выталкивает верхний элемент из очереди «queue» и запускает его метод execute().
Эффективность работы паттерна определяется тем, что для каждой задачи не приходится создавать свой поток (процесс создания потока, как известно, занимает продолжительное время), а также тем, что потоки «не простаивают», если есть задачи в очереди.
Рассмотренная выше структура является базовой, облегченной для понимания читателя. Стоит отметить несколько важных моментов.
- Во-первых, возвращение результата выполнения задачи клиенту может быть реализовано как дополнительный метод getResult() и семафора в классе «Command». При этом клиент будет ждать открытия семафора до тех пор, пока результат не будет известен. Открытие же семафора будет происходить в методе «execute()» после выполнения необходимой работы и записи результата.
- Во-вторых, стоит отметить проблему остановки ActiveObject. Очевидно, что для корректного завершения всех созданных потоков, необходимо, чтобы каждый из них вышел из бесконечного цикла и сам завершился. Этого можно достичь, создав класс-наследник от Command под названием StopCommand. Далее, поместив в очередь планировщика столько объектов этого класса, сколько было создано потоков. В функции dispatch() необходимо реализовать дополнительную проверку на принадлежность объекта из верхушки очереди классу StopCommand. В случае принадлежности объекта этому классу, будет происходить выход из бесконечного цикла и корректное завершение потока. Так как в очереди будет объектов StopCommand такое число, которое равно числу потоков, то каждый поток получит свою команду на завершение.
Измерение эффективности
Создадим 1000 объектов класса ConcretCommand и для того, чтобы эмулировать различную задержку выполнения различных задач, каждому объекту присвоим случайное значение времени ожидания. Таким образом, класс Command и ConcretCommand будут иметь следующую реализацию:
class Command { protected: int sleep; // задержка HANDLE semaphore; char * name; public: Command(char *uniqName, int _sleep) { name = new char[strlen(uniqName)]; wchar_t *semName = new wchar_t[strlen(uniqName)]; for (int i = 0; i < strlen(uniqName); ++i) semName[i] = uniqName[i]; semaphore = CreateSemaphore(NULL, 0, 1, semName); this->sleep = _sleep; strcpy(name, uniqName); } void getResult() { // Считаем результат и устанавливаем его WaitForSingleObject(semaphore, INFINITE); // Делаем ненужные для клиента операции return; } virtual void execute() = 0; ~Command(void) { } }; class ConcretCommand: public Command { public: ConcretCommand(char *uniqName, int _sleep) : Command(uniqName, _sleep) { } void execute() { Sleep(sleep); ReleaseSemaphore(semaphore, 1, 0); // для подсчтета статистики нам не нужна лишняя информация //printf("My name: %s My sleep value: %d\n", name, sleep); } ~ConcretCommand(void) {} };
Создание 1000 объектов класса ConcretCommand будет иметь вид:
for (int i = 0; i < commandCount; ++i) { int sleep = rand() % 1000; if (i < 4 * commandCount / 5) sleep /= 10; command[i] = new ConcretCommand(intToStr(i + 1), sleep); }
Здесь переменная «commandCount» равна 1000. Заметим, что 20% объектов будут иметь задержку в 10 раз большую, по сравнению с остальными. Это позволит создать лучший разброс во времени выполнения команд.
Для того чтобы проследить изменение эффективности, будем проводить замеры времени выполнения всех задач при простом распределении задач на потоки, а также при использовании рассматриваемого паттерна.
В первом случае распределение на потоки и замеры времени будут иметь следующую реализацию:
void execute(void *prm) { int thread = (int)prm; int count = commandCount / threadCount; if (commandCount % threadCount != 0) count++; int start = count * thread; for (int i = start; i < start + count && i < commandCount; ++i) { command[i]->execute(); } return; } … код пропущен … printf("\nUse only multithreading:\n"); HANDLE threads[threadCount]; time_t start = clock(), end; for (int i = 0; i < threadCount; ++i) { threads[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)execute, (LPVOID)i, NULL, NULL); } for (int i = 0; i < commandCount; ++i) { command[i]->getResult(); } end = clock(); double firstTime = (double)(end - start) / CLK_TCK; printf("Time: %.3lf\n\n", firstTime);
Использование паттерна и замеры времени во втором случае будут выглядеть так:
printf("Use active object pattern:\n"); ActiveObjectProxy * proxy = new ActiveObjectProxy(threadCount); start = clock(); for (int i = 0; i < commandCount; ++i) { proxy->push(command[i]); } for (int i = 0; i < commandCount; ++i) { command[i]->getResult(); } end = clock(); double secondTime = (double)(end - start) / CLK_TCK; printf("Time: %.2lf\n", secondTime);
Запуск программы происходил на процессоре IntelCore i5, операционная система Windows 7 Professional. Полученные результаты сравнения приведены на рисунке ниже.
Паттерн Active Object может существенно повысить эффективность многопоточных приложений. Проблемы, поставленные в начале статьи, решаются, вследствие чего ни один поток не простаивает, что приводит к существенному уменьшению времени выполнения всех задач.
Реализация ActiveObject
class Proxy { private: Sheduler * sheduler; public: Proxy(int tCount) { sheduler = new Sheduler(tCount); } void push(Command * someCommand) { sheduler->push(someCommand); } void wait() { sheduler->waitAll(); } ~Proxy(void){} }; class Sheduler { private: GuardedQueue<Command *> *queue; // потоко-защищенная очередь! int tCount; HANDLE *threads; // функция, которая будет вызываться в потоках friend void dispach(GuardedQueue<Command *> *); public: Sheduler(int _tCount) { tCount = _tCount; threads = new HANDLE[tCount]; queue = new GuardedQueue<Command *>(); for (int i = 0; i < tCount; ++i) { threads[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dispach, queue, NULL, NULL); } } void push(Command *concretCommand) { queue->push(concretCommand); } void waitAll() { WaitForMultipleObjects(tCount, threads, true, INFINITE); } ~Sheduler(void){} }; void dispach(GuardedQueue<Command *> *queue) { while (true) { Command * someCommand = queue->pop(); someCommand->execute(); } } template<typename T> class GuardedQueue { private: HANDLE semToPush, semToPop, control; queue<T> my_queue; bool condition; public: GuardedQueue(void) { semToPush = CreateSemaphore(NULL, 1, 1, L"semaphoreToPush_14.12.1989"); semToPop = CreateSemaphore(NULL, 0, 1, L"semaphoreToPop_14.12.1989"); control = CreateSemaphore(NULL, 1, 1, L"control_14.12.1989"); } void push(const T& obj) { WaitForSingleObject(semToPush, INFINITE); WaitForSingleObject(control, INFINITE); my_queue.push(obj); ReleaseSemaphore(control, 1, 0); ReleaseSemaphore(semToPop, 1, 0); ReleaseSemaphore(semToPush, 1, 0); } T pop() { T obj = NULL; while (obj == NULL) { WaitForSingleObject(semToPop, INFINITE); WaitForSingleObject(control, INFINITE); if (!my_queue.empty()) { obj = my_queue.front(); my_queue.pop(); ReleaseSemaphore(semToPop, 1, 0); } ReleaseSemaphore(control, 1, 0); } return obj; } };
ссылка на оригинал статьи http://habrahabr.ru/post/155521/
Добавить комментарий