「技术干货」一文读懂消息队列的原理和机制

「技术干货」一文读懂消息队列的原理和机制Linux 系统给我们提供了一种可以发送格式化数据流的通信手段 这就是消息队列 使用消息队列无疑在某些场景的应用下可以大大减少工作量 相同的工作如果使用共享内存 除了需要自己手工构造一个可能不够高效的队列外 我们还要自己处理竞争条件和临界区代

大家好,欢迎来到IT知识分享网。

Linux系统给我们提供了一种可以发送格式化数据流的通信手段,这就是消息队列。使用消息队列无疑在某些场景的应用下可以大大减少工作量,相同的工作如果使用共享内存,除了需要自己手工构造一个可能不够高效的队列外,我们还要自己处理竞争条件和临界区代码。而内核给我们提供的消息队列,无疑大大方便了我们的工作。

Linux环境提供了system v和POSIX两套消息队列,本文主要是以下内容:

  • 消息队列的原理和工作机制
  • 如何使用system v消息队列。
  • 如何使用POSIX消息队列。
  • 它们的底层实现分别是什么样子的?
  • 它们分别有什么特点?以及相关资源限制。

一,什么是消息队列

对于消息队列,我们可以采用类比下图的一个过程:

「技术干货」一文读懂消息队列的原理和机制

A要把信息发送个B,因为某些原因,当时A不能把东西面交给B,这时候他们商量,你把东西放到一个地方先寄存起来,我到时候去那个地方去取

  • A按照协议商定的,就把信息放到协议约定好的地方,并约定好了提货方式
  • 对于B而言,要想成功拿到A的东西,就必须要保证去同一个地址,并且只有1号柜才是给自己的信息

对于双方约定好的地方,有几个重要的信息:地址、保险柜号码,只有同一个地址和同一个保险柜才能保证是对方托管给自己的东西。在消息队列操作中,

  • 键(Key)值相当于地址
  • 消息队列的标识符相当于具体的地址
  • 消息类型相当于保险柜的号码

同一个键(key)值可以保证是同一个消息队列,同一个消息队列标示符才能保证不同的进程可以相互通信,同一个消息类型才能保证某个进程取出是对方的信息。那么什么是消息队列呢?

消息队列,其实是一种信箱机制,是Linux的一种通讯机制,这种通讯机制传递数据具有某种结构,而不是简单的字节流,提供了从一个进程向另外一个进程发送一块数据的方法。

更多Linux内核视频教程文档资料免费领取后台私信【内核】自行获取。

「技术干货」一文读懂消息队列的原理和机制

二,消息队列的工作机制

在应用开发中,生产者、消费者的模型非常常见,一方产生数据并把数据放到队列中,而另外一方从队列中取数据,先进先出。

同样,在操作系统内核中,也实现了类似的功能,队列中存放的是“消息”,称之为消息队列,也可以理解为数据,如下图所示,主要的用途是进程间通讯(IPC)。

「技术干货」一文读懂消息队列的原理和机制

关于标准接口,Linux系统中提供了POSIX和 System V这两种不同的接口,POSIX为可移植的操作系统接口。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。

System V 时期的不同系统接口不一样,给移植带来了一定的麻烦,而POSIX将不同操作系统之间的上层API进行了统一,更换平台时便于移植应用程序。目前Linux中使用POSIX较多,但System V同样也存在。

三,消息队列system v API

3.1 键(Key)值

System V 提供的进程间通信机制需要一个 key 值,通过 key 值就可在系统内获得一个唯一的消息队列标识符。key 值可以是人为指定的,也可以通过 ftok() 函数获得。

#include <sys/types.h> #include <sys/ipc.h> key_t ftok(const char *pathname, int proj_id); //功能:: 获取键值 //参数: proj_id 项目ID,非0整数,只有低8位有效 pathname路径名 //返回值: 成功 key值 失败-1

3.2 消息队列的创建

创建一个新的或者打开一个已经存在的消息队列。不同的进程调用此函数,只要用相同的 key 值就能得到同一个消息队列的标识符,就相当于双方约定好的地址。成功返回队列ID,失败返回-1。

#include <sys/msg.h> int msgget(key_t key, int msgflg);
  • key: ftok() 返回的 key 值,或者认为制定键值。多个进程可以通过它访问同一个消息队列
  • msgflg: 标识函数的行为及消息队列的权限,其取值如下:
  • IPC_CREAT:创建消息队列。
  • IPC_EXCL: 检测消息队列是否存在。

该函数可能返回以下错误代码:

  • EACCES:指定的消息队列已存在,但调用进程没有权限访问它
  • EEXIST:key指定的消息队列已存在,而msgflg中同时指定IPC_CREAT和IPC_EXCL标志
  • ENOENT:key指定的消息队列不存在同时msgflg中没有指定IPC_CREAT标志
  • ENOMEM:需要建立消息队列,但内存不足
  • ENOSPC:需要建立消息队列,但已达到系统的限制

