клиент gRPC и т. д.

Этот вопрос включает в себя etcd конкретные вещи, но я думаю, что вопрос больше связан с работой с gRPC в общем.
Я пытаюсь создать etcd Watch для некоторых ключей, поскольку документация редкая, я посмотрел на Nokia реализация
Было легко адаптировать код к моим потребностям, и я придумал первую версию, которая работала просто отлично, создавая WatchCreateRequestи запуск обратного вызова при обновлении ключа. Все идет нормально. Затем я попытался добавить более одного ключа для просмотра. Фиаско! ClientAsyncReaderWriter не в состоянии читать / писать в таком случае. Теперь к вопросу.

Если у меня есть следующие участники в моем классе

Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;

и я хочу поддерживать несколько Watches добавленный в мой класс, я предполагаю, что я должен держать несколько переменных за часы, а не как члены класса.
Во-первых, я думаю, WatchResponse reply должен быть один за Watch, Я менее уверен в stream, я должен держать один за Watch? Я почти уверен, что context может быть использован для всех Watches и уверен на 100% stub а также completionQueue можно использовать повторно для всех Watches,
Так что вопрос в том, правильно ли я угадаю? Как насчет безопасности потоков? Не нашел никакой документации, описывающей, какие объекты безопасны для использования из нескольких потоков и где я должен синхронизировать доступ.
Любая ссылка на документацию (не этот) будет оценено!

Протестируйте код, прежде чем разделить участников на отдельные Watch имущество
(нет правильного выключения, я знаю)

using namespace grpc;
class Watcher
{
public:
using Callback = std::function<void(const std::string&, const std::string&)>;

Watcher(std::shared_ptr<Channel> channel) : watchStub(channel)
{
stream = watchStub.AsyncWatch(&context, &completionQueue, (void*) "create");
eventPoller = std::thread([this]() { WaitForEvent(); });
}

void AddWatch(const std::string& key, Callback callback)
{
AddWatch(key, callback, false);
}

void AddWatches(const std::string& key, Callback callback)
{
AddWatch(key, callback, true);
}

private:
void AddWatch(const std::string& key, Callback callback, bool isRecursive)
{
auto insertionResult = callbacks.emplace(key, callback);
if (!insertionResult.second) {
throw std::runtime_error("Event handle already exist.");
}
WatchRequest watch_req;
WatchCreateRequest watch_create_req;
watch_create_req.set_key(key);
if (isRecursive) {
watch_create_req.set_range_end(key + "\xFF");
}

watch_req.mutable_create_request()->CopyFrom(watch_create_req);
stream->Write(watch_req, (void*) insertionResult.first->first.c_str());

stream->Read(&reply, (void*) insertionResult.first->first.c_str());
}

void WaitForEvent()
{
void* got_tag;
bool ok = false;

while (completionQueue.Next(&got_tag, &ok)) {
if (ok == false) {
break;
}
if (got_tag == (void*) "writes done") {
// Signal shutdown
}
else if (got_tag == (void*) "create") {
}
else if (got_tag == (void*) "write") {
}
else {

auto tag = std::string(reinterpret_cast<char*>(got_tag));
auto findIt = callbacks.find(tag);
if (findIt == callbacks.end()) {
throw std::runtime_error("Key \"" + tag + "\"not found");
}

if (reply.events_size()) {
ParseResponse(findIt->second);
}
stream->Read(&reply, got_tag);
}
}
}

void ParseResponse(Callback& callback)
{
for (int i = 0; i < reply.events_size(); ++i) {
auto event = reply.events(i);
auto key = event.kv().key();
callback(event.kv().key(), event.kv().value());
}
}

Watch::Stub watchStub;
CompletionQueue completionQueue;
ClientContext context;
std::unique_ptr<ClientAsyncReaderWriter<WatchRequest, WatchResponse>> stream;
WatchResponse reply;
std::unordered_map<std::string, Callback> callbacks;
std::thread eventPoller;
};

0

Решение

Мне жаль, что я не очень уверен в правильности Watch дизайн здесь. Мне не очень понятно, хотите ли вы создать вызов gRPC для каждого Watch,

В любом случае, каждый вызов gRPC будет иметь свой ClientContext, ClientAsyncReaderWriter, Но stub а также CompletionQueue это не вещь для звонка.

Насколько я знаю, нет централизованного места для поиска потоковобезопасных классов. Вы можете прочитать документ API, чтобы иметь правильное ожидание.

Когда я писал служба отчетов о загрузке асинхронного сервера, единственное место, где я сам добавляю синхронизацию — CompletionQueue, чтобы я не ставил в очередь новые теги в cq, если он выключен.

1

Другие решения

Других решений пока нет …

По вопросам рекламы ammmcru@yandex.ru
Adblock
detector