4. 消息队列

Linux下的进程通信手段基本上是从Unix平台上的进程通信手段继承而来的。 而对Unix发展做出重大贡献的两大主力AT&T的贝尔实验室 以及 BSD(加州大学伯克利分校的伯克利软件发布中心), 他们在进程间通信方面的侧重点有所不同;

  • 前者对Unix早期的进程间通信手段进行了系统的改进和扩充, 形成了“system-V IPC”,通信进程局限在单个计算机内(同一个设备的不同进程间通讯);

  • 而后者则跳过了该限制,形成了基于套接字(socket)的进程间通信机制(多用于不同设备的进程间通讯)。 Linux则把两者继承了下来,所以说Linux才是最成功的,既有“system-V IPC”,又支持“socket”。

System-V IPC包含消息队列、共享内存、信号量三类对象,均通过key键值唯一标识, 内核用标识符(msgid/shmid/semid)管理,属于持续性资源,不会随进程退出自动销毁,需手动调用接口或命令释放。

4.1. 消息队列的基本概念

消息队列是内核维护的链式消息缓冲区,每个消息包含类型+数据体,进程可按消息类型精准收发, 无需遵循先进先出强制顺序,核心特性如下:

  • 异步通信:发送方无需等待接收方,消息存入队列后即可继续执行,接收方按需读取;

  • 消息结构化:支持带类型的自定义数据块,解决管道字节流粘包问题,开发更便捷;

  • 独立生命周期:进程退出后队列仍留存,直至手动删除或系统重启;

  • 按需读取:可指定消息类型接收,无需按入队顺序读取,灵活性远超管道;

  • 容量受限:受内核参数限制,单条消息、队列总容量均有上限;

  • 半双工传输:数据单向流转,双向通信需创建两个队列。

4.2. 消息队列应用场景

  • 多进程任务分发:主进程按任务类型发送消息,子进程按需接收处理;

  • 异步日志收集:业务进程发送日志消息,日志进程统一读取写入文件;

  • 跨进程指令传递:无亲缘进程间的结构化指令、数据交互;

  • 解耦模块通信:避免进程强依赖,实现异步模块化协作。

4.3. 消息队列原理

4.3.1. Key与消息队列标识符

消息队列通过key键值实现多进程访问同一队列,内核通过msgid管理队列实例,二者分工明确:

  • key键值:用户层标识,由ftok函数生成或手动指定,保证多进程访问同一队列;

  • msgid:内核层标识,msgget返回的队列ID,后续收发、控制操作均依赖该ID。

4.3.2. 生命周期与内核管理

消息队列由内核创建并维护,属于持续性IPC资源:

  1. 进程通过msgget创建/获取队列,内核分配msgid并初始化消息链表;

  2. 进程通过msgsnd/msgrcv完成消息收发,消息存储于内核缓冲区;

  3. 所有进程退出后,队列仍留存内核,直至调用msgctl(IPC_RMID)或ipcrm命令删除。

可通过ipcs命令查看系统IPC对象,ipcs -q仅查看消息队列,ipcrm -q <msgid>删除指定队列。

4.4. 消息队列函数说明

Linux内核提供了一系列函数来使用消息队列:

  • 其中创建或打开消息队列使用的函数是msgget(),这里创建的消息队列的数量会受到系统可支持的消息队列数量的限制;

  • 发送消息使用的函数是msgsnd()函数,它把消息发送到已打开的消息队列末尾;

  • 接收消息使用的函数是msgrcv(),它把消息从消息队列中取走,与FIFO 不同的是,这里可以指定取走某一条消息;

  • 最后控制消息队列使用的函数是msgctl(),它可以完成多项功能。

4.4.1. ftok函数

ftok用于通过文件路径+项目ID生成唯一key,避免手动指定key的冲突问题,是消息队列开发的规范前置操作。

1
2
3
4
5
6
7
8
9
#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id);

// 参数:
// pathname:存在的文件路径
// proj_id:项目ID,8位非0值
// 返回值:成功返回key_t类型键值,失败返回-1
  • 路径的作用:提供一个 “稳定的唯一标识源”——路径对应的文件/目录在文件系统中有唯一的inode号;

  • proj_id的作用:同一文件可以通过不同proj_id生成不同key,比如一个文件对应多个IPC资源。

