Boost :: beast: множественные вызовы async_write вызывают ошибку подтверждения

Я пишу тесты для своего полнодуплексного сервера, и когда я делаю несколько (последовательно) async_write вызовы (хотя и покрыты пряди), я получаю следующую ошибку утверждения от boost::beast в файле boost/beast/websocket/detail/stream_base.hpp:

// If this assert goes off it means you are attempting to
// simultaneously initiate more than one of same asynchronous
// operation, which is not allowed. For example, you must wait
// for an async_read to complete before performing another
// async_read.
//
BOOST_ASSERT(id_ != T::id);

Чтобы воспроизвести проблему на вашем компьютере: Полный код клиента, который воспроизводит эту проблему (MCVE), можно найти Вот. Он не работает по ссылке, потому что вам нужен сервер (на вашем собственном компьютере, извините, так как это невозможно сделать удобно в сети, и лучше объективно показать, что проблема в клиенте, а не на сервере, если Я включаю это здесь). я использовал websocketd создать сервер с помощью команды ./websocketd --ssl --sslkey /path/to/server.key --sslcert /path/to/server.crt --port=8085 ./prog.py где ./prog.py это просто программа Python, которая печатает и сбрасывает (я получил от домашняя страница websocketd).

Вызов, который выполняет запись в клиенте, выглядит следующим образом:

  std::vector<std::vector<std::future<void>>> clients_write_futures(
clients_count);
for (int i = 0; i < clients_count; i++) {
clients_write_futures[i] = std::vector<std::future<void>>(num_of_messages);
for (int j = 0; j < num_of_messages; j++) {
clients_write_futures[i][j] =
clients[i]->write_data_async_future("Hello"); // writing here
}
}

Обратите внимание, что я использую только 1 клиента в примере. Массив клиентов — это просто обобщение для большей нагрузки на сервер при тестировании.

Мои комментарии по проблеме:

  1. Цикл является последовательным; это не так, как я делаю это в нескольких потоках
  2. Должна быть возможность осуществлять связь в полнодуплексной форме, когда на сервер отправляются сообщения с неопределенным числом номеров. Как еще можно сделать полнодуплексную связь?
  3. Я использую пряди, чтобы обернуть мои асинхронные вызовы, чтобы предотвратить любое столкновение в сокете через io_service / io_context
  4. Расследование этого с помощью отладчика показывает, что вторая итерация цикла последовательно терпит неудачу, что означает, что я делаю что-то принципиально неправильное, но я не знаю, что это такое. Другими словами: это, по-видимому, детерминированная проблема.

Что я здесь не так делаю? Как я могу написать неопределенное количество сообщений на мой сервер веб-сокетов?


РЕДАКТИРОВАТЬ:

Сехе, я хочу начать с того, что извиняюсь за неразбериху в коде (не понимал, что это плохо) и поблагодарить тебя за твои усилия. Я хотел бы, чтобы вы спросили меня, почему он структурирован таким (вероятно) организованным, а также хаотичным способом одновременно, и ответ прост: основной — это gtest-код, чтобы увидеть, работает ли мой универсальный универсальный клиент веб-сокетов, который я использую для протестировать мой сервер (который использует тонны многопоточных объектов io_service, которые я считаю чувствительными и нуждающимися в широком тестировании). Я планирую бомбардировать мой сервер множеством клиентов одновременно во время реальных производственных испытаний. Я разместил этот вопрос, потому что поведение клиента я не понимаю. В этом файле я создал MCVE (который люди постоянно запрашивают на SO). Мне потребовалось два часа, чтобы лишить меня кода для его создания, и в конце концов я скопировал код своего тестового устройства gtest (который является устройством на сервере), вставил его в основной файл и проверил, что проблема все еще существует на другом сервере, и удалил его. немного (что явно оказалось недостаточно).

