ActiveMQ CMS客户端多线程通过pthread_create确认消息

我对使用本地C ++ CMS Client 3.9.3的activeMQ 5.11代理有疑问。我修改了official site中的示例代码,以使用pthread_create函数生成新线程,并尝试从新线程确认消息(CLIENT_ACK模式)。事实证明存在分段错误。我们如何实现从新产生的线程而不是当前线程返回ack? activeMQ C ++ Client是否支持多线程来确认消息?

void* sendAckThreadFunc(void *pMessage) {
    sleep(1);
    const Message* message = (const Message*) pMessage;
    message->acknowledge();
    printf("ACK sent out.");        
    return NULL;
}



   virtual void onmessage(const Message* message) {

        static int count = 0;

        try {
            count++;
            const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
            string text = "";

            if (textMessage != NULL) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }


            if (clientAck) {

                //message->acknowledge();  --> instead of ack the message in the onmessage function,they use pthread_create to generate a new thread and trying to ack the message from there. Is is a supported way??
                pthread_t sendAckThread;                
                if (pthread_create(&sendAckThread,NULL,sendAckThreadFunc,(void*) message)) {
                    printf("Error occured when create threads.");
                }
            }

            printf("A Message #%d Received: %s\n",count,text.c_str());
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

当我运行使用者时,它甚至无法尝试确认一条消息:

[root@amq6-283-1 examples]# ./simple_async_consumer
=====================================================
Starting the example:
-----------------------------------------------------
The Connection's Transport has been Restored.
Press 'q' to quit
A Message #1 Received: Hello world! from thread 140486368756208
Segmentation fault (core dumped)

这里的事情是,一旦消息对象退出Onmessage函数,所有资源都将消失,并且无法传递给其他线程。

CMS API文档非常清楚地说明了这一点:

        /**
         * Called asynchronously when a new message is received,the message
         * reference can be to any of the Message types. a dynamic cast is used
         * to find out what type of message this is.  The lifetime of this
         * object is only guaranteed to be for the life of the onmessage function
         * after this call-back returns the message may no longer exist. Users should
         * copy the data or clone the message if they wish to retain information that
         * was contained in this Message.
         *
         * It is considered a programming error for this method to throw an
         * exception.  The method has been tagged with the 'throw()' qualifier,* this implies that you application will segfault if you throw an error
         * from an implementation of this method.
         *
         * @param message
         *      Message object {const} pointer recipient does not own.
         */

我了解该示例仅用于串行处理,但是我恳切地要求进行并行处理,这意味着所有操作均不在单个线程中完成。如果它是串行的,则在当前消息得到处理并返回ack之前,当前线程无法接收更多批消息。确实不能满足客户的性能需求。

那么谁能说明CMS API如何设计来处理并行性?接收器线程仅专注于在Onmessage函数内部接收消息,而其他业务线程则专注于业务处理以及根据返回ack的结果。我只想知道CMS API如何处理并行性。这就是他们使用CLIENT ACK模式的方式。任何人都可以举一个并行的例子吗?

baojingxzwj 回答:ActiveMQ CMS客户端多线程通过pthread_create确认消息

不确定“ onMessage ” API文档的哪一部分在这里不清楚,但是为了获得帮助,我将其粘贴在这里:

    /**
     * Called asynchronously when a new message is received,the message
     * reference can be to any of the Message types. a dynamic cast is used
     * to find out what type of message this is.  The lifetime of this
     * object is only guaranteed to be for life of the onMessage function
     * after this call-back returns the message may no longer exist.  Users should
     * copy the data or clone the message if they wish to retain information that
     * was contained in this Message.
     *
     * It is considered a programming error for this method to throw an
     * exception.  The method has been tagged with the 'throw()' qualifier,* this implies that you application will segfault if you throw an error
     * from an implementation of this method.
     *
     * @param message
     *      Message object {const} pointer recipient does not own.
     */
    virtual void onMessage(const Message* message) = 0;

如此看来,如果您想存储消息以供以后确认,则需要使用内置的Message对象“ 克隆” API对其进行克隆。

本文链接:https://www.f2er.com/3117606.html

大家都在问