C Простой RingBuffer — Многопоточность — Поиск критических разделов

поэтому я написал простой C Ring Buffer, который я сейчас тестирую, используя несколько потоков, и мне трудно пытаться заставить код выйти из строя, чтобы я мог идентифицировать критические секции.

Примечание: код написан на C, но я тестирую его в файлах C ++, потому что проще создавать мьютексы потоков и т. Д.

Заголовочный файл:

#ifndef _C_TEST_H_
#define _C_TEST_H_

#include <stdio.h>
#include <mutex>

///////////////////////////////////////////////////////////////////////////////
// Defines and macros
///////////////////////////////////////////////////////////////////////////////
#ifndef __cplusplus
typedef enum { false, true } bool;
#endif

#define RING_BUFFER_SIZE 2000

///////////////////////////////////////////////////////////////////////////////
// Structures, Enumerations, Typedefs
///////////////////////////////////////////////////////////////////////////////

typedef struct Node
{
int val;
struct Node *next;
struct Node *previous;
} Node_T;

typedef enum RB_ERC
{
RB_ERC_NO_ERROR,
RB_ERC_NULL_PTR,
RB_ERC_UNDERFLOW,
RB_ERC_OVERFLOW
} RB_ERC_T;

typedef enum RB_HANDLE_OVERFLOW
{
RB_DECIMATE,
RB_IGNORE_AND_RETURN_ERROR
} RB_HANDLE_OVERFLOW_T;

typedef enum RB_READ_MODE
{
RB_FIFO,
RB_LIFO
} RB_READ_MODE_T;typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;///////////////////////////////////////////////////////////////////////////////
// Prototypes
///////////////////////////////////////////////////////////////////////////////

#ifdef __cplusplus
extern "C" {
#endif

RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T ifOverflow_);

//Return true if the queue has no elements; false if there are elements on the queue
bool RB_IsEmpty(RING_BUFFER_T *rb_);

//Return true if the queue is full; false if there are seats available
bool RB_IsFull(RING_BUFFER_T *rb_);

//Write N elements (length of the array) to the queue
//Note: array values will be read from element 0 to array length
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_);

//Write 1 element
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_);

//Dequeue and read N elements (length of the array) into an array
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_);

//Dequeue and read 1 element
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_);

#ifdef __cplusplus
}
#endif#endif //_C_TEST_H_

Источник:

#include "CTest.h"
static std::mutex m;

RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T handleOverflow_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;

if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}

//Initialize this instance of the ring buffer
//Both the read/write pointers should start at the same location
rb_->curSize            = 0;
rb_->Read               = &rb_->buffer[0];
rb_->Write              = &rb_->buffer[0];
rb_->handleOverflow     = handleOverflow_;

//Build the circular doubly-linked list
for(i = 0; i < RING_BUFFER_SIZE; i++)
{
rb_->buffer[i].val = 0;
if(i == 0)
{
//Sentinal Node found. Point the first node to the last element of the array
rb_->buffer[i].previous = &rb_->buffer[(RING_BUFFER_SIZE - 1)];
rb_->buffer[i].next     = &rb_->buffer[i + 1];
}
else if(i < (RING_BUFFER_SIZE - 1) )
{
rb_->buffer[i].next     = &rb_->buffer[i + 1];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
else
{
//Sentinal node found. Reached the last element in the array; Point the sentinal
//node to the first element in the array to create a circular linked list.
rb_->buffer[i].next     = &rb_->buffer[0];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
}
//m.unlock();
return erc;
}

bool RB_IsEmpty(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.

if(rb_->curSize == 0)
{
return true;
}
else
{
return false;
}
//m.unlock();
}

bool RB_IsFull(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.

if(rb_->curSize == RING_BUFFER_SIZE)
{
return true;
}
else
{
return false;
}
//m.unlock();
}RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;

if(rb_ == 0 || values_ == 0 || length_ == 0)
{
return RB_ERC_NULL_PTR;
}

switch(rb_->handleOverflow)
{
//Increment through the array and enqueue

//If attempting to write more elements than are available on the queue
//Decimate                  - overwrite old data
//Ignore and return error   - Don't write any data and throw an error
case RB_DECIMATE:
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
{
int numSeatsAvailable = (RING_BUFFER_SIZE - rb_->curSize);
if( length_ <= numSeatsAvailable )
{
//Increment through the array and enqueue
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
}
else
{
//Attempted to write more elements than are avaialable on the queue
erc = RB_ERC_OVERFLOW;
}
}
break;
}
//m.unlock();
return erc;
}

RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;

if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}

