Я работаю над моим анализатором пакетов. есть функция workerConnection (), которая выполняет основные процессы над пакетами следующим образом.
Он читает пакет из _Connection_buffer, который определен как std::string* _Connection_buffer= new std::string[_Max_Element_Connection_buffer]
, а затем разбивает пакеты по пробелам и выполняет с ними некоторые действия.
все работает хорошо, кроме выполнения параллельной обработки workerConnection ().
Как видно из workerConnection (), в первом цикле каждый поток принимает уникальный номер в качестве своего собственного «MyID». затем использует его MyID в качестве смещения для чтения пакетов из _Connection_buffer.
Я уверен, что при параллельной обработке workerConnection () каждый поток обрабатывает не все пакеты, а только пакеты (_Max_Element_Connection_buffer / _ConnectionThreadCount).
void
workerConnection() {
short MyID = 0;
_Connection_tn->_mtx.lock();
for (int n = 0; n < _ConnectionThreadCount; n++) {
if (_Connection_tn->getMyNumber[n] != 100) {
MyID = _Connection_tn->getMyNumber[n];
_Connection_tn->getMyNumber[n] = 100;
break;
}
}
_Connection_tn->_mtx.unlock();
LOG_INFO("Session Worker :%d started", MyID);
long int index = 0;
s_EntryItem* entryItem = NULL;
uint32_t srcIP_num;
uint64_t p_counter=0;
std::string connectionPayload1 = ""; connectionPayload1.reserve(40);
std::string connectionPayload2 = ""; connectionPayload2.reserve(40);
std::string connectionPayload = ""; connectionPayload.reserve(150);
std::string part; part.reserve(30);
std::string srcIP_str; srcIP_str.reserve(15);
std::string t; t.reserve(50);
std::string line; line.reserve(_snapLengthConnection + 1);
std::string Protocol; Protocol.reserve(3);
std::string srcIP_port; Protocol.reserve(7);
std::vector<std::string> _CStorage(_storage_Count_Connection);
struct timeval start, end;
long mtime, secs, usecs;
gettimeofday(&start, NULL);
for (index = MyID; index < _Max_Element_Connection_buffer; index+=_ConnectionThreadCount) {
if (unlikely(p_counter++ >= (_Connection_flag->_Number - MyID)) ) {
LOG_INFO("sleeped");
boost::this_thread::sleep(boost::posix_time::milliseconds(20));
continue;
}
line = _Connection_buffer[index];
if(line.size()<_storage_Count_Connection)
continue;
part = line.substr(_column_ConnectionConditionLocation, ConnectionCondition.length() + 5);
if (part.find("My Packet") == std::string::npos) { //assume always false.
LOG_INFO("Part: %s, Condition: %s", part.c_str(), ConnectionCondition.c_str());
continue;
}
boost::split(_CStorage,line,boost::is_any_of(" "));
t = _CStorage[_column_ConnectionUserIP];
auto endSource = t.find("/", 0);
srcIP_str = t.substr(0, endSource);
try {
srcIP_num = boost::asio::ip::address_v4::from_string(srcIP_str).to_ulong();
} catch (...) {
continue;
}
entryItem = searchIPTable(srcIP_num);
if (entryItem == NULL) {
continue;
}
int ok = processConnection(srcIP_num, connectionPayload1, connectionPayload2, &_CStorage);
if (!ok) {
continue;
}
auto startSource = t.find("/", 8) + 1;
endSource = t.find("-", startSource);
connectionPayload2+=t.substr(startSource, endSource - startSource);
connectionPayload2+=_CStorage[_column_ConnectionProtocol];
entryItem->_mtx.lock();
if (entryItem->InUse != false) {
connectionPayload = entryItem->payload1;
connectionPayload += connectionPayload1;
connectionPayload += entryItem->payload2;
connectionPayload += connectionPayload2;
}
entryItem->_mtx.unlock();
}
gettimeofday(&end, NULL);
secs = end.tv_sec - start.tv_sec;
usecs = end.tv_usec - start.tv_usec;
mtime = ((secs) * 1000 + usecs/1000.0) + 0.5;
LOG_INFO("Worker %d :Elapsed time for %lu times running Connection Worker is %ld millisecs\n",MyID,index, mtime);
LOG_INFO("Connection Worker %d :stopped",MyID);
}
так что теперь есть проблема, которая сводит меня с ума. обработка всех пакетов с использованием одного или 2 или n потоков занимает одинаковое время.
Как уже говорилось ранее, при многопоточности я уверен, что каждый поток обрабатывает только пакеты _Max_Element_Connection_buffer / _ConnectionThreadCount.
Я запустил свою программу на виртуальной Ubuntu 14.04, которая имеет 20G RAM и 4 процессора. ОС хоста Windows 8 и имеет 8 процессоров.
спецификация одного из моих витальных процессоров Ubuntu показана ниже:
процессор: 3
vendor_id: GenuineIntel
семья процессора: 6
модель: 60
Название модели: Intel (R) Core (TM) i5-4460 CPU @ 3,20 ГГц
степпинг: 3
процессор МГц: 3192,718
размер кеша: 6144 КБ
физический идентификатор: 0
братьев и сестер: 4
основной идентификатор: 3
процессорных ядер: 4
результаты обработки моей программы с 1, 2, 4 и 8 потоками следующие:
Количество элементов _Connection_buffer = 10000
используя один поток:
Worker 0 :Elapsed time for 10000 times running Connection Worker is 149 millisecs
используя 2 потока:
Worker 0 :Elapsed time for 10000 times running Connection Worker is 122 millisecs
Worker 1 :Elapsed time for 10001 times running Connection Worker is 127 millisecs
using 4 threads:
Worker 0 :Elapsed time for 10000 times running Connection Worker is 127 millisecs
Worker 1 :Elapsed time for 10002 times running Connection Worker is 129 millisecs
Worker 2 :Elapsed time for 10001 times running Connection Worker is 138 millisecs
Worker 3 :Elapsed time for 10003 times running Connection Worker is 140 millisecs
используя 8 потоков.
Worker 0 :Elapsed time for 10007 times running Connection Worker is 135 millisecs
Worker 1 :Elapsed time for 10000 times running Connection Worker is 154 millisecs
Worker 2 :Elapsed time for 10002 times running Connection Worker is 153 millisecs
Worker 3 :Elapsed time for 10003 times running Connection Worker is 158 millisecs
Worker 4 :Elapsed time for 10006 times running Connection Worker is 169 millisecs
Worker 5 :Elapsed time for 10004 times running Connection Worker is 170 millisecs
Worker 6 :Elapsed time for 10005 times running Connection Worker is 176 millisecs
Worker 7 :Elapsed time for 10001 times running Connection Worker is 178 millisecs
Задача ещё не решена.
Других решений пока нет …