c # — Шаблон ZeroMQ PUB / SUB с многопоточным отменой поллера

У меня есть два приложения, сервер C ++ и пользовательский интерфейс WPF C #. Код C ++ принимает запросы (откуда угодно / кто угодно) через службу сообщений ZeroMQ [PUB / SUB]. Я использую свой код C # для бэк-тестирования и для создания «бэк-тестов» и их выполнения. Эти обратные тесты могут состоять из множества «модульных тестов», каждый из которых отправляет / получает тысячи сообщений с сервера C ++.

В настоящее время отдельные бэк-тесты работают хорошо и могут отправлять N модульных тестов, каждый из которых содержит тысячи запросов и захватов. Моя проблема — архитектура; когда я отправляю еще один бэк-тест (следующий за первым), я получаю проблему с повторной подпиской на события из-за того, что поток опроса не отменен и не удален. Это приводит к ошибочному выводу. Это может показаться тривиальной проблемой (возможно, для некоторых из вас), но отмена этой задачи опроса в моей текущей конфигурации оказывается проблематичной. Какой-то код …

Мой класс брокера сообщений прост и выглядит

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
private Task pollingTask;
private NetMQContext context;
private PublisherSocket pubSocket;

private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;

public MessageBroker()
{
this.source = new CancellationTokenSource();
this.token = source.Token;

StartPolling();
context = NetMQContext.Create();
pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);
}

public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}

private void StartPolling()
{
pollerCancelled = new ManualResetEvent(false);
pollingTask = Task.Run(() =>
{
try
{
using (var context = NetMQContext.Create())
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
}
}
catch (OperationCanceledException)
{
pollerCancelled.Set();
}
}, this.token);
}

private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne();
pollerCancelled.Close();
}

public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

private bool disposed = false;

protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (this.pollingTask != null)
{
CancelPolling();
if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
this.pollingTask.Status == TaskStatus.Faulted ||
this.pollingTask.Status == TaskStatus.Canceled)
{
this.pollingTask.Dispose();
this.pollingTask = null;
}
}
if (this.context != null)
{
this.context.Dispose();
this.context = null;
}
if (this.pubSocket != null)
{
this.pubSocket.Dispose();
this.pubSocket = null;
}
if (this.source != null)
{
this.source.Dispose();
this.source = null;
}
}
disposed = true;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

~MessageBroker()
{
Dispose(false);
}
}

«Движок» тестирования на истории, используемый для выполнения каждого обратного теста, сначала создает Dictionary содержащий каждый Test (модульный тест) и сообщения для отправки в приложение C ++ для каждого теста.

DispatchTests метод, вот он

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
broker = new MessageBroker();
broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
testCompleted = new ManualResetEvent(false);

try
{
// Loop through the tests.
foreach (var kvp in feedMuxCollection)
{
testCompleted.Reset();
Test t = kvp.Key;
t.Bets = new List<Taurus.Bet>();
foreach (Taurus.FeedMux mux in kvp.Value)
{
token.ThrowIfCancellationRequested();
broker.Dispatch(mux);
}
broker.Dispatch(new Taurus.FeedMux()
{
type = Taurus.FeedMux.Type.PING,
ping = new Taurus.Ping() { event_id = t.EventID }
});
testCompleted.WaitOne(); // Wait until all messages are received for this test.
}
testCompleted.Close();
}
finally
{
broker.Dispose(); // Dispose the broker.
}
}

PING сообщение в конце, это сказать C ++, что мы закончили. Затем мы форсируем ожидание, чтобы следующий тест [unit] не отправлялся до получения всех возвратов из кода C ++ — мы делаем это, используя ManualResetEvent,

Когда C ++ получает сообщение PING, он отправляет сообщение обратно. Мы обрабатываем полученные сообщения через OnMessageRecieved и ПИН говорит нам, чтобы установить ManualResetEvent.Set() чтобы мы могли продолжить юнит-тестирование; «Следующий, пожалуйста»…

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
string errorMsg = String.Empty;
if (mux.type == Taurus.FeedMux.Type.MSG)
{
// Do stuff.
}
else if (mux.type == Taurus.FeedMux.Type.PING)
{
// Do stuff.

// We are finished reciving messages for this "unit test"testCompleted.Set();
}
}

