В моем текущем приложении я получаю спектральные данные через спектрометр. Эти данные накапливаются в течение одной секунды, а затем помещаются в кольцевой буфер. На данный момент у меня есть один потребитель, который выталкивает записи из буфера, а затем сохраняет все на диск. Хорошо, все это работает. Теперь мне нужно добавить еще одного потребителя, который параллельно со сбережением выполняет некоторую обработку со спектрами. Таким образом, у меня есть два потребителя, которым нужны одни и те же данные (примечание: они только читают и не изменяют). Хорошо, но это не работает, потому что, если один из потребителей вставит одну запись в буфер, он исчезнет, а другой не получит его. Я предполагаю, что самое простое решение этой проблемы — предоставить каждому потребителю свой круговой буфер. Хорошо, но единственная проблема: записи данных большие. Максимальный размер одной записи составляет около 80 МБ, поэтому для экономии памяти было бы здорово, чтобы в ней не было одинаковых данных дважды. Есть ли лучшее решение?
Примечание: я использую кольцевой буфер, поэтому он гарантирует, что буфер имеет растущий предел.
Надеюсь, ты получаешь свои данные прямо в очередь и не копируешь их слишком много ….
Любое правильное решение, которое будет хранить единственную копию данных, должно было бы синхронизировать всех потребителей, так что только после того, как они все сделали с записью, ее можно извлечь.
Вы можете сохранить свой круговой буфер. Вам нужен только один съемник удалить запись, когда читатели сделали с ней. Я настоятельно рекомендую это съемник быть писатель данных. Таким образом, это был бы единственный парень с правами на запись в очередь, и это все упрощает.
Удалитель может быть подан от потребителей, рассказывающих, что они сделали.
Потребители могут делиться своими смещениями чтения со съемником. Вы можете использовать atomic_store на стороне потребителя и atomic_load на стороне удаления.
Это должно быть что-то вроде этого:
struct Consumer {
...
long offset = 0;
...
Consumer() {
q.remover->add(this);
}
...
void run() {
for(;;) {
entry& e = q.read( offset );
process( e );
atomic_store( &offest, offset + e.size() );
}
}
};
struct Remover {
...
long remove_offset = 0;
std::list<Consumer*> cons;
...
void remove() {
// find lowest read point
long cons_offset = MAX_LONG;
for( auto p : cons ) {
cons_offset = std::min( cons_offset, atomic_load(&p->offset) );
}
// remove up to that point
while( cons_offset > remove_offset ) {
entry& e = q.read(remove_offset);
remove_offset += e.size();
q.remove( e.size() );
}
}
};
Других решений пока нет …