Мне нужна помощь алгоритма с многопоточной программой, которую я пишу. Это в основном команда cp в unix, но с потоком чтения и потоком записи. Я использую семафоры для синхронизации потоков. У меня есть структуры для данных буфера и потока, определенных как
struct bufType {
char buf[BUFFER_SIZE];
int numBytes;
};
struct threadData {
int fd;
bufType buf;
};
и глобальный массив bufType. Код для моей основной
int main(int argc, const char * argv[])
{
int in, out;
pthread_t Producer, Consumer;
threadData producerData, consumerData;
if (argc != 3)
{
cout << "Error: incorrect number of params" << endl;
exit(0);
}
if ((in = open(argv[1], O_RDONLY, 0666)) == -1)
{
cout << "Error: cannot open input file" << endl;
exit(0);
}
if ((out = open(argv[2], O_WRONLY | O_CREAT, 0666)) == -1)
{
cout << "Cannot create output file" << endl;
exit(0);
}
sem_init(&sem_empty, 0, NUM_BUFFERS);
sem_init(&sem_full, 0, 0);
pthread_create (&Producer, NULL, read_thread, (void *) &producerData);
pthread_create (&Consumer, NULL, write_thread, (void *) &consumerData);
pthread_join(Producer, NULL);
pthread_join(Consumer, NULL);
return 0;
}
и читать и писать темы:
void *read_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
while((thread_data->buf.numBytes = slow_read(thread_data->fd, thread_data->buf.buf, BUFFER_SIZE)) != 0)
{
sem_post(&sem_full);
sem_wait(&sem_empty);
}
pthread_exit(0);
}
void *write_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
sem_wait(&sem_full);
slow_write(thread_data->fd, thread_data->buf.buf, thread_data->buf.numBytes);
sem_post(&sem_empty);
pthread_exit(0);
}
Так что моя проблема в том, что назначить моим переменным threadData в main и моей логике семафора в потоках чтения и записи. Я ценю любую помощь, которую вы можете оказать
Будучи Windows-парнем, который не использует файловые дескрипторы, я могу ошибаться с входами и выходами, но я думаю, что это нужно сделать в главном для настройки структур threadData.
producerData.fd = in;
consumerData.fd = out;
Затем объявите один объект типа bufType для обеих структур. Измените, например, определение threadData на
struct threadData {
int fd;
bufType* buf;
};
а в своем основном ты пишешь
bufType buffer;
producerData.buf = &buffer;
consumerData.buf = &buffer;
Тогда оба потока будут использовать общий буфер. В противном случае вы будете выполнять запись в буфер providerData, но буфер consumerData останется пустым (и именно здесь ваш поток записи ищет данные)
Затем вам нужно изменить свою сигнальную логику. Сейчас ваша программа не может принять ввод, который превышает BUFFER_SIZE
потому что ваш поток записи будет писать только один раз. Там должна быть петля вокруг этого. И тогда вам нужен какой-то механизм, который сигнализирует потоку записи, что больше не будет отправлено данных. Например, вы могли бы сделать это
void *read_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;
while((thread_data->buf->numBytes = slow_read(thread_data->fd, thread_data->buf->buf, BUFFER_SIZE)) > 0)
{
sem_post(&sem_full);
sem_wait(&sem_empty);
}
sem_post(&sem_full); // Note that thread_data->buf->numBytes <= 0 now
pthread_exit(0);
}
void *write_thread(void *data)
{
threadData *thread_data;
thread_data = (threadData *) data;sem_wait(&sem_full);
while (thread_data->buf->numBytes > 0)
{
slow_write(thread_data->fd, thread_data->buf->buf, thread_data->buf->numBytes);
sem_post(&sem_empty);
sem_wait(&sem_full);
}
pthread_exit(0);
}
Надеюсь, что больше нет ошибок, не проверял решение. Но концепция должна быть тем, о чем вы просили.
Вы можете использовать общий буферный пул, либо круговой массив, либо связанные списки. Вот ссылка на zip-пример Windows, который похож на то, что вы просите, используя связанные списки как часть системы обмена сообщениями между потоками для буферизации данных. Помимо создания мьютексов, семафоров и потока записи, функции являются небольшими и простыми. mtcopy.zip .