Моя проблема в том, что broker.Dispose() в конце концов выше никогда не ударил. Я ценю, что, наконец, блоки, которые выполняются в фоновых потоках, не гарантированно будут выполнены.

Вычеркнутый текст выше был из-за того, что я возился с кодом; Я останавливал родительский поток до того, как ребенок закончил. Тем не менее, есть еще проблемы …

Сейчас broker.Dispose() называется правильно, и broker.Dispose() в этом методе я пытаюсь отменить поток опроса и избавиться от Task правильно, чтобы избежать каких-либо нескольких подписок.

Чтобы отменить тему я использую CancelPolling() метод

private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
pollerCancelled.Close();
}

но в StartPolling() метод

while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested() никогда не вызывается и поток никогда не отменяется, поэтому никогда не удаляется должным образом. Поток опроса блокируется subSocket.Receive() метод.

Теперь мне не ясно, как добиться того, чего я хочу, мне нужно вызвать broker.Dispose()/PollerCancel() в потоке, отличном от того, который использовался для опроса сообщений, и в некоторых случаях вызывал отмену. Прерывание потока — не то, во что я хочу попасть любой ценой.

По сути, я хочу правильно распоряжаться broker перед выполнением следующего обратного теста, как мне правильно обработать это, разделить опрос и запустить его в отдельном Домене Приложения?

Я пытался, избавляясь от OnMessageRecived обработчик, но он явно выполняется в том же потоке, что и модуль опроса, и это не способ сделать это, без вызова дополнительных потоков он блокируется.

Какой лучший способ добиться того, чего я хочу а также есть ли образец за такой случай, которым я могу следовать?

Спасибо за ваше время.

17

Решение

Вот как я в конце концов обошел это [хотя я открыт для лучшего решения!]

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
// Vars.
private NetMQContext context;
private PublisherSocket pubSocket;
private Poller poller;

private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;

/// <summary>
/// Default ctor.
/// </summary>
public FeedMuxMessageBroker()
{
context = NetMQContext.Create();

pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);

pollerCancelled = new ManualResetEvent(false);
source = new CancellationTokenSource();
token = source.Token;
StartPolling();
}

#region Methods.
/// <summary>
/// Send the mux message to listners.
/// </summary>
/// <param name="message">The message to dispatch.</param>
public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}

/// <summary>
/// Start polling for messages.
/// </summary>
private void StartPolling()
{
Task.Run(() =>
{
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
subSocket.ReceiveReady += (s, a) =>
{
buffer = subSocket.Receive();
if (MessageRecieved != null)
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
};

// Poll.
poller = new Poller();
poller.AddSocket(subSocket);
poller.PollTillCancelled();
token.ThrowIfCancellationRequested();
}
}, token).ContinueWith(ant =>
{
pollerCancelled.Set();
}, TaskContinuationOptions.OnlyOnCanceled);
}

/// <summary>
/// Cancel polling to allow the broker to be disposed.
/// </summary>
private void CancelPolling()
{
source.Cancel();
poller.Cancel();

pollerCancelled.WaitOne();
pollerCancelled.Close();
}
#endregion // Methods.

#region Properties.
/// <summary>
/// Event that is raised when a message is recived.
/// </summary>
public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

/// <summary>
/// The address to use for the publisher socket.
/// </summary>
public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

/// <summary>
/// The address to use for the subscriber socket.
/// </summary>
public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
#endregion // Properties.

#region IDisposable Members.
private bool disposed = false;

/// <summary>
/// Dispose managed resources.
/// </summary>
/// <param name="disposing">Is desposing.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
CancelPolling();
if (pubSocket != null)
{
pubSocket.Disconnect(PublisherAddress);
pubSocket.Dispose();
pubSocket = null;
}
if (poller != null)
{
poller.Dispose();
poller = null;
}
if (context != null)
{
context.Terminate();
context.Dispose();
context = null;
}
if (source != null)
{
source.Dispose();
source = null;
}
}

// Shared cleanup logic.
disposed = true;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Finalizer.
/// </summary>
~FeedMuxMessageBroker()
{
Dispose(false);
}
#endregion // IDisposable Members.
}

