消息总线

消息总线一、MQ介绍消息队列MessageQueue,是基础数据结构中“先进先出”的一种数据机构。把要传输的数据(消息)放在队列中,队列机制来实现消息传递——生产者生产消息并把消息放入队列,然后由消费者处理;消费者可以到指定队列拉取消息,或者订阅响应的队列,由MQ服务端给其推送消息。参考:https

大家好,欢迎来到IT知识分享网。

一、MQ介绍

消息队列 Message Queue,是基础数据结构中“先进先出”的一种数据机构。把要传输的数据(消息)放在队列中,队列机制来实现消息传递——生产者生产消息并把消息放入队列,然后由消费者处理;消费者可以到指定队列拉取消息,或者订阅响应的队列,由MQ服务端给其推送消息。

 

参考:https://www.zhihu.com/question/54152397

消息队列可以简单理解为:把要传输的数据放到队列中

 

二、消息队列的好处

1.     解耦

消息总线

 

2.     异步

消息总线

 

 3.     限流

消息总线

三、队列(Queue)和主题(Topic)的区别

4.1 Queue消息递归模型,即点对点(Point-to-Point,简称PTP)

通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。

4.2 Topic消息传递模型,即发布/订阅(publish/subscribe,简称pub/sub)

       通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。 

四、通过Python发送接收MQ消息

4.1 stomp协议库的安装

链接:https://pypi.org/project/stomp.py/

消息总线

 

4.2 创建监听名称

 消息总线

 

 

 4.3 建立连接

 消息总线

 

 

 如何查看stomp协议的端口号??

从配置中查看,查看路径 ..\AcitveMQ\conf\activemq.xml

 消息总线

4.4 发送/接收消息

4.4.1 queue队列

 消息总线

4.4.2 topic消息

 消息总线

Note:

  destination中通过 queue或topic来区分是队列还是主题

  主题模型下,需要提前订阅,所以要先接收主题消息,而且这个接收模式是在整个连接过程中生效的

 

五、代码

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import time
import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print('received an error {}'.format(message))
    def on_message(self, headers, message):
        print('received a message {}'.format(message))

conn = stomp.Connection([('30.1.1.57',22030)])   #第一个参数是MQIP,第二个参数是该MQ对应stomp协议的端口号
conn.set_listener('MyListener', MyListener())   #第一个参数为这个监听的名称,可以任意取值
conn.start()
conn.connect('admin','admin', wait=True)    #用户名和密码

# # 发送消息到testQueue队列,id可以任意填写
# conn.send(body=b'hahah', destination='/queue/testQueue')
# # 从testQueue队列中接收消息
# conn.subscribe(destination='/queue/testQueue', id='1')
# time.sleep(2)
# #断开连接
# conn.disconnect()

content = '''"incinf":{
        "calltime":"2020-07-16 08:30:00",
        "telnum":"15622811986",
        "eventaddress":"朗山路11号源证创业大厦5楼002",
        "stationhouse":"450312000000",
        "stationhousename":"漓江分局",
        "casetype":"治安案、事件",
        "caseport":"打架斗殴",
        "casekind":"聚众斗殴",
        "content":"实验室有人打架斗殴,发生流血事件002",
        "eventlevel":"二级警情",
        "operateseatno":"7001",
        "operateloginname":"漓江分局接警席2"} '''
# 从Topic.ProcessAlarmInfo主题中接收消息
conn.subscribe(destination='/topic/Topic.ProcessAlarmInfo',id='test')
# 发送消息到Topic.ProcessAlarmInfo主题
conn.send(body=content, destination='/topic/Topic.ProcessAlarmInfo')
# # 发送消息到Topic.ProcessAlarmInfo主题
# conn.send(body=content, destination='/topic/Topic.ProcessAlarmInfo')
time.sleep(2)
conn.disconnect()

 补充:临时队列

临时队列,SMP中全量更新信息保存在MQ的临时队列中,临时队列无法通过MQ页面查看,必须通过jconsole查看,临时队列在断开连接会释放掉。

消息总线

 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/31061.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信