3.3 消息队列的读写操作

//消息队列要发送的消息的结构体 #define MAX_MSG_LEN 1024 typedef struct MSGQUEUE { long msgType; char msgText[MAX_MSG_LEN]; }Msg;

首先是消息队列发送的结构体,由消息内容和消息类型组成。其中消息类型就是为了区分是哪个进程去读取这个消息而设置的,用来实现进程间的通信。

对于消息队列的读写,都是以消息类型为准。消息类型相当于保险柜号码,A 往 1 保险柜放东西,对方想取出 A 的东西必须也是从 1 号保险柜里取。同理,某一进程往消息队列添加 a 类型的消息,别的进程要想取出这进程添加的信息也必须取出 a 类型的消息。

msgsnd()函数:这个函数的主要作用就是将消息写入到消息队列,俗称发送一个消息。

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

函数传入值:

  • msqid:消息队列标识符。
  • msgp:发送给队列的消息。msgp可以是任何类型的结构体,但第一个字段必须为long类型,即表明此发送消息的类型,msgrcv()函数则根据此接收消息。

msgp定义的参照格式如下:

/*msgp定义的参照格式*/ struct s_msg{ long type; /* 必须大于0,消息类型 */ char mtext[1]; /* 消息正文,可以是其他任何类型 */ } msgp;

msgsz:要发送消息的大小,不包含消息类型占用的4个字节,即mtext的长度。

msgflg:如果为0则表示:当消息队列满时,msgsnd()函数将会阻塞,直到消息能写进消息队列;如果为IPC_NOWAIT则表示:当消息队列已满的时候,msgsnd()函数不等待立即返回;如果为IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程。

如果成功则返回0,如果失败则返回-1,并且错误原因存于error中。

错误代码:

  • EAGAIN:参数msgflg设为IPC_NOWAIT,而消息队列已满。
  • EIDRM:标识符为msqid的消息队列已被删除。
  • EACCESS:无权限写入消息队列。
  • EFAULT:参数msgp指向无效的内存地址。
  • EINTR:队列已满而处于等待情况下被信号中断。
  • EINVAL:无效的参数msqid、msgsz或参数消息类型type小于0。

msgsnd()为阻塞函数,当消息队列容量满或消息个数满会阻塞。消息队列已被删除,则返回EIDRM错误;被信号中断返回E_INTR错误。如果设置IPC_NOWAIT消息队列满或个数满时会返回-1,并且置EAGAIN错误。

msgsnd()解除阻塞的条件有以下三个条件:

  • 消息队列中有容纳该消息的空间。
  • msqid代表的消息队列被删除。
  • 调用msgsnd函数的进程被信号中断。

msgrcv()函数:从标识符为msqid的消息队列读取消息并将消息存储到msgp中,读取后把此消息从消息队列中删除,也就是俗话说的接收消息。

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
  • msqid:消息队列标识符。
  • msgp:存放消息的结构体,结构体类型要与msgsnd()函数发送的类型相同。
  • msgsz:要接收消息的大小,不包含消息类型占用的4个字节。

msgtyp有多个可选的值:如果为0则表示接收第一个消息,如果大于0则表示接收类型等于msgtyp的第一个消息,而如果小于0则表示接收类型等于或者小于msgtyp绝对值的第一个消息。

msgflg取值情况如下:

  • 0: 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待
  • IPC_NOWAIT:若在消息队列中并没有相应类型的消息可以接收,则函数立即返回,此时错误码为ENOMSG
  • IPC_EXCEPT:与msgtype配合使用返回队列中第一个类型不为msgtype的消息
  • IPC_NOERROR:如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部分将被丢弃

msgrcv()函数如果接收消息成功则返回实际读取到的消息数据长度,否则返回-1,错误原因存于error中。

错误代码

  • E2BIG:消息数据长度大于msgsz而msgflag没有设置IPC_NOERROR
  • EIDRM:标识符为msqid的消息队列已被删除
  • EACCESS:无权限读取该消息队列
  • EFAULT:参数msgp指向无效的内存地址
  • ENOMSG:参数msgflg设为IPC_NOWAIT,而消息队列中无消息可读
  • EINTR:等待读取队列内的消息情况下被信号中断

msgrcv()函数解除阻塞的条件也有三个:

  • 消息队列中有了满足条件的消息。
  • msqid代表的消息队列被删除。
  • 调用msgrcv()函数的进程被信号中断。

3.3 操作消息队列

消息队列是可以被用户操作的,比如设置或者获取消息队列的相关属性,那么可以通过msgctl()函数去处理它。

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

