大家好,欢迎来到IT知识分享网。
说过了producer的创建的和启动,这里再说下producer的发送。比如我们在A项目中开发需要和B项目进行调用,这时可以考虑使用MQ发送发送消息,将A和B进行解耦。
producer的发送代码很简单就是构造message,然后调用send方法就好了,在之前说过producer的发送有三种方式:同步、异步、单向,默认是同步发送,而这里也只介绍同步发送。
producer的发送代码比较长,这里就不贴出来了,只说一下底层方法的几个参数:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
Message msg:构造需要发送的消息
Communication communication:发送消息方式,这里默认为:SYNC
SendCallBack sendCallBack:发送回调,这个是给异步发送使用的,同步发送这个参数为null
long timeout:超时时间,默认为3000
消息发送大致分为以下几步:
- 校验状态
- 校验Message
- 查找topic路由
- 根据topic和broker获取消息队列
- 消息发送
- 修改故障条目
上面的六步都在org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl方法中,接下来就一一进行拆分
- 校验状态
this.makeSureStateOK();
校验producer的状态是否为RUNNING。在producer的启动的时候会设置一个状态:START_FAILED,启动成功之后会将状态改为RUNNING
- 校验Message
这里主要是校验topic和body。在发送消息时需要传入topic和消息体,这里会对这两个进行校验
- 查找topic路由
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
根据传递过来的topic查询对应的topic路由。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//根据topic为key查询topicPublishInfoTable中的topicPublishInfo数据
//第一次发送,这个topicPublishInfoTable中没有producer传递过来的topic,只有一个默认的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//更改NameSrv中的topic路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//如果isHaveTopicRouterInfo为false,还是去NameSrv中查找topic相关信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
在这个方法中就看到了topicPublicInfoTable参数,这个参数在上篇文章说到了后续发送消息时会使用到;而后面注释的默认topic:“TBW102”,这里暂时只需记住在我们生产环境是没有这个默认topic的,至于为什么后续会说明。
那么第一次进来topicPublicInfoTable中是没有数据的,得到的结果为null,就会更新NameSrv中的路由信息:this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
updateTopicRouteInfoFromNameServer方法底层调用的是org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
其中有个boolean isDefault参数,这个参数为false,只说为true的情况,为true的话会使用我们自己的topic从NameSrv中获取路由信息:topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
这里会使用netty从NameSrv获取topic路由信息,requestCode为:RequestCode.GET_ROUTEINTO_BY_TOPIC
在介绍NameSrv时说过一个类:DefaultRequestProcessor,NameSrv接收到producer的请求,就会找到对应的requestCode进行处理,回过头来看看NameSrv中是如何处理的。
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
//根据请求中的topic信息去RouteInfoManager中类获取topicRouteData数据
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
//从NameSrv中获取是否是顺序消息
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
//如果找不到topic相关信息就返回找不到路由信息
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
这段代码就是NameSrv服务端查找topic路由信息。从目前我们讲的流程中,这里是无法获取到topic路由信息的,那么这里就会返回:”No topic route info in name server for the topic:”这么一个提示。
发现没有找到,就会使用默认的topic:“TBW102”进行查找,我们上面说明需要将这个默认的topic给关掉,结果就是肯定也找不到。
在查找路由这一步就停止了,返回的路由信息为null,在消息发送的时候就会报错:
“org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, Topic-202103021”。这样就结束了。。
那什么时候会将topic路由注册到NameSrv上呢?我们在实际开发中如果想要发送消息一般都会先申请一个topic,然后再使用这个申请的topic的进行消息发送,如果像我上面那样随便找一个topic进行消息发送肯定是失败的。在申请topic的时候,会通过broker将topic信息和broker信息注册到NameSrv上。这部分会放在Broker篇进行介绍。
好了,说了那么多都是topic找不到导致无法消息发送的情况,接下来就是topic存在的情况,根据我们自己的topic从NameSrv中获取topic路由,producer端接收到topicRoutData数据之后会和之前来的对比,如果变更了就会进行更新,然后将数据转换为topicPublishInfoTable进行返回。
这里多说一句,topicRoutData中存放的是topic队列信息和broker信息,需要记住的是一个topic会对应多个broker,如果只对应一个broker的话,那么在发送消息的时候如果这台broker正好宕机了,那我们是不是就无法发送消息了?
到这,总算是给说回来了,真是环环相扣!producer找到topic路由,接下来计算发送次数,同步发送默认发送三次(一次正常发送,两次失败尝试)
- 根据topic和broker获取消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
这一步决定了该消息会发往哪台broker。参数中除了topicPublishInfo,还有一个lastBrokerName,这个参数是上一次发送broker的名称。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//sendLatencyFaultEnable:默认不启用 Broker故障延迟机制(消息投递延迟最小的策略)
if (this.sendLatencyFaultEnable) {
//broker故障延迟策略
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 验证该消息队列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 从延迟容错broker列表中挑选一个容错性最好的一个 broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
// 取余挑选其中一个队列
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
这个方法是选取消息队列的,可以看到有这么一个参数:sendLatencyFaultEnable:是否使用延迟策略
1)不使用故障延迟策略
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//第一次选择消息队列时lastBrokerName为null
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
//getAndIncrement使用的是ThreadLocal来新增的
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
//与路由表中的消息队列取模
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//如果失败的话,下次就会避免使用该broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
方法很简单也有相应注释,不做解释了,看第二种
2)使用故障延迟策略
获取方法和不使用故障延迟策略是一样的,但多了一个判断条件:if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
判断这个选中的brokerName是否可用
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
这几行代码现在说可能比较迷糊,这里记住,在说到第六点的时候就明白了
如果没有可用的,就从延迟容错broker列表中选择最好的一个返回
在说消息发送之前,需要说几个时间变量
long beginTimestampFirst = System.currentTimeMillis();
// 记录该时间主要是为了后面修改broker故障延迟策略时间
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
beginTimestampFirst:记录的是producer在调用send方法时记录的开始时间
beginTimestampPrev:记录的是选择完消息队列的时间,这个时间会在调用selectOneMessageQueue方法后进行更新
endTimestamp:是消息发送完之后记录的时间,这个会在第五步完成之后进行更新
- 消息发送
找到messageQueue之后就进行消息的发送,在producer端消息发送很简单,就是参数组装然后发送到broker端,真正的逻辑都在broker端,消息发送的requestCode:RequestCode.SEND_MESSAGE。记住这个,broker端接收到producer的请求会根据这个requestCode进行相应的逻辑处理
- 修改故障条目
这个点是和第四点相辅相成的,说完这点再回过头去看第四点如果启用了故障延迟策略进行messageQueue的选择就简单明了
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
如果callTimeout为true,就抛出异常:”sendDefaultImpl call timeout”
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
updateFaultItem方法最终会调用
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
/**
* latencyMax:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L
* notAvailableDuration:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L
* @param currentLatency
* @return
*/
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
首先判断是否启用了故障延迟策略(默认为false)。如果启用了就会计算不可用时间,根据传递过来的过来从latencyMax进行挑选一个最近大于latencyMax数据对应的小标,从这个下标中从notAvailableDuration找到对应不可用时间,将这个不可用时间更新到latencyFaultTolerance中,后续再次选择消息队列时就会使用到
消息发送到这就结束了,最后放一张producer发送的时序图
后面还有两个重要的模块:Broker和Consumer。Broker是MQ存储设计的核心;Consumer和producer一样是我们程序员打交道的client。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/60576.html