大家好,欢迎来到IT知识分享网。
目录
1、架构介绍
namesrv集群:负责接受broker注册的路由信息,并为producer和consumer提供路由查询服务。路由信息包括了broker自身的一些信息,如IP地址,brokerId,brokerName,broker所属集群等,还包括该broker中管理的每个topic对应的多个queue信息。有了路由信息,生产者就可以知道消息该往哪些broker上发,消费者就可以知道从哪些broker的哪些queue上可以拉到消息了。namesrv集群由一个或者多个namesrv机器组成,机器之间相互独立,不产生任何通信。
broker集群:负责topic的创建,向namesrv注册路由信息,消息存储,消息查询,消息消费。broker由master broker和slave borker组成,master broker之间无通信,多个master broker用于分摊消息生产时的压力,每个topic对应的消息会被分配到一个broker集群下的所有master broker上,生产者在发送指定topic的消息时,会从所有可写queue中轮询一个queue进行消息发送,这里的queue对应于broker向namesrv注册的路由信息中的写队列数,不同能力的master broker机器可以配置不同数量的可写队列数,可以实现消息的权重分配。另外,由于topic上的消息可以被分配到任意个broker上,这就支持了方便的broker横向扩展。slave broker用于对master broker进行消息同步备份,对外只提供消息读取,不提供消息发送。
producer与consumer:生产者与消费者,其实都属于rocketMq的客户端,一个应用程序中往往都是生产者与消费者都有的。
4、索引
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-store</artifactId>
<version>4.1.0-incubating</version>
</dependency>
对应的类:
org.apache.rocketmq.store.index.IndexFile
org.apache.rocketmq.store.index.IndexHeader
org.apache.rocketmq.store.index.IndexService
4.1、index文件数据结构
Index Header:
- beginTimestamp:该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
- endTimestamp:该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
- beginPhyoffset:该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
- beginPhyoffset:该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
- hashSlotCount:该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
- indexCount:该索引文件目前的索引个数 (pos: 36-39) 4bytes
Slot:
- slot每个节点保存当前已经拥有多少个index数据了
Index List:
- key hash value:message key的hash值
- phyOffset:message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
- timeDiff:message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
- prevIndex:hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件。)每个slot位置的第一个消息的prevIndex就是0的
4.2、存索引
public boolean putKey(String key, long phyOffset, long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
int keyHash = this.indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = 40 + slotPos * hashSlotSize;
Object fileLock = null;
boolean var14;
try {
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff /= 1000L;
if (this.indexHeader.getBeginTimestamp() <= 0L) {
timeDiff = 0L;
} else if (timeDiff > 2147483647L) {
timeDiff = 2147483647L;
} else if (timeDiff < 0L) {
timeDiff = 0L;
}
int absIndexPos = 40 + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize;
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int)timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
var14 = true;
} catch (Exception var24) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), var24);
return false;
} finally {
if (fileLock != null) {
try {
((FileLock)fileLock).release();
} catch (IOException var23) {
var23.printStackTrace();
}
}
}
return var14;
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum);
return false;
}
}
4.3、取索引
public void selectPhyOffset(List<Long> phyOffsets, String key, int maxNum, long begin, long end, boolean lock) {
if (this.mappedFile.hold()) {
int keyHash = this.indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = 40 + slotPos * hashSlotSize;
Object fileLock = null;
try {
if (lock) {
}
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
int prevIndexRead;
if (slotValue > invalidIndex && slotValue <= this.indexHeader.getIndexCount() && this.indexHeader.getIndexCount() > 1) {
for(int nextIndexToRead = slotValue; phyOffsets.size() < maxNum; nextIndexToRead = prevIndexRead) {
int absIndexPos = 40 + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long)this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0L) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = timeRead >= begin && timeRead <= end;
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
}
}
} catch (Exception var33) {
log.error("selectPhyOffset exception ", var33);
} finally {
if (fileLock != null) {
try {
((FileLock)fileLock).release();
} catch (IOException var32) {
var32.printStackTrace();
}
}
this.mappedFile.release();
}
}
}
5、文件清理
由于RocketMQ操作CommitLog、ConsumeQueue文件,都是基于内存映射方法并在启动的时候,会加载commitlog、ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要一种机制来删除已过期的文件。
RocketMQ顺序写Commitlog、ConsumeQueue文件,所有写操作全部落在最后一个CommitLog或ConsumeQueue文件上,之前的文件在下一个文件创建后,将不会再被更新。RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会管这个这个文件上的消息是否被全部消费。默认每个文件的过期时间为72小时。通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。接下来详细分析RocketMQ是如何设计与实现上述机制的。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
private void cleanFilesPeriodically() {
this.cleanCommitLogService.run();
this.cleanConsumeQueueService.run();
}
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
整个执行过程分为两个大的步骤,第一个步骤:尝试删除过期文件;第二个步骤:重试删除被hange(由于被其他线程引用在第一阶段未删除的文件),在这里再重试一次。
- fileReservedTime:文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除。
- deletePhysicFilesInterval:删除物理文件的间隔,因为在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的间隔时间。
- destroyMapedFileIntervalForcibly:在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务, 同时在第一次试图删除该文件时记录当前时间戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能保留的最大时间,在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除。
以CommonLog为例:RocketMQ在如下三种情况任意满足之一的情况下将继续执行删除文件操作
- 到了删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。
- 判断磁盘空间是否充足,如果不充足,则返回true,表示应该触发过期文件删除操作。
- 预留,手工触发,可以通过调用excuteDeleteFilesManualy方法手工触发过期文件删除,目前RocketMQ暂未封装手工触发文件删除的命令。
对于判断磁盘空间是否充足的逻辑
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; // @1
cleanImmediately = false;
{
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); // @2
if (physicRatio > diskSpaceWarningLevelRatio) { // @3
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
}
cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
return true;
}
}
6、producer
msgId(producer客户端产生):ip + 进程pid + MessageClientIDSetter.class.getClassLoader().hashCode() + time + counter(AtomicInteger自增变量)
offsetMsgId(broker产生):主机ip + 物理分区的offset
两个Id都可以在rocketqm-console的Message Id 搜到
7、相关问题
RocketMQ支持顺序消费。
RocketMQ解决消息重复消费:
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。
- 记录下每个消息的msgID
- 新消息来的时候,查看该消息的msgID是否已记录,是则抛弃,否则消费
那么msgID记录在哪里呢?当然是缓存。所以我在解决这个问题的时候,使用了redis缓存。具体做法如下:
- 消费端接收到消息的时候,调用redis提供的incr方法,以msgID作为key(具有唯一性),value则默认从1开始递增。
- 当incr返回值为1时,设置其失效时间为两分钟以后(每个msgID保留两分钟足矣!),并且该消息需要被消费。
- 当incr返回值大于1时,则忽略该消息。
RocketMQ解决消息丢失:
- 判断消息丢失:1、分布式链路追踪系统:如果公司的基础技术建设比较完善,完全可以靠分布式链路追踪系统,来分析消息的全链路。常见有:Cat、Zipkin、Pinpoint、SkyWalking;2、发送有序性消息验证
消息生产端:保证有消息返回
消息存储:1、刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘(在收到消息后,将消息写入磁盘后再给
Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消
息,恢复后还可以继续消费。);2、broker为集群模式,配置消息复制(如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的Broker 可以替代宕机的 Broker,也不会发生消息丢失。)消息消费端:不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
RocketMQ高可用:
部署方式 | 优点 | 缺点 | 备注 |
---|---|---|---|
单个Master模式 | 一旦Broker重启或者宕机时,会导致整个服务不可用,不建议线上环境使用 | ||
多个Master模式 | 配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高。 | 单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会收到影响 |
当使用多master无slave的集群搭建方式时,master的brokerRole配置必须为ASYNC_MASTER。如果配置为SYNC_MASTER,则producer发送消息时,返回值的SendStatus会一直是SLAVE_NOT_AVAILABLE。 |
多Master多Slave模式——异步复制 | 即使磁盘损坏,消息丢失的非常少,但消息实时性不会受影响,因为Master宕机后,消费者仍然可以从Slave消费,此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样。 | Master宕机,磁盘损坏情况,会丢失少量信息。 | |
多Master多Slave模式——同步双写 | 数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高 | 性能比异步复制模式稍低,大约低10%左右,发送单个消息的RT会稍高,目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能 |
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/10989.html