函数传入值:

  • msqid:消息队列标识符。

cmd的取值有多个:

  • IPC_STAT 获取该 MSG 的信息,获取到的信息会储存在结构体 msqid_ds 类型的buf中。
  • IPC_SET 设置消息队列的属性,要设置的属性需先存储在结构体 msqid_ds类型的buf中,可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,储存在结构体 msqid_ds。
  • IPC_RMID 立即删除该 MSG,并且唤醒所有阻塞在该 MSG 上的进程,同时忽略第三个参数。
  • IPC_INFO 获得关于当前系统中 MSG 的限制值信息。
  • MSG_INFO 获得关于当前系统中 MSG 的相关资源消耗信息。
  • MSG_STAT 同 IPC_STAT,但 msgid 为该消息队列在内核中记录所有消息队列信息的数组的下标,因此通过迭代所有的下标可以获得系统中所有消息队列的相关信息。
  • buf:相关信息结构体缓冲区。

函数返回值:

  • 成功:0
  • 出错:-1,错误原因存于error中

3.4 实例

使用msgsnd和msgrcv向消息队列发送和从队列中接收消息,我们先来看看如何访问一个已经存在的消息队列和向其发送消息。

发送端:

#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #define FILEPATH "/home/tony" #define PROJID 1234 #define MSG "hello world!" struct msgbuf { long mtype; char mtext[BUFSIZ]; }; int main() { int msgid; key_t key; struct msgbuf buf; key = ftok(FILEPATH, PROJID); if (key == -1) { perror("ftok()"); exit(1); } msgid = msgget(key, 0); if (msgid == -1) { perror("msgget()"); exit(1); } buf.mtype = 1; strncpy(buf.mtext, MSG, strlen(MSG)); if (msgsnd(msgid, &buf, strlen(buf.mtext), 0) == -1) { perror("msgsnd()"); exit(1); } }

接收端:

#include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <stdlib.h> #include <stdio.h> #include <string.h> #define FILEPATH "/home/tony" #define PROJID 1234 struct msgbuf { long mtype; char mtext[BUFSIZ]; }; int main() { int msgid; key_t key; struct msgbuf buf; key = ftok(FILEPATH, PROJID); if (key == -1) { perror("ftok()"); exit(1); } msgid = msgget(key, 0); if (msgid == -1) { perror("msgget()"); exit(1); } if (msgrcv(msgid, &buf, BUFSIZ, 1, 0) == -1) { perror("msgrcv()"); exit(1); } printf("mtype: %d\n", buf.mtype); printf("mtype: %s\n", buf.mtext); if (msgctl(msgid, IPC_RMID, NULL) == -1) { perror("msgctl()"); exit(1); } exit(0); }

四,POSIX消息队列 API

POSIX消息队列是独立于XSI消息队列的一套新的消息队列API,让进程可以用消息的方式进行数据交换。这套消息队列在Linux 2.6.6版本之后开始支持,还需要你的glibc版本必须高于2.3.4。POSIX消息队列和System V消息队列相比,有以下优点:

消息通知特性允许一个进程能在一条消息进入之前为空的消息队列时候,异步的通过信号或者线程的实例化接收通知

Linux下可以通过poll()、select()、epoll监控POSIX消息队列

4.1 打开、关闭、删除消息队列

mq_open()函数打开或创建一个posix消息队列。

#include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); 

name:表示消息队列的名字,它符合POSIX IPC的名字规则。

oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考ope

attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。

mq_open返回值是mqd_t类型的值,被称为消息队列描述符。在Linux 2.6.18中该类型的定义为整型:

mq_close()函数关闭消息队列。关闭之后调用进程不在使用该描述符,但消息队列不会从系统中删除,进程终止时,会自动关闭已打开的消息队列,和调用mq_close一样。参数为mq_open()函数返回的值。

int mq_close(mqd_t mqdes); 

mq_unlink()函数从系统中删除某个消息队列。删除会马上发生,即使该队列的描述符引用计数仍然大于0。参数为mq_open()函数第一个参数。

int mq_unlink(const char *name); 

mq_setattr()函数和mq_getattr()函数分别设置和和获取消息队列属性。

int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); 

参数attr、newattr、oldattr为消息队列属性结构体指针;

 struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ }; 
  • 参数mq_flags在mq_open时被初始化(oflag参数),其值为0 或者 O_NONBLOCK。
  • 参数mq_maxmsg和mq_msgsize在mq_open时在参数attr中初始化设置,
  • mq_maxmsg是指队列的消息个数最大值;
  • mq_msgsize为队列每个消息的最大值。
  • 参数mq_curmsgs为当前队列消息。

4.2. 发送和接收消息队列

mq_send() 函数 和mq_receive()函数分别用于向消息队列放置和取走消息。

