大家好,欢迎来到IT知识分享网。
Session.CLIENT_ACKNOWLEDGE(手动应答)
创建会话session,需要两个参数,第一个事务,第二个签收:
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session.CLIENT_ACKNOWLEDGE(手动应答)
当客户端接收消息:receiver()或onMessage(),通过调用消息(Message)的acknowledge()方法签收消息,在这种情况下,签收发生在Session层面:签收一个已经消费的消息会自动地签收这个Session所有已消费的收条。
案例代码:
生产者:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ActiveMQProducerTopic {
//activemq的服务地址 这里使用的是tcp协议,源码里可以看到
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String TOPIC_NAME = "topic02";
public static void main(String[] args) throws JMSException {
//创建连接工厂 ,,按照定的url地址给定默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通过连接工厂获取connection连接,并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session 需要两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//创建目的地(选择是队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
//创建消息的生产者
MessageProducer messageProducer = session.createProducer(topic);
//通过使用消息生产者messageProducer生产3条消息发送到队列中
for (int i = 1; i <= 10; i++) {
//创建消息 一个字符串消息
TextMessage textMessage = session.createTextMessage("topic---->" + i);
//通过messageProducer 发布消息
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("topic消息发送到MQ成功");
}
}
IT知识分享网
消费者1:
IT知识分享网import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class ActiveMQConsumerTopic {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String TOPIC_NAME = "topic02";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通过连接工厂获取connection连接 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session,需要两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//创建目的地(选择是队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
//创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//通过监听的机制消费消息
messageConsumer.setMessageListener((message) -> {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接受到消息topic---->" + textMessage.getText());
//消费者手工去签收消息,另起一个线程(TCP)去通知MQ服务确认消息签收
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 不关闭控制台,如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
消费者2:
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class ActiveMQConsumerTopic2 {
public static final String ACTIVE_URL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
public static final String TOPIC_NAME = "topic02";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVE_URL);
//通过连接工厂获取connection连接 并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//创建会话session,需要两个参数,第一个事务,第二个签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//创建目的地(选择是队列还是主题)
Topic topic = session.createTopic(TOPIC_NAME);
//创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
// 接收消息
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (textMessage != null) {
try {
System.out.println("消费者接受到消息topic---->" + textMessage.getText());
//消费者手工去签收消息,另起一个线程(TCP)去通知MQ服务确认消息签收
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
} else {
break;
}
}
// 不关闭控制台,如果不加这句话,在下面可能在连接的时候直接关闭了,造成无法消费的问题
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
Number Of Consumers,消费者的数量;
Messages Enqueued,进入队列的消息,进入队列的总数量;
Messages Dequeued,出了队列的消息,消费掉的总数量。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7082.html