大家好,欢迎来到IT知识分享网。
消息队列
MQ(Message Queue)消息队列,是基础数据结构中”先进先出”的一种数据结构。
消息,是在两台计算机间传送的数据单位;
消息队列:消息的传输过程中保存消息的容器;
消息队列管理器,将消息从它的源中继到它的目标时充当的中间人;
队列的主要目的是提供路由并保证消息的传递,如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
消息中间件
消息中间件就是管理消息队列的服务器;
消息中间件,利用高效可靠的消息传递机制进行异步的数据传输,并基于数据通信进行分布式系统的集成;
通过提供消息队列模型和消息传递机制,可以在分布式环境下扩展进程间的通信;
异步处理模式:消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题 或 队列)上,消息接收者则订阅或是监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出同步回应,整个过程都是异步的。
使用较多的消息中间件:ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ、MetaMQ等;
消息中间件的使用场景
在分布式环境:应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。
应用系统之间解耦合:
发送者和接受者不必了解对方、只需要确认消息;发送者和接受者不必同时在线。
举例:在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里,通知 订单系统修改订单支付状态;两个系统是通过消息中间件解耦的。
实现流量削峰:
站点中的服务与服务之间是”直接调用”关系;在流量大的情况下,采用”MQ推送”,上游将消息发给MQ,MQ将消息推送给下游;那么,上游队列缓冲,限速发送;下游队列缓冲,限速执行;
ActiveMQ介绍
ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;完全支持JMS1.1;地址:http://activemq.apache.org/;
单机安装
前置条件:JDK已安装,JAVA_HOME环境变量已配置
安装环境:CentOS7 + apache-activemq-5.16.0-bin.tar.gz
# tar -zxvf apache-activemq-5.16.0-bin.tar.gz
# cd apache-activemq-5.16.0/bin/
IT知识分享网
配置:
IT知识分享网地址配置(conf/jetty.xml):
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<property name="host" value="192.168.1.18"/>
<property name="port" value="8161"/>
</bean>
用户配置(conf/users.properties):
admin=admin
启动:
# ./activemq start
访问地址
http://192.168.1.18:8161/index.html (admin,admin)
Master Slave部署(主从)
Share storage master slave(共享存储):Shared File System Master Slave模式(只适合单台主机部署,不适合多台主机部署),JDBC Store Master Slave模式(适合多台主机部署)
1、前置条件:MySQL数据库已准备好,上传MySQL驱动(%activemq_home%/lib):
mysql-connector-java-8.0.21.jar
2、持久化配置(vim activemq.xml),默认:
IT知识分享网<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
持久化配置(vim activemq.xml),MySQL:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.cj.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.1.11:3306/activemq?useSSL=false&useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&transformedBitIsBoolean=true&tinyInt1isBit=false&allowMultiQueries=true&serverTimezone=GMT%2B8"/>
<property name="username" value="root"/>
<property name="password" value="root1234"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
3、启动activemq:
数据库自动生成3张表:ACTIVEMQ_ACKS、ACTIVEMQ_LOCK、ACTIVEMQ_MSGS;
4、效果
客户端连接代码
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
public class ConnectionFactoryTest {
public static void main(String[] args) {
String brokerURL = "failover://(tcp://192.168.1.16:61616,tcp://192.168.1.17:61616,tcp://192.168.1.17:61616)";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, brokerURL);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
connection.close();
} catch (JMSException ignore) {
ignore.printStackTrace();
}
}
// INFO | Successfully connected to tcp://192.168.1.17:61616
}
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/7078.html