Как использовать ZMQ неблокирующим образом, чтобы «обслуживать» статус долго выполняющегося задания, когда клиент запрашивает статус?
Приведенный ниже код иллюстрирует, как долго выполняемая задача может быть временно «прервана» для отправки текущего статуса.
Задача долго выполняется, потому что есть много urls
обрабатывать, а не потому, что обработка каждого URL занимает много времени. Это будет означать, что сервер может ответить клиенту с текущим статусом практически мгновенно.
Мне не удалось реализовать эту логику неблокирующим образом, как при использовании флага zmq.NOBLOCK
результаты в Again: Resource temporarily unavailable
и неиспользование флага означает, что сервер блокирует и ожидает получения сообщения.
Как добиться такой логики / поведения? Я открыт для использования C ++ или Python.
Код сервера:
import zmq
# Socket details
port = "5556"context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)
# List of many urls
urls = ['http://google.com','http://yahoo.com']
def process(url):
"""Sample function"""pass
processed_urls = []
for url in urls:
# If a message has been received by a client respond to the message
# The response should be the current status.
if socket.recv(zmq.NOBLOCK):
msg = b"Processed the following urls %s" % str(processed_urls).encode()
socket.send(msg, zmq.NOBLOCK)
# Continue processing the urls
process(url)
processed_urls.append(url)
1-е место — НЕБЛОКИРОВКА — это двусторонний меч
1) GIL-сторона и / или процесс-боковая сторона «Блокировка» может появиться ( numpy
приведенный ниже пример, но допустимый для любых вызовов, блокирующих синхронизацию, которые не могут иметь легко достижимого неблокирующего обходного пути), в то время как некоторый внешний процесс или глобальная архитектура приложения, возможно, все еще должны иметь (по крайней мере) некоторое отвечающее & поведение рукопожатия даже от таких заведомо «заблокированных» областей кода Python.
2) Второй мир ваш ZeroMQ (Потенциально) -blocking вызов. Установка zmq.CONFLATE
может дополнительно помочь вам в PUSH-подобном URL-отчете с клиента, выполняющего долгую работу, на сервер. Задавать CONFLATE
как на стороне клиента, так и на стороне сокета для отчетов.
В любом месте, где я могу, я защищаю строго неблокирующие конструкции. Даже школьные примеры кода ZeroMQ должны быть реалистичными и справедливыми, чтобы не блокировать. Мы живем в 3-м тысячелетии, и код блокировки — это представление & разрушительное состояние использования ресурсов, главным образом вне сферы контроля в проектировании распределенных систем профессионального уровня.
####################################################################
### NEED TO VIEW aHealthSTATUS FROM AN EXTERNAL_UNIVERSE:
### ( A LIGHTWEIGHT EXCULPATED MONITOR TO OBSERVE THE HEALTH OF THE EXECUTION ENVIRONMENT FROM OUTSIDE OF THE VM-JAIL, FROM AN OUTER HYPERVISOR SPACE )
### ( + using signal.signal() )
import signal, os
#-------------------------------------------------------------------
# .SET ZeroMQ INFRASTRUCTURE:
#-------------------------------------------------------------------
# .DEF SIG_handler(s)
def SIG_handler_based_HealthREPORTER( SIGnum, aFrame ):
print( 'SIG_handler called to report state with signal', SIGnum )
#---------------------------------------------------------------
# ZeroMQ .send( .SIG/.MSG )
pass; # yes, all the needed magic comes right here
#-------------------------------------------------------------------
# FINALLY:
raise OSError( "Had to send a HealthREPORT" ) # ??? do we indeed need this circus to be always played around, except in a DEMO-mode?
#-------------------------------------------------------------------
# .ASSOC SIG_handler:
signal.signal( signal.SIGALRM, SIG_handler_based_HealthREPORTER ) # .SET { SIGALRM: <aHandler> }-assoc
#-------------------------------------------------------------------
# .SET 1[sec]-delay + 1[sec]-interval
signal.setitimer( signal.ITIMER_REAL, 1, 1 ) # .SET REAL-TIME Interval-based WatchDog -- Decrements interval timer in real time, and delivers SIGALRM upon expiration.# ------------------------------------------------------------------
# FINALLY:#-------------------------------------------------------------------
# .SET / .DEACTIVATE
signal.setitimer( signal.ITIMER_REAL, 0 ) # .SET / DEACTIVATE
#-------------------------------------------------------------------
# .TERM GRACEFULLY ZeroMQ INFRASTRUCTURE#-------------------------------------------------------------------
# CLEAN EXIT(0)
_exit(0)
Позвольте мне поделиться подходом, используемым для своего рода aHealthMONITOR
на действительно длинных принципиально-блочных вычислениях.
Давайте возьмем один пример GIL- «блокирующего» типа вычислений:
#######
# SETUP
signal.signal( signal.SIGALRM, SIG_ALRM_handler_A ) # .ASSOC { SIGALRM: thisHandler }
signal.setitimer( signal.ITIMER_REAL, 10, 5 ) # .SET @5 [sec] interval, after first run, starting after 10[sec] initial-delay
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1 # .RESET .INIT()
Механика SIGALRM
+ ITIMER_REAL
обеспечить прекрасную автоматизацию, чтобы внешние миры были довольны, по крайней мере, некоторой отзывчивостью (в этом примере частотой ~ 0,2 Гц), но главным образом {up- | down -} — масштабируемой до любого разумного & все же общесистемное стабильное количество времени — тестирование обработчика 0,5 [ГГц] на виртуальной системе 1,0 [ГГц] оставлено на усмотрение хакера — в противном случае применяется здравый смысл для разумных масштабных факторов и неблокирующих / малозатратных конструкций)
ДЕМО-показания показывают, как involuntary=
переключение контекста демонстрирует блокирующую безразличную механику (считайте числа по мере их роста, в то время как произвольные остаются неизменными на протяжении всей части процесса, блокирующей GIL), поэтому аналогично def
-издание SIG_ALRM_handler_XYZ()
может предоставить решение для вашего независимого репортера по требованию.
SIG_ALRM_handler_A(): activated Wed Oct 19 14:13:14 2016 ------------------------------ pctxsw(voluntary=53151, involuntary=1169)
>>> SIG_ALRM_last_ctx_switch_VOLUNTARY 53243
>>> SIG_ALRM_last_ctx_switch_FORCED 1169
>>> [ np.math.factorial( 2**f ) for f in range(20) ][:5] # too fast to notice @5[sec]
[1, 2, 24, 40320, 20922789888000]
#########
# COMPUTE
# len(str([np.math.factorial(2**f) for f in range(20)][-1])) # .RUN A "FAT"-BLOCKING CHUNK OF A regex/numpy/C/FORTRAN-calculus
>>> len( str( [ np.math.factorial( 2**f ) for f in range(20) ][-1] ) )
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1234) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1257) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1282) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1305) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:15:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1330) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1352) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1377) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1400) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1425) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1448) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1473) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1496) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1521) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:44 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1543) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:49 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1568) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:54 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1591) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:16:59 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1616) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:04 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1639) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:09 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1664) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:14 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1687) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:19 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1713) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:24 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1740) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:29 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1767) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:34 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1790) INSPECT processes ... ev. add a StateFull-self-Introspection
SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: Wed Oct 19 14:17:39 2016 ------------------------------ pctxsw(voluntary=53366, involuntary=1812) INSPECT processes ... ev. add a StateFull-self-Introspection
2771010
В этом контексте процесса был использован этот обработчик:
########################################################################
### SIGALRM_handler_
###
import psutil, resource, os, time
SIG_ALRM_last_ctx_switch_VOLUNTARY = -1
SIG_ALRM_last_ctx_switch_FORCED = -1
def SIG_ALRM_handler_A( aSigNUM, aFrame ): # SIG_ALRM fired evenly even during [ np.math.factorial( 2**f ) for f in range( 20 ) ] C-based processing =======================================
#
# onEntry_ROTATE_SigHandlers() -- MAY set another sub-sampled SIG_ALRM_handler_B() ... { last: 0, 0: handler_A, 1: handler_B, 2: handler_C }
#
# onEntry_SEQ of calls of regular, hierarchically timed MONITORS ( just the SNAPSHOT-DATA ACQUISITION Code-SPRINTs, handle later due to possible TimeDOMAIN overlaps )
#
aProcess = psutil.Process( os.getpid() )
aProcessCpuPCT = aProcess.cpu_percent( interval = 0 ) # EVENLY-TIME-STEPPED
aCtxSwitchNUMs = aProcess.num_ctx_switches() # THIS PROCESS ( may inspect other per-incident later ... on anomaly )
aVolCtxSwitchCNT = aCtxSwitchNUMs.voluntary
aForcedSwitchCNT = aCtxSwitchNUMs.involuntary
global SIG_ALRM_last_ctx_switch_VOLUNTARY
global SIG_ALRM_last_ctx_switch_FORCED
if ( SIG_ALRM_last_ctx_switch_VOLUNTARY != -1 ): # .INIT VALUE STILL UNCHANGED
#----------
# .ON_TICK: must process delta(s)
if ( SIG_ALRM_last_ctx_switch_VOLUNTARY == aVolCtxSwitchCNT ):
#
# AN INDIRECT INDICATION OF A LONG-RUNNING WORKLOAD OUTSIDE GIL-STEPPING ( regex / C-lib / FORTRAN / numpy-block et al )
# ||||| vvv
# SIG_: Wed Oct 19 12:24:32 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=315) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:37 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=323) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:42 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=331) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:47 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=338) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:52 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=346) ~~~ 0.0
# SIG_: Wed Oct 19 12:24:57 2016 ------------------------------ pctxsw(voluntary=48714, involuntary=353) ~~~ 0.0
# ... ||||| ^^^
# 00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000]
# >>> ||||| |||
# vvvvv |||
# SIG_: Wed Oct 19 12:26:17 2016 ------------------------------ pctxsw(voluntary=49983, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:22 2016 ------------------------------ pctxsw(voluntary=49984, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:27 2016 ------------------------------ pctxsw(voluntary=49985, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:32 2016 ------------------------------ pctxsw(voluntary=49986, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:37 2016 ------------------------------ pctxsw(voluntary=49987, involuntary=502) ~~~ 0.0
# SIG_: Wed Oct 19 12:26:42 2016 ------------------------------ pctxsw(voluntary=49988, involuntary=502) ~~~ 0.0
print( "SIG_ALRM_handler_A(): A SUSPECT CPU-LOAD:: ", time.ctime(), 10 * "-", aProcess.num_ctx_switches(), "{0:_>60s}".format( str( aProcess.threads() ) ), " INSPECT processes ... ev. add a StateFull-self-Introspection" )
else:
#----------
# .ON_INIT: may report .INIT()
print( "SIG_ALRM_handler_A(): activated ", time.ctime(), 30 * "-", aProcess.num_ctx_switches() )
##########
# FINALLY:
SIG_ALRM_last_ctx_switch_VOLUNTARY = aVolCtxSwitchCNT # .STO ACTUALs
SIG_ALRM_last_ctx_switch_FORCED = aForcedSwitchCNT # .STO ACTUALs
Других решений пока нет …