Как использовать zmq внутри функции (неблокирующим образом) для получения состояния функции по запросу клиента?

Как использовать ZMQ неблокирующим образом, чтобы «обслуживать» статус долго выполняющегося задания, когда клиент запрашивает статус?

Приведенный ниже код иллюстрирует, как долго выполняемая задача может быть временно «прервана» для отправки текущего статуса.

Задача долго выполняется, потому что есть много urls обрабатывать, а не потому, что обработка каждого URL занимает много времени. Это будет означать, что сервер может ответить клиенту с текущим статусом практически мгновенно.

Мне не удалось реализовать эту логику неблокирующим образом, как при использовании флага zmq.NOBLOCK результаты в Again: Resource temporarily unavailableи неиспользование флага означает, что сервер блокирует и ожидает получения сообщения.

Как добиться такой логики / поведения? Я открыт для использования C ++ или Python.

Код сервера:

import zmq

# Socket details
port = "5556"context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

# List of many urls
urls = ['http://google.com','http://yahoo.com']

def process(url):
"""Sample function"""pass

processed_urls = []
for url in urls:

# If a message has been received by a client respond to the message
# The response should be the current status.
if socket.recv(zmq.NOBLOCK):
msg = b"Processed the following urls %s" % str(processed_urls).encode()
socket.send(msg, zmq.NOBLOCK)

# Continue processing the urls
process(url)
processed_urls.append(url)

1

Решение

1-е место — НЕБЛОКИРОВКА — это двусторонний меч

Есть два мира, каждый из которых может, а иногда и блокирует.


1) GIL-сторона и / или процесс-боковая сторона «Блокировка» может появиться ( numpy приведенный ниже пример, но допустимый для любых вызовов, блокирующих синхронизацию, которые не могут иметь легко достижимого неблокирующего обходного пути), в то время как некоторый внешний процесс или глобальная архитектура приложения, возможно, все еще должны иметь (по крайней мере) некоторое отвечающее & поведение рукопожатия даже от таких заведомо «заблокированных» областей кода Python.

2) Второй мир ваш ZeroMQ (Потенциально) -blocking вызов. Установка zmq.CONFLATE может дополнительно помочь вам в PUSH-подобном URL-отчете с клиента, выполняющего долгую работу, на сервер. Задавать CONFLATE как на стороне клиента, так и на стороне сокета для отчетов.

В любом месте, где я могу, я защищаю строго неблокирующие конструкции. Даже школьные примеры кода ZeroMQ должны быть реалистичными и справедливыми, чтобы не блокировать. Мы живем в 3-м тысячелетии, и код блокировки — это представление & разрушительное состояние использования ресурсов, главным образом вне сферы контроля в проектировании распределенных систем профессионального уровня.


Основные леса:

####################################################################
### NEED TO VIEW aHealthSTATUS FROM AN EXTERNAL_UNIVERSE:
### ( A LIGHTWEIGHT EXCULPATED MONITOR TO OBSERVE THE HEALTH OF THE EXECUTION ENVIRONMENT FROM OUTSIDE OF THE VM-JAIL, FROM AN OUTER HYPERVISOR SPACE )
### ( + using signal.signal() )

import signal, os
#-------------------------------------------------------------------
# .SET  ZeroMQ INFRASTRUCTURE:

#-------------------------------------------------------------------
# .DEF  SIG_handler(s)

def SIG_handler_based_HealthREPORTER( SIGnum, aFrame ):
print( 'SIG_handler called to report state with signal', SIGnum )
#---------------------------------------------------------------
# ZeroMQ .send( .SIG/.MSG )

pass;   # yes, all the needed magic comes right here

#-------------------------------------------------------------------
# FINALLY:

raise OSError( "Had to send a HealthREPORT" )                   # ??? do we indeed need this circus to be always played around, except in a DEMO-mode?

#-------------------------------------------------------------------
# .ASSOC SIG_handler:

signal.signal( signal.SIGALRM, SIG_handler_based_HealthREPORTER )   # .SET { SIGALRM: <aHandler> }-assoc

