RocketMQ之producer(二)

RocketMQ之producer(二)说过了producer的创建的和启动,这里再说下producer的发送。比如我们在A项目中开发需要和B项目进行调用,这时可以考虑使用MQ发送发送

大家好,欢迎来到IT知识分享网。

说过了producer的创建的和启动,这里再说下producer的发送。比如我们在A项目中开发需要和B项目进行调用,这时可以考虑使用MQ发送发送消息,将A和B进行解耦。

producer的发送代码很简单就是构造message,然后调用send方法就好了,在之前说过producer的发送有三种方式:同步、异步、单向,默认是同步发送,而这里也只介绍同步发送。

producer的发送代码比较长,这里就不贴出来了,只说一下底层方法的几个参数:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

Message msg:构造需要发送的消息

Communication communication:发送消息方式,这里默认为:SYNC

SendCallBack sendCallBack:发送回调,这个是给异步发送使用的,同步发送这个参数为null

long timeout:超时时间,默认为3000

消息发送大致分为以下几步:

  1. 校验状态
  2. 校验Message
  3. 查找topic路由
  4. 根据topic和broker获取消息队列
  5. 消息发送
  6. 修改故障条目

上面的六步都在org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl方法中,接下来就一一进行拆分

  • 校验状态
this.makeSureStateOK();

校验producer的状态是否为RUNNING。在producer的启动的时候会设置一个状态:START_FAILED,启动成功之后会将状态改为RUNNING

  • 校验Message

这里主要是校验topic和body。在发送消息时需要传入topic和消息体,这里会对这两个进行校验

  • 查找topic路由
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

根据传递过来的topic查询对应的topic路由。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //根据topic为key查询topicPublishInfoTable中的topicPublishInfo数据
    //第一次发送,这个topicPublishInfoTable中没有producer传递过来的topic,只有一个默认的topic:TBW102
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //更改NameSrv中的topic路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //如果isHaveTopicRouterInfo为false,还是去NameSrv中查找topic相关信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

在这个方法中就看到了topicPublicInfoTable参数,这个参数在上篇文章说到了后续发送消息时会使用到;而后面注释的默认topic:“TBW102”,这里暂时只需记住在我们生产环境是没有这个默认topic的,至于为什么后续会说明。

那么第一次进来topicPublicInfoTable中是没有数据的,得到的结果为null,就会更新NameSrv中的路由信息:this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);

updateTopicRouteInfoFromNameServer方法底层调用的是org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)

其中有个boolean isDefault参数,这个参数为false,只说为true的情况,为true的话会使用我们自己的topic从NameSrv中获取路由信息:topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);

这里会使用netty从NameSrv获取topic路由信息,requestCode为:RequestCode.GET_ROUTEINTO_BY_TOPIC

