1、消息队列
- 消息队列能够接收来自线程或中断服务例程中不固定长度的消息,并把消息缓存在自己的内存空间中。
- 其他线程也能够从消息队列中读取相应的消息(最先进入消息队列的消息,即先进先出原则 (FIFO))。
而当消息队列是空的时候,可以挂起读取线程,当有新的消息到达时,挂起的线程将被唤醒以接收并处理消息。
struct rt_messagequeue
{
struct rt_ipc_object parent;
void* msg_pool; /* 指向存放消息的缓冲区的指针 */
rt_uint16_t msg_size; /* 每个消息的最大长度 */
rt_uint16_t max_msgs; /* 最大能够容纳的消息数 */
rt_uint16_t entry; /* 队列中已有的消息数 */
void* msg_queue_head; /* 消息链表头 */
void* msg_queue_tail; /* 消息链表尾 */
void* msg_queue_free; /* 空闲消息链表 */
rt_list_t suspend_sender_thread; /* 发送线程的挂起等待队列 */
};
typedef struct rt_messagequeue* rt_mq_t;
2、消息队列API
2.1、创建和删除消息队列
rt_err_t rt_mq_init(rt_mq_t mq,
const char* name,
void *msgpool,
rt_size_t msg_size,
rt_size_t pool_size,
rt_uint8_t flag);
参数 | 描述 |
---|
mq | 消息队列对象的句柄 | name | 消息队列的名称 | msgpool | 指向存放消息的缓冲区的指针 | msg_size | 消息队列中一条消息的最大长度,单位字节 | pool_size | 存放消息的缓冲区大小 | flag | 消息队列采用的等待方式,它可以取如下数值: RT_IPC_FLAG_FIFO 或 RT_IPC_FLAG_PRIO | 返回 | —— | RT_EOK | 成功 |
rt_err_t rt_mq_detach(rt_mq_t mq);
参数 | 描述 |
---|
mq | 消息队列对象的句柄 | 返回 | —— | RT_EOK | 成功 |
rt_mq_t rt_mq_create(const char* name,
rt_size_t msg_size,
rt_size_t max_msgs,
rt_uint8_t flag);
参数 | 描述 |
---|
name | 消息队列的名称 | msg_size | 消息队列中一条消息的最大长度,单位字节 | max_msgs | 消息队列的最大个数 | flag | 消息队列采用的等待方式,它可以取如下数值: RT_IPC_FLAG_FIFO 或 RT_IPC_FLAG_PRIO | 返回 | —— | 消息队列对象的句柄 | 成功 | RT_NULL | 失败 |
关于消息缓冲区申请的内存大小:单条消息最大长度msg_size要对齐,结构体rt_mq_message里面是一个指针(4个字节);
rt_err_t rt_mq_delete(rt_mq_t mq);
参数说明同rt_mq_detach();?
?2.2、发送消息?
2.2.1、无等待发送消息
从空闲消息链表上取下一个空闲消息块,把消息内容复制到消息块上,然后把该消息块挂到消息队列的尾部。
- 有可用的空闲消息块时,发送者才能成功发送消息;
- 无可用的空闲消息块,说明消息队列已满,返回RT_EFULL
rt_err_t rt_mq_send (rt_mq_t mq, void* buffer, rt_size_t size);
参数 | 描述 |
---|
mq | 消息队列对象的句柄 | buffer | 消息内容 | size | 消息大小 | 返回 | —— | RT_EOK | 成功 | -RT_EFULL | 消息队列已满 | -RT_ERROR | 失败,表示发送的消息长度大于消息队列中消息的最大长度 |
2.2.2、等待发送消息
rt_err_t rt_mq_send_wait(rt_mq_t mq,
const void *buffer,
rt_size_t size,
rt_int32_t timeout);
2.2.3、发送紧急消息
rt_err_t rt_mq_urgent(rt_mq_t mq, void* buffer, rt_size_t size);
?
2.3、接收消息
接收一个消息后消息队列上的队首消息被转移到了空闲消息链表的尾部。
rt_err_t rt_mq_recv (rt_mq_t mq, void* buffer,
rt_size_t size, rt_int32_t timeout);
参数 | 描述 |
---|
mq | 消息队列对象的句柄 | buffer | 消息内容 | size | 消息大小 | timeout | 指定的超时时间 | 返回 | —— | RT_EOK | 成功收到 | -RT_ETIMEOUT | 超时 | -RT_ERROR | 失败,返回错误 |
3、官方例程
就是一个线程定时发送消息,一个线程定时接收消息。
在第8次的时候插入了紧急消息,所以那之后的下一次接收消息时会先接收到紧急消息
#include <rtthread.h>
/* 消息队列控制块 */
static struct rt_messagequeue mq;
/* 消息队列中用到的放置消息的内存池 */
static rt_uint8_t msg_pool[2048];
ALIGN(RT_ALIGN_SIZE)
static char thread1_stack[1024];
static struct rt_thread thread1;
/* 线程1入口函数 */
static void thread1_entry(void *parameter)
{
char buf = 0;
rt_uint8_t cnt = 0;
while (1){
/* 从消息队列中接收消息 */
if (rt_mq_recv(&mq, &buf, sizeof(buf), RT_WAITING_FOREVER) == RT_EOK){
rt_kprintf("thread1: recv msg from msg queue, the content:%c\n", buf);
if (cnt == 19){
break;
}
}
/* 延时50ms */
cnt++;
rt_thread_mdelay(50);
}
rt_kprintf("thread1: detach mq \n");
rt_mq_detach(&mq);
}
ALIGN(RT_ALIGN_SIZE)
static char thread2_stack[1024];
static struct rt_thread thread2;
/* 线程2入口 */
static void thread2_entry(void *parameter)
{
int result;
char buf = 'A';
rt_uint8_t cnt = 0;
while (1){
if (cnt == 8){
/* 发送紧急消息到消息队列中 */
result = rt_mq_urgent(&mq, &buf, 1);
if (result != RT_EOK){
rt_kprintf("rt_mq_urgent ERR\n");
}else{
rt_kprintf("thread2: send urgent message - %c\n", buf);
}
}else if (cnt >= 20){/* 发送20次消息之后退出 */
rt_kprintf("message queue stop send, thread2 quit\n");
break;
}else{
/* 发送消息到消息队列中 */
result = rt_mq_send(&mq, &buf, 1);
if (result != RT_EOK){
rt_kprintf("rt_mq_send ERR\n");
}
rt_kprintf("thread2: send message - %c\n", buf);
}
buf++;
cnt++;
/* 延时5ms */
rt_thread_mdelay(5);
}
}
/* 消息队列示例的初始化 */
int msgq_sample(void)
{
rt_err_t result;
/* 初始化消息队列 */
result = rt_mq_init(&mq,
"mqt",
&msg_pool[0], /* 内存池指向msg_pool */
1, /* 每个消息的大小是 1 字节 */
sizeof(msg_pool), /* 内存池的大小是msg_pool的大小 */
RT_IPC_FLAG_FIFO); /* 如果有多个线程等待,按照先来先得到的方法分配消息 */
if (result != RT_EOK){
rt_kprintf("init message queue failed.\n");
return -1;
}
rt_thread_init(&thread1,"thread1",thread1_entry,RT_NULL,
&thread1_stack[0],sizeof(thread1_stack), 25, 5);
rt_thread_startup(&thread1);
rt_thread_init(&thread2,"thread2",thread2_entry,RT_NULL,
&thread2_stack[0],sizeof(thread2_stack), 25, 5);
rt_thread_startup(&thread2);
return 0;
}
/* 导出到 msh 命令列表中 */
MSH_CMD_EXPORT(msgq_sample, msgq sample);
|