if( !RB_IsFull(rb_) )
{
//Write the value to the current location, then increment the write pointer
//so that the write pointer is always pointing 1 element ahead of the queue
rb_->Write->val = val_;
rb_->Write      = rb_->Write->next;

rb_->curSize++;
}
else
{
//Overflow
switch(rb_->handleOverflow)
{
case RB_DECIMATE:
//Set the value and increment both the read/write pointers
rb_->Write->val = val_;
rb_->Write      = rb_->Write->next;
rb_->Read       = rb_->Read->next;
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
erc = RB_ERC_OVERFLOW;
break;
}
}
//m.unlock();
return erc;
}RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;

if(values_ == 0)
{
return RB_ERC_NULL_PTR;
}

//Verify that the amount of data to be read is actually available on the queue
if( length_ <= rb_->curSize )
{
//Increment through the array and dequeue
int i;
for(i = 0; i < length_; i++)
{
//Note: Error conditions have already been checked. Skip the ERC check
(void) RB_Read(rb_, &values_[i], readMode_);
}
}
else
{
//Attempted to read more data than is available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;

if(rb_ == 0 || readVal_ == 0)
{
return RB_ERC_NULL_PTR;
}

if( !RB_IsEmpty(rb_) )
{
switch(readMode_)
{
case RB_LIFO:
//Use the head (Write) to read the most recently written value (newest data)

//Note: The write pointer is always pointing 1 position ahead of the current queue.
rb_->Write = rb_->Write->previous;      //Decrement write pointer

//Read the data
*readVal_       = rb_->Write->val;
rb_->Write->val = 0;                    //Reset read values to 0
break;
default:
case RB_FIFO:
*readVal_       = rb_->Read->val;
rb_->Read->val  = 0;                    //Reset read values to 0

rb_->Read       = rb_->Read->next;      //Increment read pointer
break;
}

rb_->curSize--;
}
else
{
//Attempted to read more data but there is no data available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}

Основное использование CPP для тестов:

#include "CTest.h"#include <iostream>
#include "windows.h"
#include <thread>
using namespace std;

static RING_BUFFER_T test1;

const int dataSize = 300;
const int dataSizeout = 1000;
int sharedValue = 0;
static std::mutex m;void function1()
{
int data[dataSize];
RB_ERC_T erc = RB_ERC_NO_ERROR;

for (int i = 0; i < dataSizeout; i++)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %d\n", erc);
}
}

//RB_WriteArray(&test1, data, dataSize);
}void function2()
{
RB_ERC_T erc = RB_ERC_NO_ERROR;

for (int i = 0; i > -dataSizeout; i--)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %d\n", erc);
}
}

}

int main()
{
RB_InitRingBuffer(&test1, RB_DECIMATE);

thread p1(function1);
//Sleep(1000);
thread p2(function2);
p1.join();
p2.join();

//Read out 5 at a time
int out;
int cnt = 0;

while(cnt < (2 * dataSizeout) )
{
if (RB_Read(&test1, &out, RB_LIFO) == RB_ERC_NO_ERROR)
{
printf("out[%d] = %d\n", cnt, out);

cnt += 1;
}
}system("Pause");

return 0;
}

Я думаю, что все в основном экземпляре RING_BUFFER_T было бы общими переменными, поэтому везде, где они используются, а это почти везде, они должны быть заключены в мьютексы.

typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;

Я полагаю, что NODE_T также будет, но только для инициализации. Я ошибаюсь или не следует размещать элементы, помещенные в кольцевой буфер, не по порядку, поскольку сейчас не используется мьютекс?

-2

Решение

Ты не будешь выставлять функции RB_IsEmpty а также RB_IsFull так как возвращаемые значения могут быть недействительными сразу. Если вы вызываете их только из чтения / записи, нет необходимости выполнять защиту внутри этих функций.

Как правило, вы должны защищать свою структуру в предоставляемых извне функциях чтения и записи от первого доступа к последнему доступу. Нет необходимости защищать проверку параметров.

Вы не должны двойной замок. Не звони RB_Read от RB_ReadArray, Обеспечить внутреннюю функцию чтения, используемую обоими. То же самое для функций записи.

1

Другие решения

Чтобы узнать о современной реализации C без кольцевого буфера без блокировки, посмотрите исходный код ядра Linux. Это должно дать вам некоторое представление о том, как это делают эксперты, и это проверенный на практике код. Смотрите linux / kfifo.h и соответствующие файлы C.

описание дизайна кольцевого буфера Linux, не знаю, насколько он актуален

Для идей, как сделать это в C ++, вы можете посмотреть на

Статья в Linux Journal о C ++ без очереди

или возможно посмотрите на boost :: lockfree :: queue. Использование C ++, конечно, позволяет вам использовать универсальные типы (шаблоны) и, например, замените указатели функций вызовами с привязкой во время компиляции, что обеспечит еще лучшую производительность, чем C. И вы можете избежать этих надоедливых указателей void *.

2

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector