ActiveMQ入门概念

ActiveMQ入门概念语言:Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp ,REST,

大家好,欢迎来到IT知识分享网。

JMS是什么

JMS是SUN提出是为了统一MOM系统接口的规范,他包含点对点,以及发布订阅两种消息模型,提供可靠消息传输,事务和消息过滤等机制。

简单来说,JMS制定了一个发消息的规范,是一个与具体平台无关,绝大多数MOM提供商都对JMS提供支持 ,ActivaMQ是Apache出品的开源项目,他是JMS规范的一个实现。

MOM是什么

  • MOM:面向消息的中间件,使用消息中间件来协调消息传输操作。MOM需要提供API和管理工具
  • 客户端:调用api接口,把消息发送到目的地,在消息发送之后,客户端继续执行其他工作。
  • 接收方:收到这个消息确认之前,消息中间件一直保留该消息。

JMS的作用是什么

在不同应用之间进行通讯或者重一个系统传输数据到另一个系统,这两个应用之间,或分布式中发送消息,进行异步通讯,完成程序或者应用之间的解耦,

他主要用于在生产者和消费者之间进行消费传递,生产者负责产生消息,消费者消费消息,他在实际应用需求中,生产者生产消息,发布消息,消费者接受消息后,处理对应的业务逻辑。

JMS应用场景

使用在规模和复杂较高的分布式系统

  • 异步通讯
  • 客户和服务对象的生命周期解耦
  • 一对一或多对多通讯

JMS支持的模式

点对点模式

消息的生产者和消费者之间没有时间上的相关性,生产者把消息发布到Queue,可以有多个生产者,消息只能被一个消费者消费,一条消息只能消费一次,消费者无需订阅,当消费者没有消息消费时候,就会阻塞

发布/订阅模式

生产者和消费者是有时间上的关联,订阅一个主题的消费者只能消费自她订阅之后的消息生产者将消息发送到主题上消费者必须现订阅主题,JMS允许提供客户端创建持久订阅(持久订阅就是当网络断开后,消费服务器也记住所有持久化的订阅者,如果有消息,也会知道必定有人来消费)

这种两种发送模式各自有对应的Destination(目的地),Queue(队列),Topic(主题)

队列模式:是对应点对点模式,即一个生产者对应一个消费者,可以有多个生产者,但是只能是一个消费者

主题模式:是发布订阅模式,即一个生产者可以对应多个消费者,消费者只需要订阅指定生产者的消费即可,发布者/订阅者模式支持向特定消息主题发布消息,0个或多个订阅者可能对接受来自特定消息主题的消息感兴趣,在这种模式下,发布者和订阅者彼此不知道对方,这种模式好比匿名公告板,这种模式被概括为,多个消费者可以获得消息,在发布者和订阅者之间存在依赖性,发布者需要建立一个订阅,以便客户能够订阅,订阅者必须连续持续的活动状态接受消息,也可以持久化订阅,即当订阅者为连接时候,发布的消息,等重新连接上的时候会重新发布。

什么是ActiveMQ

MQ,即Message Queue,就是消息队列的意思

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规 范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的 地位。

特定

  • 1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp ,REST,WS Notification,XMPP,AMQP
  • 2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  • 3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  • 4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource
  • adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  • 5. 支持多种传输协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  • 6. 支持通过JDBC和journal提供高速的消息持久化
  • 7. 从设计上保证了高性能的集群
  • 8. 支持Ajax
  • 9. 支持与Axis的整合
  • 10. 可以很容易得调用内嵌JMS provider,进行测试

JMS事务

创建事务的方法,createSession(paramA,paramB)

如果paramA设置为true,则paramB的值就会被忽略,默认是Session.SESSION_TRANSACTED.

如果paramA设置为false,则paramB,可以为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE , Session.DUPS_OK_ACKNOWLEDGE 其中一个。

  • SESSION_TRANSACTED:指定事务,必须使用commit提交
  • AUTO_ACKNOWLEDGE:自动确认
  • CLIENT_ACKNOWLEDGE:客户端确认
  • DUPS_OK_ACKNOWLEDGE:延迟批量确认

生产者

ActiveMQ支持两种传输模式,持久传输和非持久传输,默认使用持久传输

两者的区别

  • 采用持久传输,传输的消息会保存在磁盘中,即存储转发方式,先把消息存储到磁盘中,在把消息发送给订阅者而,当borker宕机,机器恢复后,消息还在.
  • 采用非持久传输,发送的消息不进行存储,当borker宕机,消息丢失。

可以通过下面代码设置

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENCE);

消息同步和异步发送

在不考虑事物的情况下,

  • producer发送持久化消息是同步发送,发送阻塞,知道收到确认。
  • producer发送非久化消息是异步发送,异步发送不会等待broker的确认

可以通过下面方式设置

设置ConnectionFactory时指定使用异步

cf=new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

修改ConnectionFactory的配置

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

实例化后的ActiveMQConnection对象中设置异步发送

((ActiveMQConnection)connection).setUseAsyncSend(true);

生产者的流量控制

在ActiveMQ版本中,生产者时刻进行流量控制,流量控制意味着当broker检测到目标的内存,或临时文件空间或文件存储空间超过了限制,消息的流量就会被限制,生产者将会被阻塞直至资源可用,或者抛出异常.

同步发送的消息将会默认自动对每一个生产者使用流量控制,除非你使用useraSyncSend=true标志,否则这将对发送的持久化消息都适用。

异步发送消息,不需要等待broker任何确认消息,所以如果内存超过了限制,生产者也不知道,如果生产者想知道broker的限制被超过了。你需要配置produceWindowsize这一连接选项,这样就算异步发动消息也会对每一个生产者进行流量控制。

produceWindowsize可以通过如下配置设置

方式一

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); factory.setProducerWindowSize(int producerWindowSize);

方式二

设置在brokerUrl中"tcp://localhost:61616?jms.producerWindowSize= "

方式三

在destinationUri中设置: " test-queue?producer.windowSize= ",此参数只会对使用此Destination实例的producer生效,将会覆盖brokerUrl中的producerWindowSize值。

配置说明

ProducerWindowSize是生产者在等在确认消息之前,可以发送给代理的数据的最大byte数,这个确认消息用来告诉生产者,代理已经收到之前发送的消息了.(且只异步发送有意义)

或者,如果你要发送非持久化的消息(该消息默认是异步发送的),并且想要得到队列或者主题的内存限制是否达到,你只需将连接工厂配置为“alwaysSyncSend”。虽然这样会变得稍微慢一点,但是这将保证当出现内存问题时,你的消息生产者能够及时得到通知

((ActiveMQConnectionFactory)connectionFactory).setAlwaysSyncSend();

如何提升消息发送效率

  • 消息持久化

持久化类型的消息,对broker端性能消耗远远大于非持久化类型;

这归结于ActiveMQ本身对持久化消息确保“最终一致性”,持久化意味着“消息不丢失”(无论过期,还是DLQ),即当broker接收到消息后需要一次强制性磁盘同步(fsync)[备注:不过基于日志的存储器kahadb/levelDB提供延迟写入的特性,如果开启延迟写入,将会在broker物理失效时有丢失数据的潜在风 险];

对于Consumer在消费消息后,也会触发磁盘写入(通常为标记消息已消费,或者移除消息的相关索引信息;这 个过程通常是延迟写入);

此外,通常broker端还会开启相关的“过期消息检测”线程,将存储器中的数据载入内存并检测,这个过程也 是内存、磁盘IO消耗的。由此可见,持久化类型的消息从始至终,都在“拖累”着系统的性能和吞吐能力

这就要求,开发者根据实际需要定夺消息的传输模式(持久化、非持久化),对于数据可靠性要求较低,容忍数据在 极端情况下丢失的场景中,我们需要果断的使用NON_PERSISTENT。

消息属性

通过producer发送的消息中,除了消息本身的负荷之外,还有大量的JMS属性和Properties可以设置,比如timestamo,priority等,因为jms中,支持对JMS属性和properties使用selector,那么这些内容将会加大和复杂化message,header我们尽可能的在properties中携带更少,更小的数据,

此外,我们还不能通过message传递较大的文本,流数据,尽管activemq支持这些特定,但是他会对broker带来更多的消息存储,控制成本,较大的数据传递,使用activemq是不理智的。我们也要慎重使用prioity,这会对底层的存储器带来性能开销。

异步发送

如果消息是非持久化的,或者是基于session事物的,建议开发者不要关闭异步发送。设置合适的windowsize,开启broker端Flow control等,即提高produer发送效率,还能避免以broker数据过大带来的不稳定性。

事务

对于Producer而言,使用事务并不会消耗Broker太多的性能,主要是会占用内存,所有未提交的事务消息, 都会保存在内存中,有些基于日志的存储存储器,事务类型的持久化消息暂存在额外的文件中,直到日志提交 或者回滚后清除。

所以,Producer端不要在事务中,积压太多的消息,尽可能早的提交事务。提高consumer消费能力

选择适合的结构设计,让消费者消费的更快

选择合适的存储器

activeMQ目前支持JDBC/kahadb/LevelDB三种主要的存储器:

JDBC主要面向基于RDBMS方向,通常如果消息不仅面向ActiveMQ,还可能被用于第三方平台的操作, JDBC的特点就是透明度高,可扩展方案较多(但扩展成本较高)。

kahadb和LevelDB,同属于日志存储 + BTree索引,性能很好,对于消息较多(单位尺寸较小),消费 速度较快的应用,是最好的选择,这两种存储器也是最常用的,其中LevelDB是被推荐使用的。

Broker server

流量控制

设置指定队列和主题失效

如果你喜欢,你可以通过在代理的配置中,将适当的目的地(destination)的策略(policy)中的 producerFlowControl标志设置为false,使代理上特定的JMS队列和主题不使用流量控制,例如:

<destinationPolicy><policyMap><policyEntries><policyEntry topic="FOO.>" producerFlowControl="false"/></policyEntries></policyMap></destinationPolicy>

生效内存限制

注意,自从ActiveMQ 5.x中引入新的文件游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持 久化消息传送使用的内存总量。结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太 多的内存。如果你真的想把所有的非持久化消息存放在内存中,并在达到内存限制的时候停掉生产者,你需要配置。

<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"><pendingQueuePolicy><vmQueueCursor/></pendingQueuePolicy></policyEntry>

上面的片段可以保证所有的非持久化队列消息都保存在内存中,每一个队列的内存限制为1Mb

配置生产者客户端的异常

应对Broker代理空间不足,而导致不确定的阻塞 send()操作的一种替代方案,就是将其配置成客户端抛出的一个 异常。通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端。下面是一个配置的示例:

<systemUsage sendFailIfNoSpace="true"><memoryUsage><memoryUsage limit="20 mb"/></memoryUsage></systemUsage>

这个属性的好处是,客户端可以捕获 javax.jms.ResourceAllocationException 异常,稍等一下,并重试 send()操作,而不是无限期地傻等下去。

从5.3.1版本之后, sendFailIfNoSpaceAfterTimeout 属性被加了进来。这个属性同样导致send()方法失败, 并在客户端抛出异常,但仅当等待了指定时间之后才触发。如果在配置的等待时间过去之后,代理上的空间仍然没 有被释放,仅当这个时候send()方法才会失败,并且在客户端抛出异常。下面是一个示例:

<systemUsage sendFailIfNoSpaceAfterTimeout="3"><memoryUsage><memoryUsage limit="20 mb"/></memoryUsage></systemUsage>