#-------------------------------------------------------------------
# .SET 1[sec]-delay + 1[sec]-interval

signal.setitimer( signal.ITIMER_REAL, 1, 1 )                        # .SET REAL-TIME Interval-based WatchDog -- Decrements interval timer in real time, and delivers SIGALRM upon expiration.# ------------------------------------------------------------------
# FINALLY:#-------------------------------------------------------------------
# .SET / .DEACTIVATE
signal.setitimer( signal.ITIMER_REAL, 0 )                           # .SET / DEACTIVATE

#-------------------------------------------------------------------
# .TERM GRACEFULLY ZeroMQ INFRASTRUCTURE#-------------------------------------------------------------------
# CLEAN EXIT(0)
_exit(0)

Позвольте мне поделиться подходом, используемым для своего рода aHealthMONITOR на действительно длинных принципиально-блочных вычислениях.

Давайте возьмем один пример GIL- «блокирующего» типа вычислений:

#######
# SETUP
signal.signal(    signal.SIGALRM, SIG_ALRM_handler_A )          # .ASSOC { SIGALRM: thisHandler }
signal.setitimer( signal.ITIMER_REAL, 10, 5 )                   # .SET   @5 [sec] interval, after first run, starting after 10[sec] initial-delay
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1                         # .RESET .INIT()

Механика SIGALRM + ITIMER_REAL обеспечить прекрасную автоматизацию, чтобы внешние миры были довольны, по крайней мере, некоторой отзывчивостью (в этом примере частотой ~ 0,2 Гц), но главным образом {up- | down -} — масштабируемой до любого разумного & все же общесистемное стабильное количество времени тестирование обработчика 0,5 [ГГц] на виртуальной системе 1,0 [ГГц] оставлено на усмотрение хакера в противном случае применяется здравый смысл для разумных масштабных факторов и неблокирующих / малозатратных конструкций)

ДЕМО-показания показывают, как involuntary= переключение контекста демонстрирует блокирующую безразличную механику (считайте числа по мере их роста, в то время как произвольные остаются неизменными на протяжении всей части процесса, блокирующей GIL), поэтому аналогично def-издание SIG_ALRM_handler_XYZ() может предоставить решение для вашего независимого репортера по требованию.

SIG_ALRM_handler_A(): activated             Wed Oct 19 14:13:14 2016 ------------------------------ pctxsw(voluntary=53151, involuntary=1169)

>>> SIG_ALRM_last_ctx_switch_VOLUNTARY                              53243
>>> SIG_ALRM_last_ctx_switch_FORCED                                  1169

>>> [ np.math.factorial( 2**f ) for f in range(20) ][:5]            # too fast to notice @5[sec]
[1, 2, 24, 40320, 20922789888000]

#########
# COMPUTE
# len(str([np.math.factorial(2**f) for f in range(20)][-1]))    # .RUN   A "FAT"-BLOCKING CHUNK OF A regex/numpy/C/FORTRAN-calculus

>>> len( str( [ np.math.factorial( 2**f ) for f in range(20) ][-1] ) )
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1234)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1257)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1282)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1305)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:15:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1330)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1352)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1377)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1400)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1425)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1448)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1473)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1496)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1521)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1543)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1568)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1591)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:16:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1616)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1639)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1664)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1687)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1713)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1740)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1767)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1790)  INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD::  Wed Oct 19 14:17:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1812)  INSPECT processes ... ev. add a StateFull-self-Introspection
2771010

В этом контексте процесса был использован этот обработчик:

########################################################################
### SIGALRM_handler_
###

import psutil, resource, os, time

SIG_ALRM_last_ctx_switch_VOLUNTARY = -1
SIG_ALRM_last_ctx_switch_FORCED    = -1