4.4.2. msgget函数

msgget函数的作用是创建或获取一个消息队列对象, 并返回消息队列标识符。函数原型如下:

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

若执行成功返回队列ID,失败返回-1。 它的两个输入参数说明如下:

  • key:消息队列的关键字值,多个进程可以通过它访问同一个消息队列。 例如收发进程都使用同一个键值即可使用同一个消息队列进行通讯。 其中有个特殊值IPC_PRIVATE,它用于创建当前进程的私有消息队列。

  • msgflg:表示创建的消息队列的模式标志参数,主要有IPC_CREAT,IPC_EXCL和权限mode,

    • 如果是 IPC_CREAT 为真表示:如果内核中不存在关键字与key相等的消息队列,则新建一个消息队列; 如果存在这样的消息队列,返回此消息队列的标识符。

    • 而如果为 IPC_CREAT | IPC_EXCL 表示如果内核中不存在键值与key相等的消息队列,则新建一个消息队列; 如果存在这样的消息队列则报错。

    • mode指IPC对象存取权限,它使用Linux文件的数字权限表示方式,如0600,0666等。

    这些参数是可以通过“|”运算符联合起来的,因为它始终是int类型的参数。如msgflag使用参数 IPC_CREAT | 0666 时表示, 创建或返回已经存在的消息队列的标识符,且该消息队列的存取权限为0666, 即消息的所有者,所属组用户,其他用户均可对该消息进行读写。

注意

  • 选项 msgflg 是一个位掩码,因此 IPC_CREAT、IPC_EXCL 和权限 mode 可以用位或的方式叠加起来, 比如: msgget(key, IPC_CREAT | 0666); 表示如果 key 对应的消息队列不存在就创建, 且权限指定为 0666,若已存在则直接获取消息队列ID,此处的0666使用的是Linux文件权限的数字表示方式。

  • 权限只有读和写,执行权限是无效的,例如 0777 跟 0666 是等价的。

  • 当 key 被指定为 IPC_PRIVATE 时,系统会自动产生一个未用的 key 来对应一个新的消息队列对象, 这个消息队列一般用于进程内部间的通信。

  • 该函数可能返回以下错误代码:

    • EACCES:指定的消息队列已存在,但调用进程没有权限访问它

    • EEXIST:key指定的消息队列已存在,而msgflg中同时指定IPC_CREAT和IPC_EXCL标志

    • ENOENT:key指定的消息队列不存在同时msgflg中没有指定IPC_CREAT标志

    • ENOMEM:需要建立消息队列,但内存不足

    • ENOSPC:需要建立消息队列,但已达到系统的限制

4.4.3. msgsnd函数

这个函数的主要作用就是将消息写入到消息队列,俗称发送一个消息。函数原型如下:

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

参数说明:

  • msqid:消息队列标识符。

  • msgp:发送给队列的消息。msgp可以是任何类型的结构体,但第一个字段必须为long类型, 即表明此发送消息的类型,msgrcv()函数则根据此接收消息。msgp定义的参照格式如下:

    /*msgp定义的参照格式*/
    struct s_msg{
        long type;  /* 必须大于0,消息类型 */
        char mtext[];  /* 消息正文,可以是其他任何类型 */
    } msgp;
    
    • msgsz:要发送消息的大小,不包含消息类型占用的4个字节,即mtext的长度。

    • msgflg:如果为0则表示:当消息队列满时,msgsnd()函数将会阻塞,直到消息能写进消息队列; 如果为IPC_NOWAIT则表示:当消息队列已满的时候,msgsnd()函数不等待立即返回; 如果为IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程。

  • 返回值:如果成功则返回0,如果失败则返回-1,并且错误原因存于error中。错误代码:

    • EAGAIN:参数msgflg设为IPC_NOWAIT,而消息队列已满。

    • EIDRM:标识符为msqid的消息队列已被删除。

    • EACCESS:无权限写入消息队列。

    • EFAULT:参数msgp指向无效的内存地址。

    • EINTR:队列已满而处于等待情况下被信号中断。

    • EINVAL:无效的参数msqid、msgsz或参数消息类型type小于0。

