极简消息队列

极简消息队列昨天因为要下载最新的RabbitMQ,碰到些问题:官网下载,碰到下载ErLang的时候,需要进入Github下载,屡次失败使用Docker下载,

大家好,欢迎来到IT知识分享网。极简消息队列"

昨天因为要下载最新的RabbitMQ,碰到些问题:

  1. 官网下载,碰到下载ErLang的时候,需要进入Github下载,屡次失败
  2. 使用Docker下载,发现要开始收费了,虽然个人免费版暂时还可以用,但未来怎样不知道
  3. 此次需求其实很简单,就是希望从数据库排队这种做法,改成内存消息队列,将服务响应时间从2秒提升到0.5秒

有很多人说可以用梯子等方法获取,这块不太会,好的镜像也没找到,本着轮子精神,再造轮子,更符合自己心意

思路:

  1. 基于TCP通讯,所以首先构建一个TCP服务端和客户端
  2. 安全处理,通过用户名和密码方式做基本的认证,并每次通讯携带口令
  3. 队列处理,基本的FIFO(先进先出)即可,也不需要持久化保存
  4. Web管理控制台,不需要
  5. 支持跨平台,用C# Net6.0即可

后续会直接发代码上来,现在第一步(TCP通讯)测试应该没问题了

服务端

using System.Net;
using System.Net.Sockets;
using System.Text;

namespace BearMQ
{
    internal class Program
    {
        static void Main(string[] args)
        {
            // 启动监听服务
            var server = StartServer();

            // 定义一个现成,携带参数,避免堵塞            
            ParameterizedThreadStart handlerThread = new ParameterizedThreadStart(ProcessHandler);
            Thread processHandlerThread = new Thread(handlerThread);
            processHandlerThread.Start(server);
            
            Console.WriteLine("Start server, press blank to end");
            while (true)
            {
                // 按空格退出循环并停止监听
                var key = Console.ReadKey();
                if (key.KeyChar == ' ')
                {
                    Console.WriteLine("Ending, please wait...");
                    server.Disconnect(false);
                    server.Close();
                    break;
                }
                
            }
            Console.WriteLine("End");
        }

        /// <summary>
        /// 启动监听服务
        /// </summary>
        /// <returns></returns>
        static Socket StartServer()
        {
            // 创建监听Socket (IPv4, 流式通讯,TCP协议)
            Socket listenerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            
            // 绑定IP和端口
            listenerSocket.Bind(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 12345));

            // 启动监听,最大100请求队列
            listenerSocket.Listen(100);

            return listenerSocket;
        }

        /// <summary>
        /// 处理器
        /// </summary>
        /// <param name="args"></param>
        static void ProcessHandler(object args)
        {
            Socket server = (Socket)args;
            while (true)
            {
                Socket remote = server.Accept();
                
                // 读取数据
                byte[] buffer = new byte[1024];
                var datalen = remote.Receive(buffer, 0, buffer.Length, SocketFlags.None);
                string request = Encoding.UTF8.GetString(buffer, 0, datalen);
                Console.WriteLine(#34;{DateTime.Now}:{request}");

                // 应答数据(TBD:此处应该执行专门的业务处理,比如队列)
                var response = "TBD:some process";
                buffer = Encoding.UTF8.GetBytes(response);
                remote.Send(buffer, 0, buffer.Length, SocketFlags.None);
                
                // 关闭远程链接
                remote.Close();

                // 设置延时
                Thread.Sleep(100);
            }

        }
    }
}

客户端:

using System.Net;
using System.Net.Sockets;
using System.Text;


namespace BearMQClient
{
    internal class Program
    {
        static void Main(string[] args)
        {

            Console.WriteLine("Start connect, press any text send to server, press 'end' to quit");
            while (true)
            {
                var request = Console.ReadLine();
                if (request == "end") break;

                // connect
                Socket client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                try
                {
                    client.Connect("localhost", 12345);
                }
                catch (Exception ex)
                {
                    Console.Write(#34;Connect failed:{ex.Message}");
                    return;
                }

                // ready send
                byte[] buffer = Encoding.UTF8.GetBytes(request??"");
                client.Send(buffer, 0, buffer.Length, SocketFlags.None);
                EndPoint remote = new IPEndPoint(IPAddress.Any, 0);
                
                buffer = new byte[1024];
                int responselen = client.ReceiveFrom(buffer, ref remote);
                var response = Encoding.UTF8.GetString(buffer, 0, responselen);
                Console.WriteLine(#34;Response is:{response}");
                client.Close();
            }

            Console.WriteLine("End");
        }
    }
}

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/52064.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信