Так почему я не ловлю исключения? Потому что gtest поймает их и посчитает, что тест не пройден. Основным является не производственный код, а клиент. Я многому научился из того, что вы упомянули, и я должен сказать, что глупо бросать и ловить, но я не знал о std :: make_exception_ptr (), поэтому я нашел свой (dumm) способ добиться того же результата: — ). Почему слишком много бесполезных функций: они бесполезны здесь в этом тесте / примере, но в целом я мог бы использовать их позже для других целей, так как этот клиент не только для этого случая.

Теперь вернемся к проблеме: я не понимаю, почему мы должны async_write с strand_, когда он используется последовательно в цикле в основном потоке (я неправильно выразил, что покрыл только обработчик). Я бы понял, почему обработчик покрыт, потому что сокет не потокобезопасный, а многопоточный io_service создаст гонку там. Мы также знаем, что io_service::post сам по себе является потокобезопасным (именно поэтому я подумал, что перенос async_write не нужен). Не могли бы вы объяснить, что при выполнении этого нам не нужно потокобезопасно, что нам нужно обернуть async_write? Я знаю, что вы уже знаете это, но тот же самый аргумент все еще стреляет. Мы упорядочили обработчик и асинхронную очередь, и клиент все еще не рад выполнению нескольких вызовов записи. Чего еще не хватает?

(Кстати, если ты пишешь, потом получаешь будущее, потом читаешь, потом пишешь снова, это работает. Вот почему я использую фьючерсы, чтобы точно определять тестовые случаи и определять порядок своих тестов во времени. Я параноик Вот.)

3

Решение

Вы сказал вы прикрываете async_write с прядью. Но ты такого не делаешь. Все, что вы можете увидеть, это делать оборачивая обработчики завершения в эту цепочку. Но вы публикуете асинхронные операции непосредственно.

Что еще хуже, вы делаете это из основного потока, в то время как в любом из потоков, связанных с вашим, выполняются асинхронные операции WSClient экземпляры, что означает, что вы одновременно обращаетесь к экземплярам объектов, которые не являются поточно-ориентированными.

Это гонка данных, так что вы получите Неопределенное поведение.

Наивным решением может быть:

std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();

post(strand_, [=,self=shared_from_this()] {
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, self,
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
});

return write_promise->get_future();
}

Но это недостаточно. Теперь вы можете быть уверены, что ни одна из ваших асинхронных операций или их завершений не будет выполняться одновременно, но вы все равно можете опубликовать следующую асинхронную операцию, прежде чем будет вызван обработчик завершения для первой.

Чтобы это исправить, вам просто нужно встать в очередь.

Если честно, я не уверен, почему вы так много внимания уделяете синхронизации с использованием фьючерсов. Это только затрудняет достижение этого. Если вы можете описать, чего вы / функционально / хотите достичь, я могу предложить решение, которое, вероятно, намного короче.

Комментарии к обзору кода

Прежде чем до меня дошло, что это за код, я потратил довольно много времени на чтение вашего кода. Я не хотел бы отнимать у вас записи, которые я сделал по пути.

Предупреждение: это было довольно длительное погружение. Я предоставляю это, потому что некоторые идеи могут помочь вам понять, как вам нужно реструктурировать свой код.

Я начал читать цепочки асинхронного кода до on_handshake (который наборы started_promise значение).

Затем я направился на мальстром, который является вашим main функция. Ваша основная функция — 50 строк кода ?! Наличие нескольких параллельных контейнеров и повторяющихся ручных вложенных циклов через них?

Вот что я получил после некоторого рефакторинга:

int main() {
std::vector<actor> actors(1);

for (auto& a : actors) {
a.client = std::make_shared<WSClient>();
a.session_start_future = a.client->start("127.0.0.1", "8085");
a.messages.resize(50);
}

for (auto& a : actors) { a.session_start_future.get(); }

for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future = a.client->write_data_async_future("Hello");
} }

for (auto& a : actors) { for (auto& m : a.messages) {
m.read_future = a.client->read_data_async_future();
} }

for (auto& a : actors) { for (auto& m : a.messages) {
m.write_future.get();
std::string result = m.read_future.get();
} }
}