msgsnd()为阻塞函数,当消息队列容量满或消息个数满会阻塞。若消息队列已被删除,则返回EIDRM错误; 若被信号中断返回E_INTR错误。

如果设置IPC_NOWAIT消息队列满或个数满时会返回-1,并且置EAGAIN错误。

msgsnd()解除阻塞的条件有以下三个条件:

  • 消息队列中有容纳该消息的空间。

  • msqid代表的消息队列被删除。

  • 调用msgsnd函数的进程被信号中断。

4.4.4. msgrcv函数

msgrcv函数是从标识符为msqid的消息队列读取消息并将消息存储到msgp中, 读取后把此消息从消息队列中删除,也就是俗话说的接收消息。函数原型:

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

参数说明:

  • msqid:消息队列标识符。

  • msgp:存放消息的结构体,结构体类型要与msgsnd()函数发送的类型相同。

  • msgsz:要接收消息的大小,不包含消息类型占用的4个字节。

  • msgtyp有多个可选的值:如果为0则表示接收第一个消息,如果大于0则表示接收类型等于msgtyp的第一个消息, 而如果小于0则表示接收类型等于或者小于msgtyp绝对值的第一个消息。

  • msgflg用于设置接收的处理方式,取值情况如下:

    • 0: 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待

    • IPC_NOWAIT:若在消息队列中并没有相应类型的消息可以接收,则函数立即返回,此时错误码为ENOMSG

    • IPC_EXCEPT:与msgtype配合使用返回队列中第一个类型不为msgtype的消息

    • IPC_NOERROR:如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部分将被丢弃

  • 返回值:msgrcv()函数如果接收消息成功则返回实际读取到的消息数据长度,否则返回-1,错误原因存于error中。错误代码:

    • E2BIG:消息数据长度大于msgsz而msgflag没有设置IPC_NOERROR

    • EIDRM:标识符为msqid的消息队列已被删除

    • EACCESS:无权限读取该消息队列

    • EFAULT:参数msgp指向无效的内存地址

    • ENOMSG:参数msgflg设为IPC_NOWAIT,而消息队列中无消息可读

    • EINTR:等待读取队列内的消息情况下被信号中断

msgrcv()函数解除阻塞的条件也有三个:

  • 消息队列中有了满足条件的消息。

  • msqid代表的消息队列被删除。

  • 调用msgrcv()函数的进程被信号中断。

4.4.5. msgctl函数

消息队列是可以被用户操作的,比如设置或者获取消息队列的相关属性,那么可以通过msgctl()函数去处理它。函数原型:

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

参数说明:

  • msqid:消息队列标识符。

  • cmd 用于设置使用什么操作命令,它的取值有多个:

    • IPC_STAT 获取该 MSG 的信息,获取到的信息会储存在结构体 msqid_ds类型的buf中。

    • IPC_SET 设置消息队列的属性,要设置的属性需先存储在结构体msqid_ds类型的buf中, 可设置的属性包括:msg_perm.uid、msg_perm.gid、msg_perm.mode以及msg_qbytes,储存在结构体msqid_ds中。

    • IPC_RMID 立即删除该 MSG,并且唤醒所有阻塞在该 MSG上的进程,同时忽略第三个参数。

    • IPC_INFO 获得关于当前系统中 MSG 的限制值信息。

    • MSG_INFO 获得关于当前系统中 MSG 的相关资源消耗信息。

    • MSG_STAT 同 IPC_STAT,但 msgid为该消息队列在内核中记录所有消息队列信息的数组的下标, 因此通过迭代所有的下标可以获得系统中所有消息队列的相关信息。

  • buf:相关信息结构体缓冲区。

    • 返回值:

    • 成功:0

    • 出错:-1,错误原因存于error中,错误代码:

      • EACCESS:参数cmd为IPC_STAT,确无权限读取该消息队列。

      • EFAULT:参数buf指向无效的内存地址。

      • EIDRM:标识符为msqid的消息队列已被删除。

      • EINVAL:无效的参数cmd或msqid。

      • EPERM:参数cmd为IPC_SET或IPC_RMID,却无足够的权限执行。

