进程间通信-消息队列

Message Queue 使用场景

今天介绍一下消息队列,对此做个总结,酝酿了有一段时间,因为消息队列在开发中占据了一个很重要的地位。

一般这种系统设施,如果你是在 Linux 系统用户空间进行开发(内核开发除外的所有开发),需要懂得其原理、接口使用、消息框架构建机制、封装系统提供的 MQ 接口来对上层提供服务。如果你是从事内核开发,那还需要将这些系统设施的实现了解清楚。

我现在做的是系统应用层开发,就以我现在公司为例,开发的是嵌入式系统设备,之前公司消息机制使用的是 Linux 系统自带的消息队列,System V 那一套,后来因为系统资源确实匮乏,如果直接使用原生的消息队列机制,并发量太高的话,系统性能会下降(因为芯片太低端,而使用消息队列的功能模块在增加),所以自己开发了一套消息机制,实现思路以及对外接口和原来差别不大。本次主要介绍 System VPosix 消息队列使用和与之相关的设计模式

消息队列(Message Queue),不仅是分布式系统中重要的组件, 也是我们客户机系统开发中的一个重要组件,特别是我们公司现在开发的系统,可以说完全是由消息驱动的系统,无论是进程间,还是进程内部,消息队列的发布/订阅模式,支撑起了整个系统框架

首先,消息队列的使用场景主要有以下几个:

  • 异步处理
    非核心流程异步化,提高系统响应性能。发布者只需将消息通知出去,然后就可以去做其他事情。

  • 程序解耦
    对于不强依赖于非本系统的核心流程,可以放到消息队列中让消息消费者去按需消费,而不影响核心主流程,做到业务功能模块解耦

  • 广播
    发布/订阅模式,一个消息,可以多个对象定阅处理

  • 流量削峰与流控
    对于网络秒杀活动,消息队列可做缓冲处理
    1.请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力;
    2.队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;

  • 消息驱动的系统

    1. 避免直接调用下一个系统导致当前系统失败;
    2. 每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处理;
    3. 通知其他系统做某些事情

以我现在公司的系统为例,上述场景除了流量削峰( 偏服务端 ),其他基本都有在系统中出现。公司开发的是通信终端系统,主要模块分为: GUI, 网络, 声音,协议等, 各模块都是系统中单独的一个进程, 各个进程间需要通信,则通过发消息的方式,十分方便; 同时进程内部也通过消息队列,对相关业务量比较重的核心模块,进行代码重构、解耦, 把一些非核心流程剥离出去,通过发消息的方式通知对应模块进行处理

封装的接口形式和 windows 端的消息机制接口类似:

1
2
3
4
msgPostMsgToThread() // 发送到特定线程
msgBroadpostThreadMsg() // 广播

etl_RegisterMsgHandle(TM_TIMER, TM_TIMER, &CStatusManager::OnTimerMsgProgress); //订阅

通过上述接口,我们可以很方便的通知其他进程,并携带上相应的数据,完成进程间通信。消息机制完全采用发布/订阅模式。系统一启动,首先各个进程模块进行初始化操作,每个线程/进程会创建一个消息队列,进程中的各个模块按需调用 etl_RegisterMsgHandle() 接口,订阅消息。然后系统中各进程/线程间,会进行消息的定点发送或者广播,收到消息的进程则调用初始化时注册的 handle() 接口进行处理。

发布/订阅模式 (Publish-Subscribe)

avatar

首先介绍一下不包含消息队列的发布/订阅模式,我们先引用一下 20 年前的一本经典著作 GoF 中对发布/订阅模式的介绍, 发布/订阅模式别名观察者(observer), 依赖(Dependents), 该模式是一种对象行为型模式.

模式意图

  • 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并自动更新。

模式结构类图 (引用自: GOF设计模式解析)
主要角色:

  • Subject: 目标
  • Observer: 观察者
    avatar