在介绍NameSrv时说过一个类:DefaultRequestProcessor,NameSrv接收到producer的请求,就会找到对应的requestCode进行处理,回过头来看看NameSrv中是如何处理的。

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

        //根据请求中的topic信息去RouteInfoManager中类获取topicRouteData数据
        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

        if (topicRouteData != null) {
            //从NameSrv中获取是否是顺序消息
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        //如果找不到topic相关信息就返回找不到路由信息
        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

这段代码就是NameSrv服务端查找topic路由信息。从目前我们讲的流程中,这里是无法获取到topic路由信息的,那么这里就会返回:”No topic route info in name server for the topic:”这么一个提示。

发现没有找到,就会使用默认的topic:“TBW102”进行查找,我们上面说明需要将这个默认的topic给关掉,结果就是肯定也找不到。

在查找路由这一步就停止了,返回的路由信息为null,在消息发送的时候就会报错:

“org.apache.rocketmq.client.exception.MQClientException: No route info of this topic, Topic-202103021”。这样就结束了。。

那什么时候会将topic路由注册到NameSrv上呢?我们在实际开发中如果想要发送消息一般都会先申请一个topic,然后再使用这个申请的topic的进行消息发送,如果像我上面那样随便找一个topic进行消息发送肯定是失败的。在申请topic的时候,会通过broker将topic信息和broker信息注册到NameSrv上。这部分会放在Broker篇进行介绍。

好了,说了那么多都是topic找不到导致无法消息发送的情况,接下来就是topic存在的情况,根据我们自己的topic从NameSrv中获取topic路由,producer端接收到topicRoutData数据之后会和之前来的对比,如果变更了就会进行更新,然后将数据转换为topicPublishInfoTable进行返回。

这里多说一句,topicRoutData中存放的是topic队列信息和broker信息,需要记住的是一个topic会对应多个broker,如果只对应一个broker的话,那么在发送消息的时候如果这台broker正好宕机了,那我们是不是就无法发送消息了?

到这,总算是给说回来了,真是环环相扣!producer找到topic路由,接下来计算发送次数,同步发送默认发送三次(一次正常发送,两次失败尝试)

  • 根据topic和broker获取消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

这一步决定了该消息会发往哪台broker。参数中除了topicPublishInfo,还有一个lastBrokerName,这个参数是上一次发送broker的名称。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //sendLatencyFaultEnable:默认不启用 Broker故障延迟机制(消息投递延迟最小的策略)
    if (this.sendLatencyFaultEnable) {
        //broker故障延迟策略
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 验证该消息队列是否可用
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            // 从延迟容错broker列表中挑选一个容错性最好的一个 broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                // 取余挑选其中一个队列
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

这个方法是选取消息队列的,可以看到有这么一个参数:sendLatencyFaultEnable:是否使用延迟策略

1)不使用故障延迟策略

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //第一次选择消息队列时lastBrokerName为null
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //getAndIncrement使用的是ThreadLocal来新增的
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            //与路由表中的消息队列取模
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //如果失败的话,下次就会避免使用该broker
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

方法很简单也有相应注释,不做解释了,看第二种

2)使用故障延迟策略

获取方法和不使用故障延迟策略是一样的,但多了一个判断条件:if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))

判断这个选中的brokerName是否可用

public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

public boolean isAvailable() {
  return (System.currentTimeMillis() - startTimestamp) >= 0;
}

这几行代码现在说可能比较迷糊,这里记住,在说到第六点的时候就明白了

如果没有可用的,就从延迟容错broker列表中选择最好的一个返回

在说消息发送之前,需要说几个时间变量

long beginTimestampFirst = System.currentTimeMillis();
// 记录该时间主要是为了后面修改broker故障延迟策略时间
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;

beginTimestampFirst:记录的是producer在调用send方法时记录的开始时间

beginTimestampPrev:记录的是选择完消息队列的时间,这个时间会在调用selectOneMessageQueue方法后进行更新

endTimestamp:是消息发送完之后记录的时间,这个会在第五步完成之后进行更新

  • 消息发送

找到messageQueue之后就进行消息的发送,在producer端消息发送很简单,就是参数组装然后发送到broker端,真正的逻辑都在broker端,消息发送的requestCode:RequestCode.SEND_MESSAGE。记住这个,broker端接收到producer的请求会根据这个requestCode进行相应的逻辑处理

  • 修改故障条目

这个点是和第四点相辅相成的,说完这点再回过头去看第四点如果启用了故障延迟策略进行messageQueue的选择就简单明了

beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
  callTimeout = true;
  break;
}

如果callTimeout为true,就抛出异常:”sendDefaultImpl call timeout”

endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

updateFaultItem方法最终会调用

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

/**
 * latencyMax:50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L
 * notAvailableDuration:0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L
 * @param currentLatency
 * @return
 */
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}

首先判断是否启用了故障延迟策略(默认为false)。如果启用了就会计算不可用时间,根据传递过来的过来从latencyMax进行挑选一个最近大于latencyMax数据对应的小标,从这个下标中从notAvailableDuration找到对应不可用时间,将这个不可用时间更新到latencyFaultTolerance中,后续再次选择消息队列时就会使用到

消息发送到这就结束了,最后放一张producer发送的时序图

RocketMQ之producer(二)

后面还有两个重要的模块:Broker和Consumer。Broker是MQ存储设计的核心;Consumer和producer一样是我们程序员打交道的client。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/60576.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信