zmq зависает в zmq_proxy () во время очистки

Я использую ZeroMQ версии 4.2.4 на машине с Ubuntu (4.4.0-28-generic # 47 ~ 14.04.1-Ubuntu).

я поднял mtserver.c от https://github.com/booksbyus/zguide/blob/master/examples/C/mtserver.c.
Шаблон REQ-ROUTER-DEALER-REP, Ниже приведена слегка измененная версия того же кода.

Проблема — когда я пытаюсь завершить работу после всех рабочих потоков, код зависает zmq_proxy( clients, workers, NULL ),

Я пробовал с LINGER опция таймера на сокете «клиенты», но это не помогло. Ценю любую помощь, чтобы помочь мне отладить. Благодарю.

(gdb) bt
#0  0x00007fd6a4febc9d in poll () at ../sysdeps/unix/syscall-template.S:81
#1  0x00007fd6a58232fa in poll (__timeout=-1, __nfds=1, __fds=0x7ffd692d80e0) at /usr/include/x86_64-linux-gnu/bits/poll2.h:46
#2  zmq::signaler_t::wait (this=this@entry=0x1f09e28, timeout_=timeout_@entry=-1) at src/signaler.cpp:232
#3  0x00007fd6a5809f95 in zmq::mailbox_t::recv (this=0x1f09dc0, cmd_=0x7ffd692d8140, timeout_=-1) at src/mailbox.cpp:81
#4  0x00007fd6a582497d in zmq::socket_base_t::process_commands (this=this@entry=0x1f09850, timeout_=timeout_@entry=-1,
throttle_=throttle_@entry=false) at src/socket_base.cpp:1341
#5  0x00007fd6a5824ee3 in zmq::socket_base_t::send (this=this@entry=0x1f09850, msg_=msg_@entry=0x7ffd692d83d0, flags_=<optimized out>)
at src/socket_base.cpp:1156
#6  0x00007fd6a5819d24 in forward (from_=from_@entry=0x1f077a0, from_stats=from_stats@entry=0x7ffd692d8330, to_=to_@entry=0x1f09850,
to_stats=to_stats@entry=0x7ffd692d8350, capture_=capture_@entry=0x0, msg_=...) at src/proxy.cpp:147
#7  0x00007fd6a581a7d3 in zmq::proxy (frontend_=0x1f077a0, backend_=0x1f09850, capture_=0x0, control_=0x0) at src/proxy.cpp:462
#8  0x00000000004018b2 in zmqMTServer::Start (this=this@entry=0x7ffd692d84e0) at mtserver.cpp:75
#9  0x0000000000401207 in main () at mtserver.cpp:89

(GDB

#include <thread>
#include <iostream>
#include <zmq.h>
#include <assert.h>
#include <string.h>

using namespace std;
bool zmqServerShutdown = false;

class zmqMTServer {
private:
ushort workerThreads;
void* zmqContext;
unique_ptr<thread[]> workerThreadIDs;
void workerRoutine(void);

public:
zmqMTServer(ushort threads) : workerThreads(threads)
{
zmqContext = zmq_ctx_new();
assert(zmqContext);
auto rc = zmq_ctx_set(zmqContext, ZMQ_IO_THREADS, 1);
assert(rc == 0);
rc = zmq_ctx_set(zmqContext, ZMQ_MAX_SOCKETS, ZMQ_MAX_SOCKETS_DFLT);
assert(rc == 0);
}
int Start(void);
~zmqMTServer(void)
{
for (auto i = 0; i < workerThreads; i++)
workerThreadIDs[i].join();
zmq_ctx_destroy(zmqContext);
}
};

void zmqMTServer::workerRoutine(void)
{
void *receiver = zmq_socket(zmqContext, ZMQ_REP);
assert(receiver);
auto rc = zmq_connect(receiver, "inproc://workerThreads");
assert(rc == 0);
cout << "Worker thread(" << this_thread::get_id() << ") started \n";
while (!zmqServerShutdown) {
char buf[256];
auto size = zmq_recv(receiver, buf, 255, 0);
if (size == -1) {
cout << "workerRoutine(): zmq_recv size = " << size << "\n";
continue; // some thing went wrong
}
buf[size] = '\0';
cout << "Worker thread(" << this_thread::get_id() << ") Received request: " << buf << "\n";

size = zmq_send(receiver, buf, strlen(buf), 0);
}
cout << "zmqMTServer worker thread exiting " << this_thread::get_id() << "\n";
rc = zmq_close(receiver);
}

int zmqMTServer::Start(void)
{
void *clients = zmq_socket(zmqContext, ZMQ_ROUTER);
assert(clients);

auto rc = zmq_bind(clients, "tcp://*:10051");
assert(rc == 0);
void *workers = zmq_socket(zmqContext, ZMQ_DEALER);
rc = zmq_bind (workers, "inproc://workerThreads");
assert(rc == 0);

//  Launch pool of worker threads
workerThreadIDs = unique_ptr<thread[]>(new thread[workerThreads]);
for(auto i = 0; i < workerThreads; i++) {
workerThreadIDs[i] = thread(&zmqMTServer::workerRoutine, this);
}

zmq_proxy(clients, workers, NULL);

zmq_close(clients);
zmq_close(workers);
return 0;
}

int main(void)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
cout << "ZMQ version : " << major << "." << minor << "." << patch << "\n";

zmqMTServer server(3);
server.Start();
}

1

Решение

[+1] за аналитические мысли об использовании ZMQ_LINGER


Ох уж эти дешевые и плохие примеры v / s реального мира Распределенные системы

Вкратце, код, заимствованный из примера из учебника, страдает от использования режима блокировки zmq_recv(...) операции.

Лучше проектировать распределенную систему принципиально неблокирующим образом. Таким образом, вы никогда не потеряете контроль. Да, большинство примеров и фрагментов кода из школьной книги по-прежнему отображаются с блокировкой чтения, но это никогда не должно появиться в серьезном рабочем коде.

Зачем? Потому что, если не что иное, состояние блокировки выводит вас из игры, и вы можете просто стоять и молиться, чтобы произошло внешнее событие. Если этого не произойдет, ваш драгоценный код так и останется на ветру …

zmq.h

#define ZMQ_DONTWAIT 1
...
#define ZMQ_NOBLOCK ZMQ_DONTWAIT

Итак, измените дизайн ожидающей сообщения части, чтобы использовать zmq_poll(), снабженные некоторым достаточно коротким (или прямым нулем) тайм-аутом, и следят за каждым приходом сообщения POSACKed с неблокирующим чтением:

zmq_recv( ...., ZMQ_NOBLOCK );

Таким образом, ваш код никогда не останется в ожидании внутри какого-либо из потоков контекста-экземпляра (-ов).

просто не может позволить себе повесить трубку.

1

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector