RocketMq 原理

RocketMq 原理https://blog.csdn.net/meilong_whpu/article/details/76921583https://yq.aliyun.com/articles/666439https://blog.csdn.net/rodbate/article/details/78763379https://blog.csdn.net/meilong_whpu/article/d…

大家好,欢迎来到IT知识分享网。

目录

1、架构介绍

4、索引

4.1、index文件数据结构

4.2、存索引

4.3、取索引

5、文件清理

6、producer

7、相关问题


1、架构介绍

RocketMq 原理

namesrv集群:负责接受broker注册的路由信息,并为producer和consumer提供路由查询服务。路由信息包括了broker自身的一些信息,如IP地址,brokerId,brokerName,broker所属集群等,还包括该broker中管理的每个topic对应的多个queue信息。有了路由信息,生产者就可以知道消息该往哪些broker上发,消费者就可以知道从哪些broker的哪些queue上可以拉到消息了。namesrv集群由一个或者多个namesrv机器组成,机器之间相互独立,不产生任何通信

broker集群:负责topic的创建,向namesrv注册路由信息,消息存储,消息查询,消息消费。broker由master broker和slave borker组成,master broker之间无通信,多个master broker用于分摊消息生产时的压力,每个topic对应的消息会被分配到一个broker集群下的所有master broker上,生产者在发送指定topic的消息时,会从所有可写queue中轮询一个queue进行消息发送,这里的queue对应于broker向namesrv注册的路由信息中的写队列数,不同能力的master broker机器可以配置不同数量的可写队列数,可以实现消息的权重分配。另外,由于topic上的消息可以被分配到任意个broker上,这就支持了方便的broker横向扩展。slave broker用于对master broker进行消息同步备份,对外只提供消息读取,不提供消息发送。

producer与consumer:生产者与消费者,其实都属于rocketMq的客户端,一个应用程序中往往都是生产者与消费者都有的。

4、索引

RocketMq 原理

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-store</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>

对应的类:

org.apache.rocketmq.store.index.IndexFile
org.apache.rocketmq.store.index.IndexHeader
org.apache.rocketmq.store.index.IndexService

4.1、index文件数据结构

RocketMq 原理

Index Header:

  • beginTimestamp:该索引文件的第一个消息(Message)的存储时间(落盘时间)  物理位置(pos: 0-7) 8bytes
  • endTimestamp:该索引文件的最后一个消息(Message)的存储时间(落盘时间)  物理位置(pos: 8-15) 8bytes
  • beginPhyoffset:该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
  • beginPhyoffset:该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
  • hashSlotCount:该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
  • indexCount:该索引文件目前的索引个数 (pos: 36-39) 4bytes

Slot:

  • slot每个节点保存当前已经拥有多少个index数据了

Index List:

  • key hash value:message key的hash值
  • phyOffset:message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
  • timeDiff:message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
  • prevIndex:hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件。)每个slot位置的第一个消息的prevIndex就是0的

4.2、存索引

    public boolean putKey(String key, long phyOffset, long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            int keyHash = this.indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = 40 + slotPos * hashSlotSize;
            Object fileLock = null;

            boolean var14;
            try {
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
                timeDiff /= 1000L;
                if (this.indexHeader.getBeginTimestamp() <= 0L) {
                    timeDiff = 0L;
                } else if (timeDiff > 2147483647L) {
                    timeDiff = 2147483647L;
                } else if (timeDiff < 0L) {
                    timeDiff = 0L;
                }

                int absIndexPos = 40 + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int)timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);
                var14 = true;
            } catch (Exception var24) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), var24);
                return false;
            } finally {
                if (fileLock != null) {
                    try {
                        ((FileLock)fileLock).release();
                    } catch (IOException var23) {
                        var23.printStackTrace();
                    }
                }

            }

            return var14;
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum);
            return false;
        }
    }

4.3、取索引

    public void selectPhyOffset(List<Long> phyOffsets, String key, int maxNum, long begin, long end, boolean lock) {
        if (this.mappedFile.hold()) {
            int keyHash = this.indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = 40 + slotPos * hashSlotSize;
            Object fileLock = null;

            try {
                if (lock) {
                }

                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                int prevIndexRead;
                if (slotValue > invalidIndex && slotValue <= this.indexHeader.getIndexCount() && this.indexHeader.getIndexCount() > 1) {
                    for(int nextIndexToRead = slotValue; phyOffsets.size() < maxNum; nextIndexToRead = prevIndexRead) {
                        int absIndexPos = 40 + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize;
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                        long timeDiff = (long)this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                        if (timeDiff < 0L) {
                            break;
                        }

                        timeDiff *= 1000L;
                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = timeRead >= begin && timeRead <= end;
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }
                    }
                }
            } catch (Exception var33) {
                log.error("selectPhyOffset exception ", var33);
            } finally {
                if (fileLock != null) {
                    try {
                        ((FileLock)fileLock).release();
                    } catch (IOException var32) {
                        var32.printStackTrace();
                    }
                }

                this.mappedFile.release();
            }
        }

    }

5、文件清理

由于RocketMQ操作CommitLog、ConsumeQueue文件,都是基于内存映射方法并在启动的时候,会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要一种机制来删除已过期的文件。

RocketMQ顺序写Commitlog、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后,将不会再被更新。RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会管这个这个文件上的消息是否被全部消费。默认每个文件的过期时间为72小时。通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。接下来详细分析RocketMQ是如何设计与实现上述机制的。

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);


private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}


long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

整个执行过程分为两个大的步骤,第一个步骤:尝试删除过期文件;第二个步骤:重试删除被hange(由于被其他线程引用在第一阶段未删除的文件),在这里再重试一次。

  • fileReservedTime:文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除。 
  • deletePhysicFilesInterval:删除物理文件的间隔,因为在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的间隔时间。 
  • destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务, 同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留的最大时间,在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除。 

以CommonLog为例:RocketMQ在如下三种情况任意满足之一的情况下将继续执行删除文件操作

  • 到了删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。 
  • 判断磁盘空间是否充足,如果不充足,则返回true,表示应该触发过期文件删除操作。 
  • 预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。

对于判断磁盘空间是否充足的逻辑

double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;   // @1
cleanImmediately = false;
{
    String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);   // @2
    if (physicRatio > diskSpaceWarningLevelRatio) {  // @3
           boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
           if (diskok) {
                 DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
           }
             cleanImmediately = true;
     } else if (physicRatio > diskSpaceCleanForciblyRatio) { 
           cleanImmediately = true;
     } else {
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                   DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
             }
      }
      if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
      }
}

6、producer

msgId(producer客户端产生):ip + 进程pid + MessageClientIDSetter.class.getClassLoader().hashCode() + time + counter(AtomicInteger自增变量)

offsetMsgId(broker产生):主机ip + 物理分区的offset

两个Id都可以在rocketqm-console的Message Id 搜到

7、相关问题

RocketMQ支持顺序消费。

RocketMQ解决消息重复消费:

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

  • 记录下每个消息的msgID
  • 新消息来的时候,查看该消息的msgID是否已记录,是则抛弃,否则消费

那么msgID记录在哪里呢?当然是缓存。所以我在解决这个问题的时候,使用了redis缓存。具体做法如下:

  • 消费端接收到消息的时候,调用redis提供的incr方法,以msgID作为key(具有唯一性),value则默认从1开始递增。
  • 当incr返回值为1时,设置其失效时间为两分钟以后(每个msgID保留两分钟足矣!),并且该消息需要被消费。
  • 当incr返回值大于1时,则忽略该消息。

RocketMQ解决消息丢失:

  • 判断消息丢失:1、分布式链路追踪系统:如果公司的基础技术建设比较完善,完全可以靠分布式链路追踪系统,来分析消息的全链路。常见有:Cat、Zipkin、Pinpoint、SkyWalking;2、发送有序性消息验证
  • 消息生产端:保证有消息返回

  • 消息存储:1、刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘(在收到消息后,将消息写入磁盘后再给
    Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消
    息,恢复后还可以继续消费。);2、broker为集群模式,配置消息复制(如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的Broker 可以替代宕机的 Broker,也不会发生消息丢失。)

  • 消息消费端:不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

RocketMQ高可用:

部署方式 优点 缺点 备注
单个Master模式 一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用    
多个Master模式 配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高。 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会收到影响

当使用多master无slave的集群搭建方式时,master的brokerRole配置必须为ASYNC_MASTER。如果配置为SYNC_MASTER,则producer发送消息时,返回值的SendStatus会一直是SLAVE_NOT_AVAILABLE。

多Master多Slave模式——异步复制 即使磁盘损坏,消息丢失的非常少,但消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样。 Master宕机,磁盘损坏情况,会丢失少量信息。  
多Master多Slave模式——同步双写 数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高 性能比异步复制模式稍低,大约低10%左右,发送单个消息的RT会稍高,目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能  

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/10989.html

(0)

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信