Все структуры данных были свернуты в маленький помощник actor:

struct actor {
std::shared_ptr<WSClient> client;
std::future<void> session_start_future;

struct message {
std::string message = GenerateRandomString(20);
std::future<void> write_future;
std::future<std::string> read_future;
};

std::vector<message> messages;
};

Сейчас у нас примерно час обзора кода, без выгоды, за исключением того, что теперь мы можем СКАЗАТЬ, что main делает, и есть некоторая уверенность, что нет никакой тривиальной ошибки с переменной цикла или что-то.

Сбор резервных копий

В начале написания: write_data_async_future, Подождите. Есть также write_data_async а также write_data_sync, Зачем? Вы хотите прочитать

Это хуже, WSClient только передал их предполагаемому не замужем сессия. Почему существует различие между WSClient а также WSClientSession вообще на данный момент? Я говорю, нет ни одного.

Испаряя 30 строк ненужного кода, мы по-прежнему сталкиваемся с той же ошибкой, так что это хорошо.

Где мы были. write_data_async_future, Ах да, нам нужны не будущие версии? Нет. Итак, еще 40 строк кода пропало.

Теперь по-настоящему: write_data_async_future:

std::future<void> write_data_async_future(const std::string &data) {
// shared_ptr is used to ensure data's survival
std::shared_ptr<std::string> data_ptr = std::make_shared<std::string>(data);
std::shared_ptr<std::promise<void> > write_promise = std::make_shared<std::promise<void> >();
websock.async_write(
boost::asio::buffer(*data_ptr),
boost::asio::bind_executor(strand_, std::bind(&WSClientSession::on_write_future, shared_from_this(),
std::placeholders::_1, std::placeholders::_2, data_ptr,
write_promise)));
return write_promise->get_future();
}

Смотрится … ладно. Подожди, есть on_write_future? Это, вероятно, означает, что нам нужно испарить больше строк неиспользуемого кода. Смотря … Да. Пуф, ушел.

К настоящему времени diffstat выглядит так:

  test.cpp | 683 +++++++++++++++++++++++----------------------------------------
1 file changed, 249 insertions(+), 434 deletions(-)

Вернемся к этой функции, поэтому давайте посмотрим на on_write_future:

void on_write_future(boost::system::error_code ec, std::size_t bytes_transferred,
std::shared_ptr<std::string> data_posted,
std::shared_ptr<std::promise<void> > write_promise) {
boost::ignore_unused(bytes_transferred);
boost::ignore_unused(data_posted);

if (ec) {
try {
throw std::runtime_error("Error thrown while performing async write: " + ec.message());
} catch (...) {
write_promise->set_exception(std::current_exception());
}
return;
}
write_promise->set_value();
}

Несколько вопросов. Все пройденное игнорируется. Я знаю, для чего вы передаете shared_ptrs, но, возможно, вам следует передавать их как часть объекта операции, чтобы избежать наличия большого количества отдельных shared-ptrs.

Бросить исключение, просто чтобы его поймать? Ммм. Я не уверен в этом. Возможно, просто установите новое исключение:

if (ec) {
write_promise->set_exception(
std::make_exception_ptr(std::system_error(ec, "async write failed")));
} else {
write_promise->set_value();
}

Даже с этим, теперь есть концептуальная проблема. То, как вы свободно используете get() без ловли в main это означает, что любая ошибка в любом соединении просто прервет все операции. Было бы весьма полезно иметь ошибку, просто прервав одно соединение / сеанс / клиент. Которые в вашем коде все довольно синонимы (а также с io_context а также thread).

Sidenote: вы сохраняете тему как член, но всегда отсоединяете ее. Это означает, что член бесполезен с тех пор.

В этот момент я взял перерыв в рассмотрении, и, как это случилось, я получил мозговую волну, которая показала мне проблему. Непечатыми результатами моих упражнений являются Вот. Обратите внимание, что вы не можете использовать его, потому что это на самом деле не решает проблему. Но это может помочь другими способами?

2

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector