ActiveMQ主从故障转移将丢失消息

当涉及故障转移时(仅在主题上),activeMQ丢失大量消息。 生产者在(同时)消费者从同一主题中读取内容时,在该主题中写了1000条消息。在过程的中间,我关闭activeMQ主服务器,然后继续执行该过程,并从activeMQ从属服务器继续。进行转换后,会丢失很多消息(约100条消息)。我正在使用的产品不会丢失消息。 为了持续保持话题我该怎么办? 生产者:

#include <activemq\library\activeMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\activeMQConnection.h>
#include <activemq\core\activeMQConnectionFactory.h>
#include <activemq\core\activeMQSession.h>
#include <activemq\core\activeMQConsumer.h>
#include <activemq\core\activeMqqueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\activeMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\activeMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <fstream>
#include <decaf\lang\Throwable.h>

std::string _amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");
const std::string _username("user");
const std::string _password("pass");
const std::string _host("localhost");
const std::string _destination("Test.AMQ.bogcretu.Topic");

std::string _garbageMessage("GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9");
int _countMessages = 1000;
int _multiplyFactor = 100;
std::string _bodyMessage = "";

void CreateMessage()
{
    for (int i = 0; i < _multiplyFactor; i++) {
        _bodyMessage += _garbageMessage;
    }
}

int main()
{
    activemq::library::activeMQCPP::initializelibrary();
    CreateMessage();
    activemq::core::activeMQConnectionFactory factory;
    factory.setBrokerURI(_amqURI);
    std::auto_ptr<cms::TextMessage> message;
    std::auto_ptr<cms::Connection> connection(factory.createConnection(_username,_password));

    connection->start();

    std::auto_ptr<cms::Session> session(connection->createSession());
    std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
    std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));

    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);

    for (int i = 0; i < _countMessages; i++) {
        std::stringstream ss;
        ss << i;
        std::string number = ss.str();
        message.reset(session->createTextMessage(number));
        producer->send(message.get());
        std::cout << i << std::endl;
    }

    //message.reset(session->createTextMessage("DONE"));
    //producer->send(message.get());

    //connection->close();

    //activemq::library::activeMQCPP::shutdownlibrary();

    return 0;
}

消费者:

#include <activemq\library\activeMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\activeMQConnection.h>
#include <activemq\core\activeMQConnectionFactory.h>
#include <activemq\core\activeMQSession.h>
#include <activemq\core\activeMQConsumer.h>
#include <activemq\core\activeMqqueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\activeMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\activeMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

std::string amqURI("failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2");

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI,bool sessionTransacted,bool useTopic = false) : _amqURI(amqURI),_sessionTransacted(sessionTransacted),_useTopic(useTopic),_connection(NULL),_session(NULL),_destination(NULL),_consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSactED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        this->_consumer = this->_session->createConsumer(this->_destination);
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onmessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";

            }

            std::cout << "(" << count << "," << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::activeMQCPP::initializelibrary();
    MsgListener consumer(amqURI,true,true);
    while (true);
    //activemq::library::activeMQCPP::shutdownlibrary();
}

耐用消费者:

#include <activemq\library\activeMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\activeMQConnection.h>
#include <activemq\core\activeMQConnectionFactory.h>
#include <activemq\core\activeMQSession.h>
#include <activemq\core\activeMQConsumer.h>
#include <activemq\core\activeMqqueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\activeMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\activeMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

std::string amqURI("failover:(tcp://host1:61616,_consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSactED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        //this->_consumer = this->_session->createConsumer(this->_destination);



        static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
        this->_consumer = this->_session->createDurableConsumer(topic,"sub_name","");
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onmessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";
            }

            std::cout << "(" << count << ",true);
    while (true);
    //activemq::library::activeMQCPP::shutdownlibrary();
}
xxnj916 回答:ActiveMQ主从故障转移将丢失消息

如果您希望消息持久化,那么您应该使用队列,或者使用持久主题订阅。无论生产者的持久模式如何,主题本身都不会持久化消息,实际上,如果没有订阅者,并且您向主题发送消息,则该消息将被丢弃,同样,用于控制主题的恒定待处理消息限制的ActiveMQ配置也会丢弃主题无法满足消费者需求的主题上的旧消息,因为主题的服务保证水平较低。

您需要使用队列并在生产者上设置持久性,或者如果您希望将消息写入商店并使用持久性主题订阅,并确保使用分配持久性标志的生产者发送消息,则该生产者会分配持久性标志。在代理故障转移后恢复。

本文链接:https://www.f2er.com/3151715.html

大家都在问