Таким образом, мы проводим опрос таким же образом, но используя Poller класс от NetMQ. В продолжении Задачи мы установили, поэтому мы уверены, что оба Poller а также Task отменены Мы тогда безопасно распоряжаться …

2

Другие решения

Высший уровень взгляд на предмет

Ваши усилия и усилия, направленные на создание системы тестирования, сигнализируют о том, что ваша воля направлена ​​на разработку строгого и профессионального уровня подхода, который заставил меня сначала поднять голову в знак восхищения таким смелым начинанием.

Хотя тестирование является важным мероприятием для предоставления разумных количественных доказательств того, что тестируемая система соответствует определенным ожиданиям, успех в этом зависит от того, насколько среда тестирования соответствует условиям реального развертывания.

Можно согласиться с тем, что тестирование на других, разных, базах не доказывает, что реальное развертывание будет работать так, как ожидается в среде, которая принципиально отличается от тестируемой (ых).


Элементарный контроль или просто государственный контроль, вот в чем вопрос.

Ваши усилия (по крайней мере, во время публикации OP) сосредоточены на архитектуре кода, которая пытается сохранить экземпляры на месте и переустанавливать внутреннее состояние экземпляра Poller до запуска следующей тестовой батареи.

На мой взгляд, тестирование должно следовать нескольким принципам, если вы стремитесь к профессиональному тестированию:

  • Принцип повторяемости теста (повторные тесты должны давать те же результаты, что позволяет избежать квази-тестирования, которое дает только результат — «лотерея»)

  • Принцип невмешательства в тестирование (повторные прогоны испытаний не должны подвергаться «внешнему» вмешательству, не контролируемому сценарием испытаний)

Сказав это, позвольте мне привести несколько заметок, вдохновленных Гарри Марковицем, Нобелистом, награжденным за его замечательные количественные исследования оптимизации портфеля.

Скорее сделайте шаг назад, чтобы получить контроль над полным жизненным циклом элементов

CACI Simulations, Inc., (одна из компаний Гарри Марковица), разработала в начале 90-х годов свою флагманскую программную среду COMET III — исключительно мощный механизм моделирования для крупных, сложных конструкторских макетов и моделирования производительности процессов, работающих в крупномасштабных вычислениях / сетевые / телекоммуникационные сети.

Самым большим впечатлением от COMET III было то, что он мог генерировать сценарии тестирования, включая настраиваемую предварительную (-ые) предварительную (-ые) предварительную нагрузку (-и), которые приводили к тому, что тестируемые элементы переходили в состояние, подобное тому, что означает «усталость» в механическом эксперименты по испытанию на пытки или что означает водородно-диффузная хрупкость для металлургов атомных электростанций.

Да, как только вы перейдете к деталям низкого уровня о том, как алгоритмы, буферы узлов, выделения памяти, выборки по конвейерной / сбалансированной нагрузке / архитектуре обработки сетки, накладные расходы на отказоустойчивость, политики сбора мусора и ограниченные алгоритмы совместного использования ресурсов работа и воздействие (при реальных условиях нагрузки «давление») сквозная производительность / задержки, эта функция просто необходима.

Это означает, что простого связанного с экземпляром простого управления с точки зрения состояния недостаточно, поскольку оно не обеспечивает средства ни для повторяемости теста, ни для изоляции / невмешательства в тестирование. Проще говоря, даже если вы найдете способ «перезагрузить» экземпляр Poller, это не приведет вас к реалистичному тестированию с гарантированной повторяемостью теста с возможным предварительным прогревом.

Необходим шаг назад и более высокий уровень управления абстракцией и тестовым сценарием.

Как это относится к проблеме ОП?

  • Вместо просто государственного контроля
  • Создать многоуровневую архитектуру / плоскость (и) управления / отдельную сигнализацию

ZeroMQ способ поддержки этой цели

  • Создавайте суперструктуры как нетривиальные шаблоны
  • Используйте полный контроль жизненного цикла экземпляров, используемых в сценариях тестирования
  • Сохранить ZeroMQ-максимы: разделение нуля, блокировка нуля, …
  • Выгода от мульти-контекста ()
1

По вопросам рекламы [email protected]