boost — Как ограничить количество запущенных экземпляров в Stack Overflow

У меня есть класс C ++, который выделяет много памяти. Он делает это, вызывая стороннюю библиотеку, которая предназначена для сбоя, если она не может выделить память, и иногда мое приложение создает несколько экземпляров моего класса в параллельных потоках. Слишком много потоков, у меня сбой.
Моя лучшая идея для решения заключается в том, чтобы убедиться, что никогда не будет, скажем, более трех экземпляров, работающих одновременно. (Это хорошая идея?)
И моя текущая лучшая идея для реализации тот это использовать буст-мьютекс. Что-то вроде следующего псевдокода,

MyClass::MyClass(){
my_thread_number = -1; //this is a class variable
while (my_thread_number == -1)
for (int i=0; i < MAX_PROCESSES; i++)
if(try_lock a mutex named i){
my_thread_number = i;
break;
}
//Now I know that my thread has mutex number i and it is allowed to run
}

MyClass::~MyClass(){
release mutex named my_thread_number
}

Как вы видите, я не совсем уверен в точном синтаксисе мьютексов здесь. Итак, подводя итог, мои вопросы

  1. Я на правильном пути, когда я хочу решить мою ошибку памяти, ограничивая количество потоков?
  2. Если да, я должен сделать это с мьютексами или другими способами?
  3. Если да, мой алгоритм звучит правильно?
  4. Есть где-нибудь хороший пример того, как использовать try_lock с буст-мьютексами?

Изменить: я понял, что я говорю о потоках, а не процессы.
Изменить: я участвую в создании приложения, которое может работать как на Linux, так и на Windows …

5

Решение

Вот упрощенный способ реализовать свой собственный «семафор» (так как я не думаю, что у стандартной библиотеки или надстройки есть такой). Это выбирает «кооперативный» подход, и рабочие будут ждать друг друга:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

void the_work(int id)
{
static int running = 0;
std::cout << "worker " << id << " entered (" << running << " running)\n";

static mutex mx;
static condition_variable cv;

// synchronize here, waiting until we can begin work
{
unique_lock<mutex> lk(mx);
cv.wait(lk, phoenix::cref(running) < 3);
running += 1;
}

std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";

// signal one other worker, if waiting
{
lock_guard<mutex> lk(mx);
running -= 1;
cv.notify_one();
}
}

int main()
{
thread_group pool;

for (int i = 0; i < 10; ++i)
pool.create_thread(bind(the_work, i));

pool.join_all();
}

Теперь я бы сказал, что, вероятно, лучше иметь выделенный пул из n рабочих, которые по очереди берут свою работу из очереди:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>

using namespace boost;
using namespace boost::phoenix::arg_names;

class thread_pool
{
private:
mutex mx;
condition_variable cv;

typedef function<void()> job_t;
std::deque<job_t> _queue;

thread_group pool;

boost::atomic_bool shutdown;
static void worker_thread(thread_pool& q)
{
while (auto job = q.dequeue())
(*job)();
}

public:
thread_pool() : shutdown(false) {
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
pool.create_thread(bind(worker_thread, ref(*this)));
}

void enqueue(job_t job)
{
lock_guard<mutex> lk(mx);
_queue.push_back(std::move(job));

cv.notify_one();
}

optional<job_t> dequeue()
{
unique_lock<mutex> lk(mx);
namespace phx = boost::phoenix;

cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));

if (_queue.empty())
return none;

auto job = std::move(_queue.front());
_queue.pop_front();

return std::move(job);
}

~thread_pool()
{
shutdown = true;
{
lock_guard<mutex> lk(mx);
cv.notify_all();
}

pool.join_all();
}
};

void the_work(int id)
{
std::cout << "worker " << id << " entered\n";

// no more synchronization; the pool size determines max concurrency
std::cout << "worker " << id << " start work\n";
this_thread::sleep_for(chrono::seconds(2));
std::cout << "worker " << id << " done\n";
}

int main()
{
thread_pool pool; // uses 1 thread per core

for (int i = 0; i < 10; ++i)
pool.enqueue(bind(the_work, i));
}

PS. Вы можете использовать лямбды C ++ 11 вместо boost :: phoenix там, если хотите.

2

Другие решения

ОБНОВИТЬ Мой другой ответ касается планирования ресурсов среди потоки (после того, как вопрос был уточнен).

Он показывает как семафорный подход к координации работы среди (многих) работников, так и thread_pool ограничить работников в первую очередь и поставить в очередь работу.

В Linux (и, возможно, в других ОС?) Вы можете использовать идиому файла блокировки (но она не поддерживается некоторыми файловыми системами и старыми ядрами).

Я бы предложил использовать межпроцессные объекты синхронизации.

Например, используя Boost Interprocess с именем семафор:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>

int main()
{
using namespace boost::interprocess;
named_semaphore sem(open_or_create, "ffed38bd-f0fc-4f79-8838-5301c328268c", 0ul);

if (sem.try_wait())
{
std::cout << "Oops, second instance\n";
}
else
{
sem.post();

// feign hard work for 30s
boost::this_thread::sleep_for(boost::chrono::seconds(30));

if (sem.try_wait())
{
sem.remove("ffed38bd-f0fc-4f79-8838-5301c328268c");
}
}
}

Если вы запустите одну копию на заднем плане, новые копии будут «отказываться» запускаться («К сожалению, второй экземпляр») в течение 30 секунд.

У меня есть ощущение, что здесь может быть проще изменить логику. Ммм. Дай мне попробовать.

проходит некоторое время

Хехе. Это было сложнее, чем я думал.

Дело в том, что вы хотите убедиться, что блокировка не сохраняется, когда ваше приложение прерывается или уничтожается. В интересах совместного использования методов переносной обработки сигналов:

#include <boost/interprocess/sync/named_semaphore.hpp>
#include <boost/thread.hpp>
#include <cassert>
#include <boost/asio.hpp>

#define MAX_PROCESS_INSTANCES 3

boost::interprocess::named_semaphore sem(
boost::interprocess::open_or_create,
"4de7ddfe-2bd5-428f-b74d-080970f980be",
MAX_PROCESS_INSTANCES);

// to handle signals:
boost::asio::io_service service;
boost::asio::signal_set sig(service);

int main()
{

if (sem.try_wait())
{
sig.add(SIGINT);
sig.add(SIGTERM);
sig.add(SIGABRT);
sig.async_wait([](boost::system::error_code,int sig){
std::cerr << "Exiting with signal " << sig << "...\n";
sem.post();
});
boost::thread sig_listener([&] { service.run(); });

boost::this_thread::sleep_for(boost::chrono::seconds(3));

service.post([&] { sig.cancel(); });
sig_listener.join();
}
else
{
std::cout << "More than " << MAX_PROCESS_INSTANCES << " instances not allowed\n";
}
}

Там можно многое объяснить. Дайте мне знать, если вы заинтересованы.

НОТА Должно быть совершенно очевидно, что если kill -9 используется в вашем приложении (принудительное завершение), тогда все ставки отключены, и вам придется либо удалить объект Name Semaphore, либо явно разблокировать его (post()).

Вот тестовый прогон в моей системе:

sehe@desktop:/tmp$ (for a in {1..6}; do ./test& done; time wait)
More than 3 instances not allowed
More than 3 instances not allowed
More than 3 instances not allowed
Exiting with signal 0...
Exiting with signal 0...
Exiting with signal 0...

real    0m3.005s
user    0m0.013s
sys 0m0.012s
5

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector