Поэтому я пытаюсь установить простую связь между ZMQ в Python и ZMQ в расширении C / C ++. Python устанавливает контекст, связывает сокет inproc и передает контекст и имя сокета расширению. Расширение устанавливает собственный сокет, подключается и прослушивает сообщения. Затем Python отправляет заголовок, а затем строковое представление словаря в расширение. Довольно простые вещи с использованием сокетов REQ / REP. Тем не менее, по какой-то причине я не могу найти, что вызов socket.send блокируется, и расширение никогда не проходит мимо вызова zmq_recv. У меня есть тестовая среда, где у меня почти точно такой же сценарий, но сокеты не блокируются, и я трижды проверил код, и он должен работать точно так же.
ПИТОН:
import zmq
import cppextension
# No lectures about using threading please. I'm restricted to, in essence
# using this function because of the code base I'm working with.
from thread import start_new_thread
socket = self.zmq_context.socket(zmq.REQ)
socket_name = "inproc://agl"socket.bind(socket_name)
t = start_native_thread(cppextension.actor,
(self.zmq_context, socket_name))
test_send = {"foo": 1, "bar": 2}
# BLOCKS ON THIS LINE VVVVV
socket.send("TEST", flags=zmq.SNDMORE)
socket.send(str(test_send))
socket.recv()
socket.send("STOP")
C / C ++:
// Originally these used std::basic_string<Py_UNICODE> but I reverted
// back to normal std::string so I can use a JSON parsing library.
typedef string pystring;
typedef char pystring_t;
extern "C" PyObject *
actor(PyObject *self, PyObject *args) {
PyObject *py_context, *py_connect_to;
PyThreadState *_save;
void *context;
char *connect_to;
void *socket;
int rc;
if(!PyArg_ParseTuple(args, "OO", &py_context, &py_connect_to)) {
PyErr_SetString(PyExc_TypeError, "Expected two arguments (ZMQ context, name of socket to connect to)");
return NULL;
}
py_context = PyObject_GetAttrString(py_context, "_handle");
if(py_context == NULL) {
PyErr_SetString(PyExc_TypeError, "Could not get '_handle' from context");
return NULL;
}
if(!PyInt_Check(py_context)) {
PyErr_SetString(PyExc_TypeError, "_handle was not an integer");
return NULL;
}
context = (void*)PyInt_AsLong(py_context);
connect_to = new char[PyString_Size(py_connect_to) + 1];
strcpy(connect_to, PyString_AsString(py_connect_to));
_save = PyEval_SaveThread();
//
// GIL-less operation BEGIN
// ** WARNING: Do NOT call any functions that begin with 'Py', or touch any
// data structures that begin with 'Py' while in this section. It *WILL*
// blow up the Python interpreter.
//
socket = zmq_socket(context, ZMQ_REP);
rc = zmq_connect(socket, connect_to);
pystring TEST("TEST");
pystring STOP("STOP");
pystring SUCCESS("SUCCESS");
pystring FAILURE("FAILURE");
if(rc == 0) {
int going = 1;
// Should be able to hold a full megabyte of text, which should be enough
// for any message being passed in.
// Is there a way to query size of the incoming message...?
char buffer[1000000];
while(going) {
// BLOCKS ON THIS LINE VVVVVV
int size = zmq_recv(socket, buffer, 1000000, 0);
if(size == -1) {
// ERROR
continue;
}
// Assume we don't get larger than 1MB of data. Should put a
// check around this at some point, but not right now.
buffer[size] = 0;
pystring fullmsg(buffer);
cout << "ZMQ RECIEVED: " << fullmsg << endl;
if(fullmsg == TEST) {
size = zmq_recv(socket, &buffer, 1000000, 0);
if(size != -1) {
buffer[size] = 0;
pystring json_fullmsg(buffer);
cout << "ZMQ JSON: " << json_fullmsg << endl;
contacts.add(json_fullmsg);
zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
}
else {
zmq_send(socket, FAILURE.c_str(), FAILURE.size() + 1, 0);
}
}
else if(fullmsg == STOP) {
going = 0;
zmq_send(socket, SUCCESS.c_str(), SUCCESS.size() + 1, 0);
}
}
}
else {
// ERROR
int err = zmq_errno();
switch(err) {
case EINVAL:
cout << "ZMQ CONNECT ERR: " << "Endpoint supplied is invalid" << endl;
break;
default:
cout << "ZMQ CONNECT ERR: " << err << endl;
break;
}
}
zmq_close(socket);
//
// GIL-less operation END
//
PyEval_RestoreThread(_save);
Py_INCREF(Py_None);
return Py_None;
}
Любая помощь в выяснении того, что здесь происходит, очень ценится.
РЕДАКТИРОВАТЬ: Также обратите внимание, что этот код будет работать в среде, где gevent установил стандартную библиотеку. Это одна из причин, по которой я использую thread.start_new_thread, потому что он был сохранен до того, как произошла сборка обезьян, и я хочу настоящий поток, а не зеленый поток.
две вещи,
потому что вы используете req / rep в вашей измененной версии, «отправить, отправить, recv, отправить …» будет
не работа. оба send / recv ДОЛЖНЫ работать в режиме lock-step (send, recv, send, revc.)
ZMQ_NOBLOCK вызовет исключение EAGAIN, что может означать «подключение к сокету
не завершена, пожалуйста, зайдите позже. «попробуйте установить таймер / сон после привязки и оба
отправить / ПРИЕМ. это то, что вызывает сообщение «Ресурс временно недоступен».
надеюсь это поможет
Мистер. onoffon
Не уверен, что это поможет, но здесь идет:
send()
блоки, и это не просто вопрос никогда не получить ответ? Вы могли бы позвонить send()
с ZMQ_NOBLOCK
и посмотрим, возникнет ли исключение. Если это так, то так и есть send()
не может поставить сообщение в очередь.PAIR
розетки вместо REQ/REP
? Руководство рекомендует это за inproc
связь между потоками.