4.5. 消息队列示例

接下来通过示例来讲解消息队列的使用,使用方法一般是:

发送者:

  1. 获取消息队列的 ID

  2. 将数据放入一个附带有标识的特殊的结构体,发送给消息队列。

接收者:

  1. 获取消息队列的 ID

  2. 将指定标识的消息读出。

当发送者和接收者都不再使用消息队列时,及时删除它以释放系统资源。

本次实验主要是两个进程(无血缘关系的进程)通过消息队列进行消息的传递, 一个进程发送消息,一个进程接收消息,并将其打印出来。

4.5.1. 发送进程

本示例的发送进程代码如下:

消息队列发送进程(base_code/system_programing/msg/msg_send/sources/msg.c文件)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#define BUFFER_SIZE 512
#define FTOK_PATH "/opt"    // 定义ftok的路径,发送和接收需一致,即inode号一致
#define FTOK_PROJ_ID 100  // 自定义项目标识

struct message
{
    long msg_type;
    char msg_text[BUFFER_SIZE];
};

int main()
{
    int qid;
    key_t key;
    struct message msg;

    /* 1. 用ftok生成和接收端相同的IPC键值 */
    if ((key = ftok(FTOK_PATH, FTOK_PROJ_ID)) == -1)
    {
        perror("ftok failed");
        exit(1);
    }
    printf("Generate key via ftok: %x\n", key);

    /* 2. 获取消息队列 */
    if ((qid = msgget(key, IPC_CREAT|0666)) == -1)
    {
        perror("msgget failed");
        exit(1);
    }

    printf("Open queue %d\n", qid);

    while(1)
    {
        printf("Enter some message to the queue: ");
        if ((fgets(msg.msg_text, BUFFER_SIZE, stdin)) == NULL)
        {
            printf("\nGet message end.\n");
            exit(1);
        }

        msg.msg_type = getpid();  // 消息类型设为发送进程的PID
        /* 添加消息到消息队列 */
        if ((msgsnd(qid, &msg, strlen(msg.msg_text), 0)) < 0)
        {
            perror("Send message error");
            exit(1);
        }
        else
        {
            printf("Send message success.\n");
        }

        if (strncmp(msg.msg_text, "quit", 4) == 0)
        {
            printf("\nQuit send message.\n");
            break;
        }
    }

    exit(0);
}

本代码重点说明如下:

  • 第34行,调用msgget()函数创建/获取了一个使用ftok生成的key值的消息队列,该队列的属性“0666”表示任何人都可读写, 创建/获取到的队列ID存储在变量qid中。

  • 第53行,调用msgsnd()函数把进程号以及前面用户输入的字符串,通过msg结构体添加到前面得到的qid队列中。

  • 第63行,若用户发送的消息为quit,那么退出循环结束进程。

4.5.2. 接收进程

接收进程示例如下:

消息队列接收进程(base_code/system_programing/msg/msg_recv/sources/msg.c文件)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

#define BUFFER_SIZE 512
#define FTOK_PATH "/opt"    // 定义ftok的路径,发送和接收需一致,即inode号一致
#define FTOK_PROJ_ID 100  // 自定义项目标识

struct message
{
    long msg_type;
    char msg_text[BUFFER_SIZE];
};

