大家好,欢迎来到IT知识分享网。
APM.Server 消息推送服务的实现
消息推送服务
服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。
APM.Server基于简单
1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>();
和
1 private static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();
实现。
部分代码如下:
1 /// <summary> 2 /// 消息转发 3 /// </summary> 4 private void ForwardMsg() 5 { 6 try 7 { 8 var msg = MessageQueue.Dequeue(); 9 if (msg != null) 10 { 11 switch (msg.Type) 12 { 13 case (byte)MessageType.Sub: 14 if (!msg.IsMuti) 15 { 16 if (!SessionDic.Exists(msg.SessionID, msg.SessionID)) 17 SessionDic.Set(this._server, msg.SessionID, msg.SessionID); 18 } 19 if (!SessionDic.Exists(msg.SessionID, msg.Sender)) 20 SessionDic.Set(this._server, msg.Sender, msg.SessionID); 21 break; 22 case (byte)MessageType.Unsub: 23 if (!msg.IsMuti) 24 { 25 if (SessionDic.Exists(msg.SessionID, msg.SessionID)) 26 SessionDic.Del(msg.SessionID, msg.SessionID); 27 } 28 if (SessionDic.Exists(msg.SessionID, msg.Sender)) 29 SessionDic.Del(msg.Sender, msg.SessionID); 30 break; 31 default: 32 var session = SessionDic.Get(msg.SessionID); 33 if (session != null) 34 { 35 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList(); 36 if (remotes != null && remotes.Count > 0) 37 { 38 Parallel.For(0, remotes.Count, i => 39 { 40 this._server.SendMsg(remotes[i], Message.Serialize(msg)); 41 }); 42 } 43 } 44 this.OnMessage?.Invoke(msg); 45 break; 46 } 47 48 } 49 } 50 catch { } 51 }
1 /// <summary> 2 /// 消息转发 3 /// </summary> 4 private void ForwardMsg() 5 { 6 try 7 { 8 var msg = MessageQueue.Dequeue(); 9 if (msg != null) 10 { 11 switch (msg.Type) 12 { 13 case (byte)MessageType.Sub: 14 if (!msg.IsMuti) 15 { 16 if (!SessionDic.Exists(msg.SessionID, msg.SessionID)) 17 SessionDic.Set(this._server, msg.SessionID, msg.SessionID); 18 } 19 if (!SessionDic.Exists(msg.SessionID, msg.Sender)) 20 SessionDic.Set(this._server, msg.Sender, msg.SessionID); 21 break; 22 case (byte)MessageType.Unsub: 23 if (!msg.IsMuti) 24 { 25 if (SessionDic.Exists(msg.SessionID, msg.SessionID)) 26 SessionDic.Del(msg.SessionID, msg.SessionID); 27 } 28 if (SessionDic.Exists(msg.SessionID, msg.Sender)) 29 SessionDic.Del(msg.Sender, msg.SessionID); 30 break; 31 default: 32 var session = SessionDic.Get(msg.SessionID); 33 if (session != null) 34 { 35 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList(); 36 if (remotes != null && remotes.Count > 0) 37 { 38 Parallel.For(0, remotes.Count, i => 39 { 40 this._server.SendMsg(remotes[i], Message.Serialize(msg)); 41 }); 42 } 43 } 44 this.OnMessage?.Invoke(msg); 45 break; 46 } 47 48 } 49 } 50 catch { } 51 }
异步tcp通信——APM.Core 服务端概述
异步tcp通信——APM.Core 解包
异步tcp通信——APM.Server 消息推送服务的实现
异步tcp通信——APM.ConsoleDemo
转载请标明本文来源:消息推送服务
更多内容欢迎star作者的github:https://github.com/yswenli/APM
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/33464.html