定义超时的 单位是毫秒 ,所以上面的例子将会在使send()方法失败并对客户端抛出异常之前,等待三秒。这个属性 的优点是,它仅仅阻塞配置指定的时间,而不是立即令发送失败,或者无限期阻塞。这个属性不仅在代理端提供了 一个改进,还对客户端提供了一个改进,使得客户端能捕获异常,等待一下并重试send()操作。

系统占用

你还可以通过元素的一些属性来减慢生产者。来看一眼下面的例子:

<systemUsage><systemUsage><memoryUsage><memoryUsage limit="64 mb" /></memoryUsage><storeUsage><storeUsage limit="100 gb" /></storeUsage><tempUsage><tempUsage limit="10 gb" /></tempUsage></systemUsage></systemUsage>

你可以为非持久化的消息(NON_PERSISTENT)设置内存限制,为持久化消息(PERSISTENT)设置磁盘空间,以及为缓存区设置磁盘空间,代理将在减慢生产者之前使用这些空间。使用上述的默认设置,代理将会一直阻塞send()方法的调用,直至一些消息被消费,并且代理有了可用空间。默认值如上例所述,你可能需要根据你的环境增加这些值。

解决消费者消费缓慢以及无法消费的问题

其实broker中还可以单独配置生产者使用的producerSystemUsage和consumerSystemUsage格式跟systemUsage一样。

默认情况下,没有指定producerSystemUsage和consumerSystemUsage,则生产者和消费者都是用systemUsage.

这样会导致生产者把内存使用完,而消费者线程处理缓慢或者无法消费消息,这种情况下,添加消费端的机器,和消费者数量可能都无法增加消费的速度。

解决办法就是

在broker上添加splitSystemUsageForProducerConsumer=true,使得生产者线程和消费者线程个使用各自的内存,

默认是生产者线程内存:消费者线程内存=6:4,当然也可以设置参数使内存各使用一半,如下

<broker xmlns="<http://activemq.apache.org/schema/core>" brokerName="localhost" dataDirectory="${activemq.data}" splitSystemUsageForProducersConsumers="true"producerSystemUsagePortion="50" consumerSystemUsagePortion="50">

消费定时删除

<brokerxmlns="http://activemq.apache.org/schema/core"chedulePeriodForDestinationPurge="10000"><destinationPolicy><policyMap><policyEntries><policyEntry topic=">" gcInactiveDestinations="true"inactiveTimoutBeforeGC="30000"/></policyEntries></policyMap></destinationPolicy></broker>

实现定时自动清理无效的Topic和Queue需要设置三个属性。

  • schedulePeriodForDestinationPurge:执行清理任务的周期,单位是毫秒
  • gcInactiveDestinations=”true”:启用清理功能
  • inactiveTimoutBeforeGC=”30000″ :Topic或Queue超时时间,在规定的时间内,无有效订阅,没有入队 记录,超时后就会被清理

持久化存储方式

  1. kahadb基于文件存储

kahadb是从ActiveMQ5.4开始默认的使用持久化插件,kahadb恢复时间远远小于其前身AMQ并且使用更少的数据文件,多以可以完全替换AMQ,kahadb的持久化机制同样是基于日志文件,索引和缓存

配置方式

<persistenceAdapter><kahaDBdirectory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/></persistenceAdapter>
  • directory : 指定持久化消息的 存储目录
  • journalMaxFileLength : 指定保存消息的日志文件大小,具体根据你的实际应用配置

特点

1、日志形式存储消息;

2、消息索引以B-Tree结构存储,可以快速更新;

3、完全支持 JMS事务;

4、支持多种恢复机制;

Kahadb的结构

ActiveMQ入门概念