int main()
{
    int qid;
    key_t key;  // 存储ftok生成的键值
    struct message msg;

    /* 1. 用ftok生成IPC键值 */
    if ((key = ftok(FTOK_PATH, FTOK_PROJ_ID)) == -1)
    {
        perror("ftok failed");
        exit(1);
    }
    printf("Generate key via ftok: %x\n", key);  // 打印生成的键值

    /* 2. 创建/获取消息队列 */
    if ((qid = msgget(key, IPC_CREAT|0666)) == -1)
    {
        perror("msgget failed");
        exit(1);
    }

    printf("Open queue %d\n", qid);

    do
    {
        /* 读取消息队列 */
        memset(msg.msg_text, 0, BUFFER_SIZE);

        if (msgrcv(qid, (void*)&msg, BUFFER_SIZE, 0, 0) < 0)
        {
            perror("msgrcv failed");
            exit(1);
        }

        printf("The message from process %ld : %s", msg.msg_type, msg.msg_text);

    } while(strncmp(msg.msg_text, "quit", 4));

    /* 从系统内核中删除消息队列 */
    if ((msgctl(qid, IPC_RMID, NULL)) < 0)
    {
        perror("msgctl failed");
        exit(1);
    }
    else
    {
        printf("Delete msg qid: %d.\n", qid);
    }

    exit(0);
}

本代码重点说明如下:

  • 第34行,调用msgget()函数创建/获取队列qid。可以注意到,此处跟发送进程是完全一样的,无论哪个进程先运行, 若某一key值的队列不存在则创建,把以实验时两个进程并没有先后启动顺序的要求。

  • 第47行,在循环中调用msgrcv()函数接收qid队列的msg结构体消息,此处使用阻塞方式接收, 若队列中没有消息,会停留在本行代码等待。

  • 第58行,若前面接收到用户的消息为quit,会退出循环,在本行代码调用msgctl()删除消息队列并退出本进程。

4.5.2.1. 编译及测试

示例代码分别位于配套代码仓库/system_programing/msg/的msg_send及msg_recv目录下, 将两个进程编译出来,分别运行即可,实验现象如下:

4.5.2.1.1. 发送进程

在发送消息进程运行的时候,会提示让你输入要发送的消息,随便什么消息都可以的,使用回车完成消息的输入。 输入quit或使用Ctrl+D、Ctrl+C可结束进程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 以下操作在 system_programing/msg/msg_send 代码目录进行
make

# 运行
./build/msg_send_demo

# 输入消息测试
Generate key via ftok: 64020001
Open queue 5570562
Enter some message to the queue: 你好
Send message success.
Enter some message to the queue: test
Send message success.
Enter some message to the queue: hello world
Send message success.
Enter some message to the queue: quit
Send message success.

Quit send message.

可以通过 ipcs -q 命令来查看系统中存在的消息队列,若以上队列没有关闭,它的查看结果如下:

1
2
3
4
5
6
7
8
9
# 查询系统当前存在的队列
ipcs -q

# 信息输出如下
------ Message Queues --------
key        msqid      owner      perms      used-bytes   messages
0x64020001 5570562    guest      666        29           4

# 可查看到key键值64020001,qid 5570562与进程中创建的一致。
4.5.2.1.2. 接收进程

打开一个新终端,编译及运行接收消息进程,当你从发送消息进程输入消息时(按下回车键发送), 接收消息进程会打印出你输入的消息,若无消息则接收进程会阻塞等待,接收到quit消息会退出进程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 以下操作在 system_programing/msg/msg_recv 代码目录进行
make

# 运行
./build/msg_recv_demo

# 接收到的消息
Generate key via ftok: 64020001
Open queue 5570562
The message from process 21698 : 你好
The message from process 21698 : test
The message from process 21698 : hello world
The message from process 21698 : quit
Delete msg qid: 5570562.

小技巧

在本例子中,若发送进程不是通过quit消息退出(如Ctrl+C或Ctrl+D),则不会触发接收进程主动删除消息队列, 在这种情况下可通过 ipcs -q 命令查看到该消息队列依然存在,通过 ipcrm -q [消息队列qid] 即可删除。

4.6. 消息队列与信号、管道的对比

IPC方式对比

IPC方式

通信范围

数据格式

阻塞特性

生命周期

适用场景

信号

任意进程

仅信号编号,无自定义数据

异步无阻塞

随进程

异步通知、异常处理

匿名管道

亲缘进程

无格式字节流

默认阻塞

随进程

简单父子进程通信

命名管道

任意进程

无格式字节流

默认阻塞

随文件

非亲缘简单数据传输

消息队列

任意进程

带类型结构化消息

阻塞/非阻塞可选

内核持续性

异步、按需、结构化数据传输