def SIG_ALRM_handler_A( aSigNUM, aFrame ):                              # SIG_ALRM fired evenly even during [ np.math.factorial( 2**f ) for f in range( 20 ) ] C-based processing =======================================
#
# onEntry_ROTATE_SigHandlers() -- MAY set another sub-sampled SIG_ALRM_handler_B() ... { last: 0, 0: handler_A, 1: handler_B, 2: handler_C }
#
# onEntry_SEQ of calls of regular, hierarchically timed MONITORS ( just the SNAPSHOT-DATA ACQUISITION Code-SPRINTs, handle later due to possible TimeDOMAIN overlaps )
#
aProcess         =   psutil.Process( os.getpid() )
aProcessCpuPCT   =         aProcess.cpu_percent( interval = 0 )     # EVENLY-TIME-STEPPED
aCtxSwitchNUMs   =         aProcess.num_ctx_switches()              # THIS PROCESS ( may inspect other per-incident later ... on anomaly )

aVolCtxSwitchCNT = aCtxSwitchNUMs.voluntary
aForcedSwitchCNT = aCtxSwitchNUMs.involuntary

global SIG_ALRM_last_ctx_switch_VOLUNTARY
global SIG_ALRM_last_ctx_switch_FORCED

if (     SIG_ALRM_last_ctx_switch_VOLUNTARY != -1 ):                # .INIT VALUE STILL UNCHANGED
#----------
# .ON_TICK: must process delta(s)
if ( SIG_ALRM_last_ctx_switch_VOLUNTARY == aVolCtxSwitchCNT ):
#
# AN INDIRECT INDICATION OF A LONG-RUNNING WORKLOAD OUTSIDE GIL-STEPPING ( regex / C-lib / FORTRAN / numpy-block et al )
#                                                                                 |||||              vvv
# SIG_:  Wed Oct 19 12:24:32 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=315)  ~~~  0.0
# SIG_:  Wed Oct 19 12:24:37 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=323)  ~~~  0.0
# SIG_:  Wed Oct 19 12:24:42 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=331)  ~~~  0.0
# SIG_:  Wed Oct 19 12:24:47 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=338)  ~~~  0.0
# SIG_:  Wed Oct 19 12:24:52 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=346)  ~~~  0.0
# SIG_:  Wed Oct 19 12:24:57 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=353)  ~~~  0.0
# ...                                                                             |||||              ^^^
# 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]
# >>>                                                                             |||||              |||
#                                                                                 vvvvv              |||
# SIG_:  Wed Oct 19 12:26:17 2016 ------------------------------ pctxsw(voluntary=49983, involuntary=502)  ~~~  0.0
# SIG_:  Wed Oct 19 12:26:22 2016 ------------------------------ pctxsw(voluntary=49984, involuntary=502)  ~~~  0.0
# SIG_:  Wed Oct 19 12:26:27 2016 ------------------------------ pctxsw(voluntary=49985, involuntary=502)  ~~~  0.0
# SIG_:  Wed Oct 19 12:26:32 2016 ------------------------------ pctxsw(voluntary=49986, involuntary=502)  ~~~  0.0
# SIG_:  Wed Oct 19 12:26:37 2016 ------------------------------ pctxsw(voluntary=49987, involuntary=502)  ~~~  0.0
# SIG_:  Wed Oct 19 12:26:42 2016 ------------------------------ pctxsw(voluntary=49988, involuntary=502)  ~~~  0.0
print(   "SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: ", time.ctime(), 10 * "-",  aProcess.num_ctx_switches(), "{0:_>60s}".format( str( aProcess.threads() ) ),          " INSPECT processes ... ev. add a StateFull-self-Introspection" )
else:
#----------
# .ON_INIT: may report .INIT()
print(   "SIG_ALRM_handler_A(): activated            ", time.ctime(), 30 * "-",  aProcess.num_ctx_switches() )

##########
# FINALLY:

SIG_ALRM_last_ctx_switch_VOLUNTARY = aVolCtxSwitchCNT               # .STO ACTUALs
SIG_ALRM_last_ctx_switch_FORCED    = aForcedSwitchCNT               # .STO ACTUALs
2

Другие решения

Других решений пока нет …

По вопросам рекламы [email protected]