redis的消息发布、订阅服务与websocket结合推送消息给多个客户端

redis的消息发布、订阅服务与websocket结合推送消息给多个客户端前言多拓展一下了解的东西,主动学习比较感兴趣的websocket和消息队列相关概念。然后,就发现了redis竟然也可以进行消息的传递与分发、还有

大家好,欢迎来到IT知识分享网。

前言

多拓展一下了解的东西,主动学习比较感兴趣的websocket和消息队列相关概念。然后,就发现了redis竟然也可以进行消息的传递与分发、还有订阅某类消息的机制,这很像站点广播、全局或者局部通知的功能。对此产生了不少兴趣,决定建个demo,尝试一下。

一、环境介绍

本次编写demo程序,参考的博客地址如下:

Websocket技术的Java实现(上篇)–https://blog.csdn.net/KeepStruggling/article/details/

建议先理解websocket的概念:

MDN的websocket的概念 — https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket

廖雪峰的官方网站的websocket说明 — https://www.liaoxuefeng.com/wiki/49312/24096

pom依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.5.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.5.4</version>
</dependency>
<dependency>
    <!--websocket支持包-->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.5.4</version>
</dependency>
<dependency>
    <!--html-->
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-thymeleaf</artifactId>
    <version>2.5.4</version>
</dependency>

二、redis的序列化配置

可以存中文的key和value

 @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>(); redisTemplate.setConnectionFactory(factory); //key序列化方式;(不然会出现乱码;),但是如果方法上有Long等非String类型的话,会报类型转换错误; //所以在没有自己定义key生成策略的时候,以下这个代码建议不要这么写,可以不配置或者自己实现ObjectRedisSerializer //JdkSerializationRedisSerializer序列化方式 是没办法直接可视化存储数据的; //Jackson2JsonRedisSerializer序列化方式 有一个缺点,存储字符串时,可能会给字符串额外地加一对双引号,如""abc""; //实测推荐StringRedisSerializer序列化方式; redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(new StringRedisSerializer()); return redisTemplate; } @Override @Bean public KeyGenerator keyGenerator() { return new SimpleKeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getSimpleName());//获得类的简单类名,不包括前缀包路径 sb.append(".").append(method.getName()); StringBuilder paramsSb = new StringBuilder(); for (Object param : params) { // 如果不指定,默认生成包含到键值中 if (param != null) { paramsSb.append(param); } } if (paramsSb.length() > 0) { sb.append("_").append(paramsSb); } return sb.toString(); } }; }

三、redis的消息监听设置

主要是监听到某个渠道有了消息之后,怎么处理。

import java.io.IOException; import java.nio.charset.Charset; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import com.feng.test.config.websocket.WebSocketServer; /** * @className: RedisMessageListener.java * @author: * @version: 1.0.0 * @description:redis消息监听:得到消息后的处理方法 onMessage() * @date: 2022/8/23 */ @Configuration public class RedisMessageListener implements MessageListener { public void onMessage(Message message, byte[] bytes) { // 消息体 System.out.println(message.getBody()); String body = new String(message.getBody(), Charset.forName("UTF-8")); // 传播渠道名称 String topic = new String(bytes); System.out.println("消息:" + body + ",渠道:" + topic); try { //此处是将redis监听到的消息,进行了websocket连接群发消息到浏览器端 WebSocketServer.sendInfo(body, null); } catch (IOException e) { e.printStackTrace(); } } }

四、redis的消息的监听容器

主要是设置如何监听消息,以及设置监听多个途径的消息,针对不同途径的消息可以有不同的处理逻辑,同一个途径的消息也可以有多个不同的处理逻辑。

 @Autowired private RedisTemplate redisTemplate; @Autowired private RedisConnectionFactory redisConnectionFactory; //引入一个redis的消息监听器 @Autowired private MessageListener redisMessageListener; //线程池 private ThreadPoolTaskScheduler taskScheduler; //创建线程池 @Bean public ThreadPoolTaskScheduler initPoolTaskScheduler() { if (taskScheduler != null) { return taskScheduler; } taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(20); return taskScheduler; } /** * @description:创建发布redis消息监听器的容器, * 一个监听容器可以设置监听多个渠道的消息,并进行个性化的监听处理逻辑 * @author: feng * @return RedisMessageListenerContainer */ @Bean public RedisMessageListenerContainer initContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); //设置连接 container.setConnectionFactory(redisConnectionFactory); //设置线程池 container.setTaskExecutor(initPoolTaskScheduler()); //定义监听的渠道 Topic a = new ChannelTopic("topic1"); Topic b = new ChannelTopic("topic2"); //使用监听器监听redis的消息 container.addMessageListener(redisMessageListener,a); container.addMessageListener(redisMessageListener,b); return container; }

这一步,框架封装的太厉害,仍然有很多东西不是很明白,比如,容器结合了线程池之后,究竟是如何多线程监听消息变化的?

五、承接上面websocket群发消息

在引入websocket的依赖后,需要配置一番,才可以使用websocket。
首先,配置类:

@Configuration public class WebSocketConfig { /** * 如果使用Springboot默认内置的tomcat容器,则必须注入ServerEndpoint的bean; * 如果使用外置的web容器,则不需要提供ServerEndpointExporter,下面的注入可以注解掉 * 本次使用springboot内置的tomcat容器,不注释掉下面的bean * @return */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }

再者,要定义一个websocket的连接路径(应该也可以多个路径):

@Component @ServerEndpoint("/ws/{sid}") public class WebSocketServer { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); public WebSocketServer() { } // 在线连接数统计 private static int onlineCount = 0; // 声明某个连接的session private Session session; // 存放无数个socket连接 private static ConcurrentHashMap<String, WebSocketServer> websocketMap = new ConcurrentHashMap<String, WebSocketServer>(); // 声明sid private String sid; /** * 建立socket的方法调用 * * @param session * @param sid */ @OnOpen public void onOpen(Session session, @PathParam("sid") String sid) { // 赋值当前的连接session this.session = session; this.sid = sid; if (websocketMap.get(this.sid) == null) { // 存放本次socket连接 websocketMap.put(sid, this); // 增加一个在线数 addOnlineCount(); log.info("新的socket连接出现并监听:" + sid + ",当前在线人数:" + getOnlineCount()); } } /** * 连接关闭时调用的方法 */ @OnClose public void onClose() { if (websocketMap.get(this.sid) != null) { websocketMap.remove(this.sid); subOnlineCount(); log.info("连接" + this.sid + "断开了"); } } /** * socket连接时,收到消息时的处理方法 * * @param message * @param session */ @OnMessage public void onMessage(String message, Session session) { log.info("连接收到消息:" + message); // 如果message有字符 if (StringUtils.hasLength(message)) { // 实际业务连接成千上万,应该用多线程处理 for (WebSocketServer server : websocketMap.values()) { try { server.sendMessage(message); log.info("websocket群发消息-sid=" + server.sid + ":" + message); } catch (Exception e) { } } } } @OnError public void onError(Session session, Throwable error) { log.error("发生错误了"); error.printStackTrace(); } /** * 获取在线连接数 * * @return */ public static synchronized int getOnlineCount() { return onlineCount; } /** * 增加在线连接数 * * @return */ public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } /** * 减少在线连接数 * * @return */ public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } /** * 实现服务器主动推送消息 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群发自定义消息(用set会方便些) */ public static void sendInfo(String message, @PathParam(value = "sid") String sid) throws IOException { log.info("推送消息到sid:" + sid + ",推送内容:" + message); if (StringUtils.hasLength(message)) { for (WebSocketServer server : websocketMap.values()) { try { // sid为null时群发,不为null则只发一个 if (sid == null) { server.sendMessage(message); } else if (server.sid.equals(sid)) { server.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); continue; } } } } }

六、建立一个前端界面测试一下

<!DOCTYPE html>
<html>

<head>
    <meta charset="utf-8">
    <title>websocket通讯</title>
</head>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>
<script>
    var socket;
    function openSocket() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            console.log("您的浏览器支持WebSocket");
            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            //等同于socket = new WebSocket("ws://localhost:9010/javatest/ws/25");
            //var socketUrl="${request.contextPath}/ws/"+$("#userId").val();
            var socketUrl = "http://localhost:8100/test/ws/" + $("#userId").val();
            socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
            console.log(socketUrl)
            socket = new WebSocket(socketUrl);
            //打开事件
            socket.onopen = function () {
                console.log("websocket已打开");
                //socket.send("这是来自客户端的消息" + location.href + new Date());
            };
            //获得消息事件
            socket.onmessage = function (msg) {
                console.log(msg.data);
                let obj = JSON.parse(msg.data);
                if ($("#userId").val() == obj.userId) {
                     //打印我发送的消息
                    $("#showInfo").append('<p>您发送了: ' + $("#contentText").val() + "</p>");
                }else if ($("#userId").val() == obj.toUserId) {
                     //打印我接收的消息
                    $("#showInfo").append('<p>用户'+ obj.userId+'给您发送了: ' + obj.content + "</p>");
                }else if (obj.toUserId == null){
                     //打印群发的消息
                    $("#showInfo").append('<p>群发消息:'+ obj.content + "</p>");
                }

            };
            //关闭事件
            socket.onclose = function () {
                console.log("websocket已关闭");
            };
            //发生了错误事件
            socket.onerror = function () {
                console.log("websocket发生了错误");
            }
        }
    }
    function sendMessage() {
        if (typeof (WebSocket) == "undefined") {
            console.log("您的浏览器不支持WebSocket");
        } else {
            console.log("您的浏览器支持WebSocket");
            let obj = {
                userId: $("#userId").val(),
                toUserId: $('#toUserId').val(),
                content: $("#contentText").val()
            };
            
            socket.send(JSON.stringify(obj) );
        }
    }