传统的观察者模式的优点:

  • 观察者模式可以实现表示层和数据逻辑层的分离,并定义了稳定的消息更新传递机制,抽象了更新接口,使得可以有各种各样不同的表示层作为具体观察者角色。
  • 观察者模式在观察目标和观察者之间建立一个抽象的耦合。
  • 观察者模式支持广播通信。
  • 观察者模式符合”开闭原则”的要求。

观察目标和观察者之间不是紧密耦合,可以说建立了一个抽象耦合,观察者是知道Subject的,Subject也一直保持对观察者进行记录。然而在包含消息队列的发布/订阅模型中,发布者和订阅者并不知道对方的存在,它们只通过消息代理进行通信,组件是松散耦合的,而且观察者模式大多数时候是同步的,比如当事件触发,Subject就会去调用观察者的方法。而包含消息队列的发布-订阅模式大多数时候是异步的(使用消息队列)。观察者模式需要在单个应用程序地址空间中实现,而发布-订阅更像交叉应用模式

传统的观察者模式与包含消息队列的发布/订阅模型模式关系类似下图:
avatar

POSIX 消息队列简单使用

System V API 和 POSIX API 提供了消息队列,我们主要介绍一下 POSIX

特点:

  • POSIX 和 System V 实现中的消息队列都具有内核级持续性

POSIX 主要接口信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include: mqueue.h (sys/stat.h 用于在创建队列时使用权限宏)
main types:
1. mqd_t: 消息队列描述符
2. struct mq_attr: 消息队列属性结构,定义如下:
struct mq_attr {
long int mq_flags; /* Message queue flags. */
long int mq_maxmsg; /* Maximum number of messages. */
long int mq_msgsize; /* Maximum message size. */
long int mq_curmsgs; /* Number of messages currently queued. */
}

