Время от времени на форумах рунета в ветке Qt появляются вопросы связанные с программированием сетевых приложение. Одна из проблем которая терзает этих людей, это подход к организации сервера. Обычно перечисляют три подхода:
- однопоточный асинхронный;
- многопоточный, создавать по потоку на соединение;
- многопоточный, с пулом потоков на QThreadPool и QRunnable.
Когда говоришь про фиксированное количество рабочих потоков со своим циклом обработки событий, то просят привести пример. Дальше будет приведен пример сервера с пулом потоков, каждый из которых имеет свой цикл обработки событий.
Действующие лица:
- класс Server, приминающий соединения и раздающий задачи рабочим;
- класс Worker, экземпляры которого будут создавать в рабочих потоках экземпляры класса Client;
- класс Client инкапсулирующий запросы клиента и реализующий SOCKS 4
Самый простой из этой троицы Worker, он наследуется от QObject и реализует всего одну функцию создания клиента и чисто для «правильного» употребления потоков:
class Worker: public QObject { Q_OBJECT public: Q_INVOKABLE void addClient(qintptr socketDescriptor); };
void Worker::addClient(qintptr socketDescriptor) { new Client(socketDescriptor, this); }
Сервер так же прост:
class Server: public QTcpServer { Q_OBJECT public: Server(size_t threads = 4, QObject * parent = nullptr); ~Server(); protected: virtual void incomingConnection(qintptr socketDescriptor); private: void initThreads(); private: size_t m_threadCount; QVector<QThread*> m_threads; QVector<Worker*> m_workers; size_t m_rrcounter; };
Server::Server(size_t threads, QObject * parent) : QTcpServer(parent), m_threadCount(threads), m_rrcounter(0) { initThreads(); } Server::~Server() { for(QThread* thread: m_threads) { thread->quit(); thread->wait(); } } void Server::initThreads() { for (size_t i = 0; i < m_threadCount; ++i) { QThread* thread = new QThread(this); Worker* worker = new Worker(); worker->moveToThread(thread); connect(thread, &QThread::finished, worker, &QObject::deleteLater); m_threads.push_back(thread); m_workers.push_back(worker); thread->start(); } } void Server::incomingConnection(qintptr socketDescriptor) { Worker* worker = m_workers[m_rrcounter % m_threadCount]; ++m_rrcounter; QMetaObject::invokeMethod(worker, "addClient", Qt::QueuedConnection, Q_ARG(qintptr, socketDescriptor)); }
Он в конструкторе создает потоки, рабочих и перемещает рабочих в потоки. Каждое новое соединение он передает рабочему. Рабочего он выбирает «почестноку», т. е. по Round-robin.
SOCKS 4 очень простой протокол, нужно лишь:
- прочитать IP-адрес, номер порта;
- установить соединение с «миром»;
- отправить клиенту сообщение, что запрос подтвержден;
- пересылать данные из одного сокета в другой, пока кто-нибудь не закроет соединение.
class Client: public QObject { Q_OBJECT public: Client(qintptr socketDescriptor, QObject* parent = 0); public slots: void onRequest(); void client2world(); void world2client(); void sendSocksAnsver(); void onClientDisconnected(); void onWorldDisconnected(); private: void done(); private: QTcpSocket m_client; QTcpSocket m_world; };
namespace { #pragma pack(push, 1) struct socks4request { uint8_t version; uint8_t command; uint16_t port; uint32_t address; uint8_t end; }; struct socks4ansver { uint8_t empty = 0; uint8_t status; uint16_t field1 = 0; uint32_t field2 = 0; }; #pragma pack(pop) enum SocksStatus { Granted = 0x5a, Failed = 0x5b, Failed_no_identd = 0x5c, Failed_bad_user_id = 0x5d }; } Client::Client(qintptr socketDescriptor, QObject* parent) : QObject(parent) { m_client.setSocketDescriptor(socketDescriptor); connect(&m_client, &QTcpSocket::readyRead, this, &Client::onRequest); connect(&m_client,&QTcpSocket::disconnected, this, &Client::onClientDisconnected); connect(&m_world, &QTcpSocket::connected, this, &Client::sendSocksAnsver); connect(&m_world, &QTcpSocket::readyRead, this, &Client::world2client); connect(&m_world,&QTcpSocket::disconnected, this, &Client::onWorldDisconnected); } void Client::onRequest() { QByteArray request = m_client.readAll(); socks4request* header = reinterpret_cast<socks4request*>(request.data()); #if Q_BYTE_ORDER == Q_LITTLE_ENDIAN const QHostAddress address(qFromBigEndian(header->address)); #else const QHostAddress address(header->address); #endif #if Q_BYTE_ORDER == Q_LITTLE_ENDIAN const uint16_t port = qFromBigEndian(header->port); #else const uint16_t port = header->port; #endif //qDebug()<<"connection:"<<address<<"port:"<<port; m_world.connectToHost(address, port); disconnect(&m_client, &QTcpSocket::readyRead, this, &Client::onRequest); connect(&m_client, &QTcpSocket::readyRead, this, &Client::client2world); } void Client::sendSocksAnsver() { socks4ansver ans; ans.status = Granted; m_client.write(reinterpret_cast<char*>(&ans), sizeof(ans)); m_client.flush(); } void Client::client2world() { m_world.write(m_client.readAll()); } void Client::world2client() { m_client.write(m_world.readAll()); } void Client::onClientDisconnected() { m_world.flush(); done(); } void Client::onWorldDisconnected() { m_client.flush(); done(); } void Client::done() { m_client.close(); m_world.close(); deleteLater(); }
Epoll и Qt
Крик подобен грому:
— Дайте людям рому
Нужно по любому
Людям выпить рому!
Если мы скомпилим предыдущий код и прогоним его через strace -f, то увидим вызовы poll. Обязательно найдется кто-то, кто скажет своё веское «фи», мол с epoll будет «ну ваще ракета».
В Qt есть класс QAbstractEventDispatcher, позволяющий определять свой диспетчер событий. Естественно нашлись добрые люди которые сделали и выложили диспетчеры с разными бекенд. Вот небольшой их список:
- connectedtable/qeventdispatcher_epoll;
- sjinks/qt_eventdispatcher_epoll;
- sjinks/qt_eventdispatcher_libev;
- sjinks/qt_eventdispatcher_libevent
При использовании своего диспетчера в main.cpp прописываем
QCoreApplication::setEventDispatcher(new QEventDispatcherEpoll); QCoreApplication app(argc, argv)
а метод initThreads у сервера становится таким:
void Server::initThreads() { for (size_t i = 0; i < m_threadCount; ++i) { QThread* thread = new QThread(this); thread->setEventDispatcher(new QEventDispatcherEpoll); Worker* worker = new Worker(); worker->moveToThread(thread); connect(thread, &QThread::finished, worker, &QObject::deleteLater); m_threads.push_back(thread); m_workers.push_back(worker); thread->start(); } }
И если мы снова запустим strace, то увидим заветные вызовы функций с префиксом epoll_.
Выводы
Выводы сугубо прагматические.
Если вы прикладной программист и у вас нет задач из разряда «больших» данных или highload по Бунину, то пишите на чем хотите и как можете. Задача прикладного программиста выдать продукт определенного качества, затратив определенное количество ресурсов. В противном случае одними лишь сокетами с epoll не обойдешься.
P.S.
Исходные коды доступны на GitHub.
ссылка на оригинал статьи http://habrahabr.ru/post/263549/
Добавить комментарий