</script>

<body>
    <p>【userId】:
    <div><input id="userId" name="userId" type="text" value="1" /></div>
    <p>【toUserId】:
    <div><input id="toUserId" name="toUserId" type="text" value="2" /></div>
    <p>【toUserId内容】:
    <div><input id="contentText" name="contentText" type="text" value="abc" /></div>
    <p>【操作】:
    <div><button onclick="openSocket()" >开启socket</button></div>
    <p>【操作】:
    <div><button onclick="sendMessage()" >发送消息</button></div>
    <p id="showInfo"></p>
</body>

</html>

七、编写一个接口测试redis推送订阅消息

 @Autowired private RedisTemplate redisTemplate; @RequestMapping(value = "/getMessageData") public ResultData getMessageData(String message) { ObjectMapper om = new ObjectMapper(); try { Map<String, Object> map = om.readValue(message, HashMap.class); redisTemplate.opsForValue().set("a", message); redisTemplate.opsForValue().set("b", map.toString()); redisTemplate.convertAndSend("topic1", map); System.out.println(map); } catch (Exception e) { e.printStackTrace(); } return ResultData.success("操作成功"); }

测试接口:

redis的消息发布、订阅服务与websocket结合推送消息给多个客户端

接口发送要通知的消息测试


观察日志输出:

redis的消息发布、订阅服务与websocket结合推送消息给多个客户端

后端控制台输出


在看一下页面上建立websocket后,已经将redis发布的消息,从服务器端推送到了浏览器端:

redis的消息发布、订阅服务与websocket结合推送消息给多个客户端

浏览器接收到通知

至此,基本上这个demo已经实现了最初的目的。但是,这只能说技术思路上,可行,如果应用到生产系统上,必然需要考虑websocket连接数过多的问题、群发消息的遍历问题等等。

八、测试两个websocket之间互发消息

redis的消息发布、订阅服务与websocket结合推送消息给多个客户端

websocket之间互发消息

最后总结一下,从这个demo中,可以看出来框架已经做了很多的封装,服务端和浏览器的ip与端口不对,但是其中的互相建立连接的过程在后端已经被封装了,这样的优点是易于使用,缺点嘛,就是只会使用,但是稍一有新的业务需求,便不知道如何变通了。希望同行者能明白demo只是简单的参考,其中的协议原理才是知识的本质。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/54195.html

(0)
上一篇 2024-07-30 20:33
下一篇 2024-08-09 13:33

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注

关注微信