int mq_send(mqd_t mqdes,const char *msg_ptr,size_t msg_len,unsigned int msg_prio); //函数将msg_ptr指向的缓冲区中的消息添加到消息描述符mqdes所引用的消息队列中 //msg_len表示消息缓冲区中消息的长度,其值要小于或等于队列的mq_msgsize属性 //msg_prio表示优先级,数字越大优先级越高(若优先级相同则先入先出) ssize_t mq_receive(mqd_t mqdes,char *msg_ptr, size_t msg_len,unsigned int *msg_prio); //接收消息的时候,msg_len表示消息缓冲区中消息的长度,其值要大于或等于队列的mq_msgsize属性 //优先级可以设为NULL

如果消息队列已满,mq_send()函数将阻塞,直到队列有可用空间再次允许放置消息或该调用被信号打断;如果O_NONBLOCK被指定,mq_send()那么将不会阻塞,而是返回EAGAIN错误。如果队列空,mq_receive()函数将阻塞,直到消息队列中有新的消息;如果O_NONBLOCK被指定,mq_receive()那么将不会阻塞,而是返回EAGAIN错误。

还有两个mq_timedsend()和mq_timedreceive()几乎作用于前两个一样,唯一差别在于可设置一个消息超时设置。

4.3 消息通知

POSIX消息队列可以让进程接受之前为空的队列上有可用消息的异步通知。一种方法通过信号接收通知,另一种方法通过线程接收通知。

 int mq_notify(mqd_t mqdes,const struct sigevent *notification); //mq_notify函数注册调用进程在一条消息进入描述符所引用的空队列时接收通知

使用的时候,需要注意以下几点:

任何一个时刻只有一个进程能够向一个指定消息队列注册接收通知。

只有当一条新消息进入之前为空的队列的时候注册进程才会收到通知。

当向一个注册进程发送一个通知之后就会删除注册信息,之后任何进程就可以向该队列注册接收通知了。

4.4 实例

发送mq_send.c

#include <pthread.h> #include <mqueue.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ #include <string.h> int main(int argc, char *argv[]) { mqd_t mq_cmd; struct mq_attr attr; char msgbuffer[5]; memcpy(msgbuffer,"boots",5); /* Open the message queue for reading */ attr.mq_flags = 0; attr.mq_maxmsg = 10; attr.mq_msgsize = 20; attr.mq_curmsgs = 0; mq_cmd = mq_open("/mq_test", O_WRONLY|O_CREAT, 0666, &attr); //为什么要加 / ,否则打开失败 if (mq_cmd < 0){ printf("mq_open error: %d \n",mq_cmd); }else{ printf("mq_open success: %d \n",mq_cmd); } int nbytes = mq_send(mq_cmd, (char *)msgbuffer, sizeof(msgbuffer), 0); if (nbytes < 0){ printf("mq_send error: %d \n",nbytes); }else{ printf("mq_send success: %d \n",nbytes); } if (mq_close(mq_cmd) < 0){ printf("mq_close error! \n"); }else{ printf("mq_close success! \n"); } //mq_unlink("/mq_test"); }

接收mq_rec.c

#include <pthread.h> #include <mqueue.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ int main(int argc, char *argv[]) { mqd_t mq_cmd; struct mq_attr attr; char msgbuffer[5]; /* Open the message queue for reading */ attr.mq_flags = 0; attr.mq_maxmsg = 10; attr.mq_msgsize = 20; attr.mq_curmsgs = 0; mq_cmd = mq_open("/mq_test", O_RDONLY|O_CREAT, 0666, &attr); //为什么要加 / ,否则打开失败 if (mq_cmd < 0){ printf("mq_open error: %d \n",mq_cmd); }else{ printf("mq_open success: %d \n",mq_cmd); } int nbytes = mq_receive(mq_cmd,msgbuffer, 20, NULL); //这里的20, 表示长度大于或等于mq_msgsize,否则返回 -1 if (nbytes < 0){ printf("mq_receive error: %d \n",nbytes); }else{ printf("mq_receive success: %s\n",msgbuffer); } if (mq_close(mq_cmd) < 0){ printf("mq_close error! \n"); }else{ printf("mq_close success! \n"); } }

编译:-lrt ,-l 指的是链接库文件,rt为库的名字rt。也即是librt.a/librt.so

若不加-lrt,链接过程会报错。

设备查看

root@public-OptiPlex-3046:~# cat /dev/mqueue/mq_test QSIZE:0 NOTIFY:0 SIGNO:0 NOTIFY_PID:

5. 总结

本章主要是学习了消息队列的原理和机制, 并对应用提供的两种消息队列的使用方法进行了学习,后面会继续学习内核对于这两种消息队列的处理机制和区别。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/97737.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信