Java,ActiveMQ,发布订阅模式,手动应答模式案例[通俗易懂]

Java,ActiveMQ,发布订阅模式,手动应答模式案例[通俗易懂]Session.CLIENT_ACKNOWLEDGE(手动应答)创建会话session,需要两个参数,第一个事务,第二个签收:Session s

大家好,欢迎来到IT知识分享网。

Session.CLIENT_ACKNOWLEDGE(手动应答)

创建会话session,需要两个参数,第一个事务,第二个签收:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Session.CLIENT_ACKNOWLEDGE(手动应答)

当客户端接收消息:receiver()或onMessage(),通过调用消息(Message)的acknowledge()方法签收消息,在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。

案例代码:

生产者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQProducerTopic {

    //activemq的服务地址  这里使用的是tcp协议,源码里可以看到
    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String TOPIC_NAME = "topic02";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂获取connection连接,并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建会话session  需要两个参数,第一个事务,第二个签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //创建目的地(选择是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消息的生产者
        MessageProducer messageProducer = session.createProducer(topic);
        //通过使用消息生产者messageProducer生产3条消息发送到队列中
        for (int i = 1; i <= 10; i++) {
            //创建消息   一个字符串消息
            TextMessage textMessage = session.createTextMessage("topic---->" + i);
            //通过messageProducer 发布消息
            messageProducer.send(textMessage);
        }
        //关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("topic消息发送到MQ成功");
    }
}

IT知识分享网

消费者1:

IT知识分享网import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;


public class ActiveMQConsumerTopic {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String TOPIC_NAME = "topic02";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂获取connection连接 并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建会话session,需要两个参数,第一个事务,第二个签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //创建目的地(选择是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        //通过监听的机制消费消息
        messageConsumer.setMessageListener((message) -> {
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费者接受到消息topic---->" + textMessage.getText());
                    //消费者手工去签收消息,另起一个线程(TCP)去通知MQ服务确认消息签收
                    textMessage.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 不关闭控制台,如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

消费者2:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;


public class ActiveMQConsumerTopic2 {

    public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
    public static final String TOPIC_NAME = "topic02";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
        //通过连接工厂获取connection连接 并启动访问
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //创建会话session,需要两个参数,第一个事务,第二个签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        //创建目的地(选择是队列还是主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        //创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        // 接收消息
        while (true) {
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                try {
                    System.out.println("消费者接受到消息topic---->" + textMessage.getText());
                    //消费者手工去签收消息,另起一个线程(TCP)去通知MQ服务确认消息签收
                    textMessage.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            } else {
                break;
            }
        }
        // 不关闭控制台,如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
Java,ActiveMQ,发布订阅模式,手动应答模式案例[通俗易懂]

Number Of Consumers,消费者的数量;

Messages Enqueued,进入队列的消息,进入队列的总数量;

Messages Dequeued,出了队列的消息,消费掉的总数量。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7082.html

(0)
上一篇 2023-01-03 09:53
下一篇 2023-01-03 09:53

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信