functions: mqd_t mq_open(const char *name, int flags, ... [ mode_t mode, struct mq_attr *mq_attr ])
description:name消息队列名字; flags 用来表示打开消息队列的权限组合; 如果是以创建方式打开,则需要设置相应
的访问权限mode; 设置消息队列属性 mq_attr.
return:返回消息描述符,或者(mqd_t-1出错

functions: int mq_close(mdq_t mqdes)
return: 成功返回 0,错误返回 -1

functions: int mq_unlink(const char *name)
return: 成功返回 0,错误返回 -1

functions:int mq_send(mqd_t mqdes, const char *msgbuf, size_t len, unsigned int prio)
return: 成功返回 0,错误返回 -1

functions:size_t mq_receive(mqd_t mqdes, char *buf, size_t len, unsigned *prio)
return: 成功返回 0,或者(mqd_t-1出错

functions: int mq_getattr(mqd_t mqdes, struct mq_attr *mq_attr)
return: 成功返回 0,或者(mqd_t-1出错

functions:int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
return: 成功返回 0,或者(mqd_t-1出错

functions:int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
return: 成功返回 0,或者(mqd_t-1出错

functions:int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
description: 在有消息时注册通知
return:成功返回 0,或者(mqd_t-1出错

注: 编译时需要链接 rt 库 (Link with -lrt.)

模拟使用场景:

1
2
3
4
* 模拟场景
* 1. 总共三个进程,一个程序负责创建消息队列,一个发送消息,最后一个负责接收
* 2. 接收程序使用 mq_notify 注册信号来处理消息从无到有的场景
*

具体实现代码如下:

  1. cmake 程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    cmake_minimum_required(VERSION 2.8)

    add_definitions(-std=c++11)

    project(message_queue C CXX)

    aux_source_directory(. SRC_FILES)

    add_executable(message_queue ${SRC_FILES})

    target_link_libraries(message_queue
    rt
    )
  2. mq_create:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    #include <iostream>
    #include "errno.h"
    #include <fcntl.h> /* For O_* constants */
    #include <sys/stat.h> /* For mode constants */
    #include <mqueue.h>

    #define ERR_EXIT(m) \
    do \
    { \
    perror(m); \
    exit(EXIT_FAILURE); \
    } while (0);

    int main(int argc, char * argv[])
    {
    mqd_t mqid;
    mqid = mq_open("/zed", O_CREAT | O_RDWR, 0666, NULL);

    if (mqid == (mqd_t) - 1)
    {
    ERR_EXIT("mq_open");
    }

    struct mq_attr attr;
    mq_getattr(mqid, &attr);

    printf("max msg numb: [%ld], mq_msgsize: [%ld] byte, current msg size: [%ld], mq_flags[%ld]",
    attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs, attr.mq_flags);

    mq_close(mqid);
    return 0;
    }
  3. mq_send:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    #include <iostream>
    #include "errno.h"
    #include <fcntl.h> /* For O_* constants */
    #include <sys/stat.h> /* For mode constants */
    #include <mqueue.h>
    #include <stdio.h>
    #include <string.h>

    #define ERR_EXIT(m) \
    do \
    { \
    perror(m); \
    exit(EXIT_FAILURE); \
    } while (0);

    typedef struct stu
    {
    char data[32];
    int index;
    }STU;

    int main(int argc, char * argv[])
    {
    mqd_t mqid;
    mqid = mq_open("/zed", O_RDONLY);

    if (mqid == (mqd_t) - 1)
    {
    ERR_EXIT("mq_open");
    }

    STU stu;
    stu.index = 20;
    strcpy(stu.data, "my name is yejy");

    printf("stu.data [%s], stu.index [%d]\n", stu.data, stu.index);
    mq_send(mqid, (const char*)&stu, sizeof(stu), 1);

    struct mq_attr attr;
    mq_getattr(mqid, &attr);

    printf("max msg numb: [%ld], mq_msgsize: [%ld] byte, current msg size: [%ld], mq_flags[%ld] \n",
    attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs, attr.mq_flags);

    mq_close(mqid);
    return 0;
    }
  4. mq_receive:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <iostream>
#include "errno.h"
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>
#include <signal.h>

#define ERR_EXIT(m) \
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0);

typedef struct stu
{
char data[32];
int index;
}STU;

struct sigevent sigev;
mqd_t mqid;
size_t size;

void handle_sigusr1(int sig)
{
mq_notify(mqid, &sigev); // need registered again
STU stu;

unsigned int prio;
if(mq_receive(mqid, (char*)&stu, size, &prio) == (mqd_t) -1)
{
ERR_EXIT("mq_receive");
}

printf("data=[%s], index = [%d], prio = [%d]", stu.data, stu.index, prio);
}

int main(int argc, char * argv[])
{
mqid = mq_open("/zed", O_RDONLY);

if (mqid == (mqd_t) - 1)
{
ERR_EXIT("mq_open");
}

struct mq_attr attr;
mq_getattr(mqid, &attr);
size = attr.mq_msgsize;

printf("max msg numb: [%ld], mq_msgsize: [%ld] byte, current msg size: [%ld], mq_flags[%ld]\n",
attr.mq_maxmsg,
attr.mq_msgsize,
attr.mq_curmsgs,
attr.mq_flags);

// signal
signal(SIGUSR1, handle_sigusr1);

sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;

mq_notify(mqid, &sigev); // Message from nothing, notify handle

for (;;); // loop keep process execute

mq_close(mqid);
return 0;
}

总结

对于相关设计模式需要懂得其原理与实现,毕竟这些是前人总结出来的宝贵经验,对我们的软件设计思维非常有帮助;至于消息队列,主要介绍了一下其思想与用途,以及对 POSIX 的消息队列接口做了一个简单模拟实现,了解了消息队列的基本使用。当然,作为计算机系统中的一个重要组件,在后端开发中,有很多优秀的消息队列中间件,后续有机会从事相关开发,再进行深入了解,通过此次文章总结,如果下次遇到消息队列相关问题和开发任务,应该会比较得心应手。