消息存储在基于文件的数据日志中,如果消息发送成功,便标记为可删除,系统会周期性的清除或者归档日志文件,消息文件的位置索引存储在内存中,这样能快速定位,定期将内纯中的消息索引保存到metadata store中,避免大量消息未发送时,消息多音占用过多内存空间

  • Data Logs:用于存储消息日志,消息的全部内容都在Data Logs中。
  • Metadata cache:缓存用于存放在线消费的消息,如果消息已经快速的消费完成,那么这些消息就不需要存储在磁盘中了,Btree会根据MessageID创建索引,用于快速的查找消息,这个索引同样维护持久订阅者与Destion的关系以及每个消费者消费消息的指针。
  • Metadata Store:在db.data文件中保存消息日志中消息的元数据,也是以btree结构存储的,定时从Metadata cache更新数据,Metadata store 中也会备份一些消息日志中存在的信息,这样可以让broker实例快速启动,即使metadata store文件被破坏或者删除了,broker 可以读取Data logs回复过来,只是速度会慢些。

AMQ基于文件存储

性能高于jdbc,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高,性能很高,为了提高性能,创建消息主键索引,并且提供缓存机制,进一步提升性能,每个日志的大小是有限制的(默认32M,可自行配置)。当超过这个大小,系统会自动重新建立一个文件,当所有消息都消费完成,系统会删除这个文件或者归档,

主要缺点是:

AMQ Message会每个Destination创建索引,如果使用大量的queue,索引文件的大小会占用很多磁盘空间,

而且由于索引巨大,一旦broker崩溃,重建索引会非常慢。

<persistenceAdapter><amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/></persistenceAdapter>

虽然AMQ性能略高于下面的Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所 以已经不推荐使用。

3JDBC基于数据库的存储

第一步

你首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar

第二步

修改配置文件

<persistenceAdapter><jdbcPersistenceAdapter createTablesOnStartup="true" dataSource="#mysql-ds"/></persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这 样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

第三步

在配置文件中broker节点增加以下内容

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method ="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/><property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="root"/><property name="maxActive" value="200"/><property name="poolPreparedStatements" value="true"/></bean>

第四步

重新启动,发现数据库会生成三张表

  • activemq_acks:存储持久化订阅消息
  • activemq_lock:锁表(用来做集群的时候,实现maste选举的表)
  • activemq_msgs:消息表

LevelDB

从ActiveMQ 5.8版本之后,又推出了LevelDB的持久化引擎。目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,可能是以后的趋势。在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的 数据复制方式,用于Master-slave方式的首选数据复制方案。

<persistenceAdapter><levelDB directory="${activemq.data}/activemq-data"/></persistenceAdapter>

Memory

顾名思义,基于内存的消息存储,就是消息存储在内存中。persistent=”false” :表示不设置持久化存储,直 接存储到内存中 。

<beans><broker brokerName="test-broker" persistent="false"xmlns="http://activemq.apache.org/schema/core"><transportConnectors><transportConnector uri="tcp://localhost:61616"/>      </transportConnectors></broker></beans>

JDBC Message store with ActiveMQ Journal

这种方式克服JDBC Store的不足,JDBC存储每次消息,都要去写库和读库,ActiveMQ Journal,使用延迟存储数据的数据库,当消息来到时先缓存到文件中,延迟后才写入数据库中,

当消费者的消费速度能够及时跟上生产者的生产速度,journal文件能够大大减少需要写入到DB中的消息

<persistenceFactory><journalPersistenceAdapterFactory dataSource="#mysql-ds" dataDirectory="${activemq.data}/activemq-data"/></persistenceFactory>

希望此文对大家有所帮助,也希望大家持续关注转载。关注公众号获取相关资料请回复:typescript,springcloud,springboot,nodejs,nginx,mq,javaweb,java并发实战,java并发高级进阶,实战java并发,极客时间dubbo,kafka,java面试题,ES,zookeeper,java入门到精通,区块链,java优质视频,大数据,kotlin,瞬间之美,HTML与CSS,深入体验java开发,web开发CSS系列,javaweb开发详解,springmvc,java并发编程,spring源码,python,go,redis,docker,即获取相关资料。

欢迎关注rookie-dog(洁癖是一只狗)

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/71724.html

(0)
上一篇 2024-04-29 13:00
下一篇 2023-01-03 09:52

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信