Message Queue 使用场景
今天介绍一下消息队列,对此做个总结,酝酿了有一段时间,因为消息队列在开发中占据了一个很重要的地位。
一般这种系统设施,如果你是在 Linux
系统用户空间进行开发(内核开发除外的所有开发),需要懂得其原理、接口使用、消息框架构建机制、封装系统提供的 MQ
接口来对上层提供服务。如果你是从事内核开发,那还需要将这些系统设施的实现了解清楚。
我现在做的是系统应用层开发,就以我现在公司为例,开发的是嵌入式系统设备,之前公司消息机制使用的是 Linux
系统自带的消息队列,System V
那一套,后来因为系统资源确实匮乏,如果直接使用原生的消息队列机制,并发量太高的话,系统性能会下降(因为芯片太低端,而使用消息队列的功能模块在增加),所以自己开发了一套消息机制,实现思路以及对外接口和原来差别不大。本次主要介绍 System V
和 Posix
消息队列使用和与之相关的设计模式。
消息队列(Message Queue
),不仅是分布式系统中重要的组件, 也是我们客户机系统开发中的一个重要组件,特别是我们公司现在开发的系统,可以说完全是由消息驱动的系统,无论是进程间,还是进程内部,消息队列的发布/订阅
模式,支撑起了整个系统框架。
首先,消息队列的使用场景主要有以下几个:
异步处理
非核心流程异步化,提高系统响应性能。发布者只需将消息通知出去,然后就可以去做其他事情。程序解耦
对于不强依赖于非本系统的核心流程,可以放到消息队列中让消息消费者去按需消费,而不影响核心主流程,做到业务功能模块解耦广播
发布/订阅
模式,一个消息,可以多个对象定阅处理流量削峰与流控
对于网络秒杀活动,消息队列可做缓冲处理
1.请求先入消息队列,而不是由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力;
2.队列长度可以做限制,事实上,秒杀时,后入队列的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已结束或商品已售完信息;消息驱动的系统
- 避免直接调用下一个系统导致当前系统失败;
- 每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按不同处理速度处理;
- 通知其他系统做某些事情
以我现在公司的系统为例,上述场景除了流量削峰( 偏服务端 ),其他基本都有在系统中出现。公司开发的是通信终端系统,主要模块分为: GUI
, 网络, 声音,协议等, 各模块都是系统中单独的一个进程, 各个进程间需要通信,则通过发消息的方式,十分方便; 同时进程内部也通过消息队列,对相关业务量比较重的核心模块,进行代码重构、解耦, 把一些非核心流程剥离出去,通过发消息的方式通知对应模块进行处理。
封装的接口形式和 windows
端的消息机制接口类似:1
2
3
4msgPostMsgToThread() // 发送到特定线程
msgBroadpostThreadMsg() // 广播
etl_RegisterMsgHandle(TM_TIMER, TM_TIMER, &CStatusManager::OnTimerMsgProgress); //订阅
通过上述接口,我们可以很方便的通知其他进程,并携带上相应的数据,完成进程间通信。消息机制完全采用发布/订阅模式
。系统一启动,首先各个进程模块进行初始化操作,每个线程/进程会创建一个消息队列,进程中的各个模块按需调用 etl_RegisterMsgHandle()
接口,订阅消息。然后系统中各进程/线程间,会进行消息的定点发送或者广播,收到消息的进程则调用初始化时注册的 handle()
接口进行处理。
发布/订阅模式 (Publish-Subscribe)
首先介绍一下不包含消息队列的发布/订阅模式
,我们先引用一下 20 年前的一本经典著作 GoF 中对发布/订阅模式
的介绍, 发布/订阅模式
别名观察者(observer), 依赖(Dependents), 该模式是一种对象行为型模式.
模式意图:
- 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并自动更新。
模式结构类图 (引用自: GOF设计模式解析)
主要角色:
- Subject: 目标
- Observer: 观察者
传统的观察者模式的优点:
- 观察者模式可以实现表示层和数据逻辑层的分离,并定义了稳定的消息更新传递机制,抽象了更新接口,使得可以有各种各样不同的表示层作为具体观察者角色。
- 观察者模式在观察目标和观察者之间建立一个抽象的耦合。
- 观察者模式支持广播通信。
- 观察者模式符合”开闭原则”的要求。
观察目标和观察者之间不是紧密耦合,可以说建立了一个抽象耦合,观察者是知道Subject的,Subject也一直保持对观察者进行记录。然而在包含消息队列的发布/订阅模型
中,发布者和订阅者并不知道对方的存在,它们只通过消息代理进行通信,组件是松散耦合的,而且观察者模式大多数时候是同步的,比如当事件触发,Subject就会去调用观察者的方法。而包含消息队列的发布-订阅模式
大多数时候是异步的(使用消息队列)。观察者模式需要在单个应用程序地址空间中实现,而发布-订阅更像交叉应用模式。
传统的观察者模式与包含消息队列的发布/订阅模型
模式关系类似下图:
POSIX 消息队列简单使用
System V API 和 POSIX API 提供了消息队列,我们主要介绍一下 POSIX
特点:
- POSIX 和 System V 实现中的消息队列都具有内核级持续性
POSIX
主要接口信息:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
main types:
1. mqd_t: 消息队列描述符
2. struct mq_attr: 消息队列属性结构,定义如下:
struct mq_attr {
long int mq_flags; /* Message queue flags. */
long int mq_maxmsg; /* Maximum number of messages. */
long int mq_msgsize; /* Maximum message size. */
long int mq_curmsgs; /* Number of messages currently queued. */
}
functions: mqd_t mq_open(const char *name, int flags, ... [ mode_t mode, struct mq_attr *mq_attr ])
description:name消息队列名字; flags 用来表示打开消息队列的权限组合; 如果是以创建方式打开,则需要设置相应
的访问权限mode; 设置消息队列属性 mq_attr.
return:返回消息描述符,或者(mqd_t)-1出错
functions: int mq_close(mdq_t mqdes)
return: 成功返回 0,错误返回 -1
functions: int mq_unlink(const char *name)
return: 成功返回 0,错误返回 -1
functions:int mq_send(mqd_t mqdes, const char *msgbuf, size_t len, unsigned int prio)
return: 成功返回 0,错误返回 -1
functions:size_t mq_receive(mqd_t mqdes, char *buf, size_t len, unsigned *prio)
return: 成功返回 0,或者(mqd_t)-1出错
functions: int mq_getattr(mqd_t mqdes, struct mq_attr *mq_attr)
return: 成功返回 0,或者(mqd_t)-1出错
functions:int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
return: 成功返回 0,或者(mqd_t)-1出错
functions:int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
return: 成功返回 0,或者(mqd_t)-1出错
functions:int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
description: 在有消息时注册通知
return:成功返回 0,或者(mqd_t)-1出错
注: 编译时需要链接 rt 库 (Link with -lrt.)
模拟使用场景:1
2
3
4* 模拟场景
* 1. 总共三个进程,一个程序负责创建消息队列,一个发送消息,最后一个负责接收
* 2. 接收程序使用 mq_notify 注册信号来处理消息从无到有的场景
*
具体实现代码如下:
cmake 程序
1
2
3
4
5
6
7
8
9
10
11
12
13cmake_minimum_required(VERSION 2.8)
add_definitions(-std=c++11)
project(message_queue C CXX)
aux_source_directory(. SRC_FILES)
add_executable(message_queue ${SRC_FILES})
target_link_libraries(message_queue
rt
)mq_create:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0);
int main(int argc, char * argv[])
{
mqd_t mqid;
mqid = mq_open("/zed", O_CREAT | O_RDWR, 0666, NULL);
if (mqid == (mqd_t) - 1)
{
ERR_EXIT("mq_open");
}
struct mq_attr attr;
mq_getattr(mqid, &attr);
printf("max msg numb: [%ld], mq_msgsize: [%ld] byte, current msg size: [%ld], mq_flags[%ld]",
attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs, attr.mq_flags);
mq_close(mqid);
return 0;
}mq_send:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
do \
{ \
perror(m); \
exit(EXIT_FAILURE); \
} while (0);
typedef struct stu
{
char data[32];
int index;
}STU;
int main(int argc, char * argv[])
{
mqd_t mqid;
mqid = mq_open("/zed", O_RDONLY);
if (mqid == (mqd_t) - 1)
{
ERR_EXIT("mq_open");
}
STU stu;
stu.index = 20;
strcpy(stu.data, "my name is yejy");
printf("stu.data [%s], stu.index [%d]\n", stu.data, stu.index);
mq_send(mqid, (const char*)&stu, sizeof(stu), 1);
struct mq_attr attr;
mq_getattr(mqid, &attr);
printf("max msg numb: [%ld], mq_msgsize: [%ld] byte, current msg size: [%ld], mq_flags[%ld] \n",
attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs, attr.mq_flags);
mq_close(mqid);
return 0;
}mq_receive:
1 |
|
总结
对于相关设计模式需要懂得其原理与实现,毕竟这些是前人总结出来的宝贵经验,对我们的软件设计思维非常有帮助;至于消息队列,主要介绍了一下其思想与用途,以及对 POSIX 的消息队列接口做了一个简单模拟实现,了解了消息队列的基本使用。当然,作为计算机系统中的一个重要组件,在后端开发中,有很多优秀的消息队列中间件,后续有机会从事相关开发,再进行深入了解,通过此次文章总结,如果下次遇到消息队列相关问题和开发任务,应该会比较得心应手。