Я пишу небольшое приложение, которое отслеживает определенный каталог для новых добавленных файлов.
Я хотел бы поместить код мониторинга в отдельный поток, чтобы я мог оставить основной поток свободным для других вещей и отменить поток мониторинга в случае необходимости.
Я был в состоянии кодировать все правильно, кроме одной вещи:
Я не могу выйти из темы мониторинга должным образом, отсюда и этот пост.
Я сигнализирую об объекте события в главном потоке, жду выхода из потока и затем выполняю очистку.
Проблема заключается в моем использовании ReadDirectoryChangesW
так как все работает нормально после того, как я закомментирую этот кусок кода.
Как только дескриптор события сигнализируется, ReadDirectoryChangesW
блокирует поток, который мешает ему «поймать» событие и выйти. Если я добавлю новый файл в каталог, он «разблокируется» ReadDirectoryChangesW
Поток «ловит» событие и выходит.
Чтобы помочь дальше, я сделал маленький MVCE ниже, это иллюстрирует то, что я сказал до сих пор.
#include <iostream>
#include <Windows.h>
#include <map>
struct SThreadParams
{
HANDLE hEvent;
HANDLE hDir;
int processDirectoryChanges(const char *buffer)
{
if (NULL == buffer) return -1;
DWORD offset = 0;
char fileName[MAX_PATH] = "";
FILE_NOTIFY_INFORMATION *fni = NULL;
do
{
fni = (FILE_NOTIFY_INFORMATION*)(&buffer[offset]);
// since we do not use UNICODE,
// we must convert fni->FileName from UNICODE to multibyte
int ret = ::WideCharToMultiByte(CP_ACP, 0, fni->FileName,
fni->FileNameLength / sizeof(WCHAR),
fileName, sizeof(fileName), NULL, NULL);
switch (fni->Action)
{
case FILE_ACTION_ADDED:
{
std::cout << "FILE_ACTION_ADDED " << fileName << std::endl;
}
break;
case FILE_ACTION_REMOVED:
{
std::cout << "FILE_ACTION_REMOVED " << fileName << std::endl;
}
break;
case FILE_ACTION_MODIFIED:
{
std::cout << "FILE_ACTION_MODIFIED " << fileName << std::endl;
}
break;
case FILE_ACTION_RENAMED_OLD_NAME:
{
std::cout << "FILE_ACTION_RENAMED_OLD_NAME " << fileName << std::endl;
}
break;
case FILE_ACTION_RENAMED_NEW_NAME:
{
std::cout << "FILE_ACTION_RENAMED_NEW_NAME " << fileName << std::endl;
}
break;
default:
break;
}
// clear string so we can reuse it
::memset(fileName, '\0', sizeof(fileName));
// advance to next entry
offset += fni->NextEntryOffset;
} while (fni->NextEntryOffset != 0);
return 0;
}
};
DWORD WINAPI thread(LPVOID arg)
{
SThreadParams p = *((SThreadParams *)arg);
OVERLAPPED ovl = { 0 };
DWORD bytesTransferred = 0, error = 0;
char buffer[1024];
if (NULL == (ovl.hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL)))
{
std::cout << "CreateEvent error = " << ::GetLastError() << std::endl;
return ::GetLastError();
};
do {
if (::ReadDirectoryChangesW(p.hDir, buffer, sizeof(buffer), FALSE,
FILE_NOTIFY_CHANGE_FILE_NAME,
NULL, &ovl, NULL))
{
if (::GetOverlappedResult(p.hDir, &ovl, &bytesTransferred, TRUE))
{
for (int i = 0; i < 5; ++i) std::cout << '=';
std::cout << std::endl;
if (-1 == p.processDirectoryChanges(buffer))
std::cout << "processDirectoryChanges error = " << std::endl;
}
else
{
bytesTransferred = 0;
std::cout << "GetOverlappedResult error = " << ::GetLastError() << std::endl;
}
if (0 == ::ResetEvent(ovl.hEvent))
{
std::cout << "ResetEvent error = " << ::GetLastError() << std::endl;
::CloseHandle(ovl.hEvent);
return ::GetLastError();
}
}
else
{
// we shall just output the error, and try again...
std::cout << "ReadDirectoryChangesW error = " << ::GetLastError() << std::endl;
}
error = ::WaitForSingleObject(p.hEvent, 2000);
} while (WAIT_TIMEOUT == error);
::CloseHandle(ovl.hEvent);
return 0;
}
int main()
{
SThreadParams s;
s.hDir = ::CreateFile(SOME_DIRECTORY,
FILE_LIST_DIRECTORY, FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS, NULL);
if (INVALID_HANDLE_VALUE == s.hDir)
{
std::cout << "CreateFile error = " << ::GetLastError() << std::endl;
return 1;
}
s.hEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
if (NULL == s.hEvent)
{
std::cout << "CreateEvent error = " << ::GetLastError() << std::endl;
::CloseHandle(s.hDir);
return 1;
}
HANDLE hThread = ::CreateThread(NULL, 0, thread, (LPVOID)&s, 0, NULL);
if (NULL == hThread)
{
std::cout << "CreateThread error = " << ::GetLastError() << std::endl;
::CloseHandle(s.hDir);
::CloseHandle(s.hEvent);
return 1;
}
std::cout << "press any key to close program..." << std::endl;
std::cin.get();
if (0 == ::CancelIoEx(s.hDir, NULL))
{
std::cout << "CancelIoEx error = " << ::GetLastError() << std::endl;
::CloseHandle(s.hDir);
::CloseHandle(s.hEvent);
return 1;
}
if (0 == ::SetEvent(s.hEvent))
{
std::cout << "SetEvent error = " << ::GetLastError() << std::endl;
::CloseHandle(s.hDir);
::CloseHandle(s.hEvent);
return 1;
}
// wait for thread to exit
DWORD error = ::WaitForSingleObject(hThread, INFINITE);
std::cout << "Thread exited with error code = " << error << std::endl;
::CloseHandle(s.hEvent);
::CloseHandle(s.hDir);
::CloseHandle(hThread);
return 0;
}
Я переехал OVERLAPPED
структура из потока в структуру, которая была передана в поток. Тогда я установил OVERLAPPED.hEvent
принудительно «разблокировать» ReadDirectoryChangesW
, Это, кажется, работает, но пугает меня, потому что я думаю, что это не безопасно / подвержено ошибкам, так как это недокументировано.
Я пытался использовать процедуры завершения, но не добился успеха, так как я новичок во всем этом. Я был в состоянии получать уведомления, но содержимое буфера (тот, заполненный ReadDirectoryChangesW
) не был прочитан должным образом после первого прохода. Я все еще пытаюсь сделать это самостоятельно, но могу воспользоваться помощью.
Я мог бы использовать порт завершения ввода / вывода, но так как я буду контролировать только один каталог, я думаю, что это немного излишне. Если я ошибаюсь, пожалуйста, сообщите мне, как использовать порт завершения ввода / вывода для моего случая, я хотел бы попробовать их.
Учитывая приведенный выше MVCE, можете ли вы дать мне инструкции о том, как изменить код в процедуре потока, чтобы он правильно выходил (без ReadDirectoryChangesW
блокирование).
У меня есть ощущение, что мне придется использовать процедуры завершения. В этом случае я бы смиренно попросил какой-нибудь псевдокод или письменные инструкции, поскольку я впервые их использую.
Каждый раз, когда я чувствую, что добился прогресса, я буду обновлять этот пост соответствующими данными.
Если требуется дополнительная информация / уточнение, оставьте комментарий, и я отвечу.
Спасибо,
С уважением.
Существуют 3 способа сделать асинхронную операцию с файлом:
Вы выбираете худший вариант. Я буду использовать IoCompletionPort на вашем месте. в этом случае вам не нужно создавать события, темы, вызов GetOverlappedResult
, не нужно никаких петель.
все что нужно назвать BindIoCompletionCallback
(или же RtlSetIoCompletionCallback
) на файл и все!
об отмене — CancelIoEx
не существует в XP («Я пытаюсь поддерживать Windows XP»),
но вы можете просто закрыть дескриптор каталога — в этом случае IO будет отменен с STATUS_NOTIFY_CLEANUP
. поэтому код может выглядеть так:
RUNDOWN_REF_EVENT g_rundown; // Run-Down Protection
class SPYDATA :
#ifdef _USE_NT_VERSION_
IO_STATUS_BLOCK
#else
OVERLAPPED
#endif
{
HANDLE _hFile;
LONG _dwRef;
union {
FILE_NOTIFY_INFORMATION _fni;
UCHAR _buf[PAGE_SIZE];
};
void DumpDirectoryChanges()
{
union {
PVOID buf;
PBYTE pb;
PFILE_NOTIFY_INFORMATION pfni;
};
buf = _buf;
for (;;)
{
DbgPrint("%x <%.*S>\n", pfni->Action, pfni->FileNameLength >> 1, pfni->FileName);
ULONG NextEntryOffset = pfni->NextEntryOffset;
if (!NextEntryOffset)
{
break;
}
pb += NextEntryOffset;
}
}
#ifdef _USE_NT_VERSION_
static VOID WINAPI _OvCompRoutine(
_In_ NTSTATUS dwErrorCode,
_In_ ULONG_PTR dwNumberOfBytesTransfered,
_Inout_ PIO_STATUS_BLOCK Iosb
)
{
static_cast<SPYDATA*>(Iosb)->OvCompRoutine(dwErrorCode, (ULONG)dwNumberOfBytesTransfered);
}
#else
static VOID WINAPI _OvCompRoutine(
_In_ DWORD dwErrorCode, // really this is NTSTATUS
_In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped
)
{
static_cast<SPYDATA*>(lpOverlapped)->OvCompRoutine(dwErrorCode, dwNumberOfBytesTransfered);
}
#endif
VOID OvCompRoutine(NTSTATUS status, DWORD dwNumberOfBytesTransfered)
{
DbgPrint("[%x,%x]\n", status, dwNumberOfBytesTransfered);
if (0 <= status)
{
if (status != STATUS_NOTIFY_CLEANUP)
{
if (dwNumberOfBytesTransfered) DumpDirectoryChanges();
DoRead();
}
else
{
DbgPrint("\n---- NOTIFY_CLEANUP -----\n");
}
}
Release();
g_rundown.ReleaseRundownProtection();
}
~SPYDATA()
{
Cancel();
}
public:
void DoRead()
{
if (g_rundown.AcquireRundownProtection())
{
AddRef();
#ifdef _USE_NT_VERSION_
NTSTATUS status = ZwNotifyChangeDirectoryFile(_hFile, 0, 0, this, this, &_fni, sizeof(_buf), FILE_NOTIFY_VALID_MASK, TRUE);
if (NT_ERROR(status))
{
OvCompRoutine(status, 0);
}
#else
if (!ReadDirectoryChangesW(_hFile, _buf, sizeof(_buf), TRUE, FILE_NOTIFY_VALID_MASK, (PDWORD)&InternalHigh, this, 0))
{
OvCompRoutine(RtlGetLastNtStatus(), 0);
}
#endif
}
}
SPYDATA()
{
_hFile = 0;// ! not INVALID_HANDLE_VALUE because use ntapi for open file
_dwRef = 1;
#ifndef _USE_NT_VERSION_
RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
#endif
}
void AddRef()
{
InterlockedIncrement(&_dwRef);
}
void Release()
{
if (!InterlockedDecrement(&_dwRef))
{
delete this;
}
}
BOOL Create(POBJECT_ATTRIBUTES poa)
{
IO_STATUS_BLOCK iosb;
NTSTATUS status = ZwOpenFile(&_hFile, FILE_GENERIC_READ, poa, &iosb, FILE_SHARE_VALID_FLAGS, FILE_DIRECTORY_FILE);
if (0 <= status)
{
return
#ifdef _USE_NT_VERSION_
0 <= RtlSetIoCompletionCallback(_hFile, _OvCompRoutine, 0);
#else
BindIoCompletionCallback(_hFile, _OvCompRoutine, 0);
#endif
}
return FALSE;
}
void Cancel()
{
if (HANDLE hFile = InterlockedExchangePointer(&_hFile, 0))
{
NtClose(hFile);
}
}
};
void DemoF()
{
if (g_rundown.Create())
{
STATIC_OBJECT_ATTRIBUTES(oa, "\\systemroot\\tmp");//SOME_DIRECTORY
if (SPYDATA* p = new SPYDATA)
{
if (p->Create(&oa))
{
p->DoRead();
}
MessageBoxW(0, L"wait close program...", L"", MB_OK);
p->Cancel();
p->Release();
}
g_rundown.ReleaseRundownProtection();
g_rundown.WaitForRundown();
}
}
для ожидания, когда все IO закончил, я использую Защита от падения. к сожалению, это не реализовано в пользовательском режиме, но не сложно реализовать эту очень полезную функцию самостоятельно. моя реализация:
class __declspec(novtable) RUNDOWN_REF
{
LONG _LockCount;
protected:
virtual void RundownCompleted() = 0;
public:
RUNDOWN_REF()
{
_LockCount = 1;
}
BOOL AcquireRundownProtection()
{
LONG LockCount = _LockCount, prevLockCount;
do
{
if (!LockCount)
{
return FALSE;
}
LockCount = InterlockedCompareExchange(&_LockCount, LockCount + 1, prevLockCount = LockCount);
} while (LockCount != prevLockCount);
return TRUE;
}
void ReleaseRundownProtection()
{
if (!InterlockedDecrement(&_LockCount))
{
RundownCompleted();
}
}
};
class RUNDOWN_REF_EVENT : public RUNDOWN_REF
{
HANDLE _hEvent;
virtual void RundownCompleted()
{
SetEvent(_hEvent);
}
public:
BOOL Create()
{
return (_hEvent = CreateEvent(0, TRUE, FALSE, 0)) != 0;
}
RUNDOWN_REF_EVENT()
{
_hEvent = 0;
}
~RUNDOWN_REF_EVENT()
{
if (_hEvent) CloseHandle(_hEvent);
}
void WaitForRundown()
{
if (WaitForSingleObject(_hEvent, INFINITE) != WAIT_OBJECT_0) __debugbreak();
}
};
Других решений пока нет …