Я использую ActiveMQ-CPP 3.4.5 для подключения из программы C ++ к брокеру сообщений.
Проблема, с которой я столкнулся, связана с политикой повторной доставки.
Сообщения, которые возвращаются в очередь, доставляются немедленно. Это не
поведение я ожидаю. Я ожидаю, что сообщения будут возвращены после определенного периода
времени, которое устанавливается через политику повторной доставки.
Ниже приведен код, демонстрирующий способ настройки политики повторной доставки:
policy = new activemq::core::policies::DefaultRedeliveryPolicy();
policy->setInitialRedeliveryDelay(0);
policy->setRedeliveryDelay(10000);
policy->setMaximumRedeliveries((int)activemq::core::RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
Как я уже говорил ранее, я исключаю сообщения, которые будут доставлены после 10000 мс, но это не так.
Они возвращаются к потребителю немедленно.
Кто-нибудь знает, что может быть причиной такого поведения?
Вы устанавливаете начальную задержку равной нулю, поэтому они будут доставлены немедленно при первом откате транзакции. Если вы хотите, чтобы они были задержаны в первом цикле повторной доставки, вам также необходимо установить начальную задержку равной 10000.
Когда я посмотрел источники ActiveMQ-CPP, я обнаружил следующий фрагмент кода в файле ActiveMQConsumer.cpp:
if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) {
// TODO - Can't do this until we can control object lifetime.
// Start up the delivery again a little later.
// this->internal->scheduler->executeAfterDelay(
// new StartConsumerTask(this), internal->redeliveryDelay);
start();
} else {
start();
}
Таким образом, похоже, что redeliveryDelay вообще не учитывается после отката.
Вот почему, я полагаю, мои сообщения приходят сразу после отката.
Метод onMessage:
void BaseProdListener::onMessage( const cms::Message* message ){
log4cxx::Logger::getLogger("BaseProdListener")->info("onMessage");
_message = message;
try {
const cms::TextMessage* textMessage = dynamic_cast< const cms::TextMessage* >( message );
std::string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
log4cxx::Logger::getLogger("BaseProdListener")->debug("Received message:" + text);
handleMessage(text);
}
} catch (cms::CMSException& e){
log4cxx::Logger::getLogger("BaseProdListener")->error(e.getStackTraceString());
}
}