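Introduction
mq (the POSIX message queue) is an IPC mechanism on Linux that works both between processes and between threads. Its performance is reasonable, and a common way to make it faster is to send a memory address as the message body instead of the data itself. For mq communication between processes, send the location of the data in shared memory (an offset from the start of the segment is safer than a raw address, because each process may map the segment at a different virtual address). For mq communication within one process, send an ordinary pointer.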
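A minimal sketch of the pointer-as-payload idea follows. pack_ptr, unpack_ptr, pack_off and unpack_off are hypothetical helper names, not part of any library. The message body is just the bytes of a pointer (same address space) or of an offset into a shared segment (across processes). The resulting buffer and length are what would be passed to mq_send, and the queue could be created with mq_msgsize equal to that length. The sender must keep the referenced data alive until the receiver has finished with it, which the application has to arrange itself.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: copy the pointer VALUE (not the data it points to)
 * into an mq payload buffer. Returns the msg_len to pass to mq_send,
 * or 0 if the buffer is too small. Valid only within one address space. */
static size_t pack_ptr(char *buf, size_t buflen, const void *p)
{
	if (buflen < sizeof p)
		return 0;
	memcpy(buf, &p, sizeof p);
	return sizeof p;
}

/* Hypothetical helper: recover the pointer on the receiving side. */
static void *unpack_ptr(const char *buf, size_t len)
{
	void *p = NULL;

	if (len != sizeof p)
		return NULL;            /* not a pointer-sized message */
	memcpy(&p, buf, sizeof p);
	return p;
}

/* Hypothetical helper for the cross-process case: send the offset of p
 * from the start of the shared segment, since each process may map the
 * segment at a different address. */
static size_t pack_off(char *buf, size_t buflen, const void *base, const void *p)
{
	uint64_t off = (uint64_t)((const char *)p - (const char *)base);

	if (buflen < sizeof off)
		return 0;
	memcpy(buf, &off, sizeof off);
	return sizeof off;
}

/* Rebuild the address using the receiver's own mapping of the segment. */
static void *unpack_off(const char *buf, size_t len, void *base)
{
	uint64_t off;

	if (len != sizeof off)
		return NULL;
	memcpy(&off, buf, sizeof off);
	return (char *)base + off;
}
```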
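In a real application we hit a case where the mq was full, the receiver was not receiving from it, and the receiving task was never woken up. Because the application's implementation was unknown to us, its behavior had to be inferred from the symptoms, and inferring upper-layer behavior from the bottom up requires understanding the underlying kernel flow.
Application model
Figure 1: mq send/receive application model
As shown in the figure above, thread 3 blocks and sleeps waiting for messages, while threads 1 and 2 continuously send messages to thread 3.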
Source code
The Linux mq implementation lives in ipc/mqueue.c.
System calls
mq_send
In glibc, both mq_send and mq_timedsend end up entering the kernel through the mq_timedsend system call.
The system call source is as follows:
```c
SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
		size_t, msg_len, unsigned int, msg_prio,
		const struct __kernel_timespec __user *, u_abs_timeout)
{
	struct timespec64 ts, *p = NULL;
	if (u_abs_timeout) {
		int res = prepare_timeout(u_abs_timeout, &ts);
		if (res)
			return res;
		p = &ts;
	}
	return do_mq_timedsend(mqdes, u_msg_ptr, msg_len, msg_prio, p);
}
```
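As shown above, the main parameters of an mq send are mqdes (in practice an fd), u_msg_ptr (pointer to the message content), msg_len (content length), msg_prio (message priority), and u_abs_timeout (absolute timeout).
In glibc, two interfaces invoke the mq_timedsend system call: mq_send and mq_timedsend.
The only difference between them is that mq_send passes NULL for u_abs_timeout, so a blocking mq_send waits without a time limit.
This path adds two error cases: if u_abs_timeout points to an inaccessible address, the call returns -EFAULT; if the timeout value is invalid, it returns -EINVAL.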
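As a sketch of how a caller might avoid the -EINVAL case, the helpers below (hypothetical names abs_timeout_valid and deadline_after_ms) mirror the validity check the kernel applies to the timeout (seconds must be non-negative and nanoseconds must lie in [0, 1e9)) and build a normalized absolute deadline. The timeout is absolute and measured against CLOCK_REALTIME, so `now` would normally come from clock_gettime(CLOCK_REALTIME, &now); it is passed in here so the arithmetic can be checked. `ms` is assumed to be non-negative.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdbool.h>
#include <time.h>

#define NSEC_PER_SEC 1000000000L

/* Same rule as the kernel's timespec64_valid(): non-negative seconds,
 * nanoseconds in [0, NSEC_PER_SEC). Failing it yields -EINVAL. */
static bool abs_timeout_valid(const struct timespec *ts)
{
	if (ts->tv_sec < 0)
		return false;
	if (ts->tv_nsec < 0 || ts->tv_nsec >= NSEC_PER_SEC)
		return false;
	return true;
}

/* Absolute deadline `ms` milliseconds after `now`, with tv_nsec carried
 * into tv_sec so the result always passes abs_timeout_valid(). */
static struct timespec deadline_after_ms(struct timespec now, long ms)
{
	struct timespec ts = now;

	ts.tv_sec  += ms / 1000;
	ts.tv_nsec += (ms % 1000) * 1000000L;
	if (ts.tv_nsec >= NSEC_PER_SEC) {
		ts.tv_sec  += 1;
		ts.tv_nsec -= NSEC_PER_SEC;
	}
	return ts;
}
```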
mq_receive
In glibc, both mq_receive and mq_timedreceive end up entering the kernel through the mq_timedreceive system call:
```c
SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
		size_t, msg_len, unsigned int __user *, u_msg_prio,
		const struct __kernel_timespec __user *, u_abs_timeout)
{
	struct timespec64 ts, *p = NULL;
	if (u_abs_timeout) {
		int res = prepare_timeout(u_abs_timeout, &ts);
		if (res)
			return res;
		p = &ts;
	}
	return do_mq_timedreceive(mqdes, u_msg_ptr, msg_len, u_msg_prio, p);
}
```
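Data structures
Figure 2: key data structures for mq send and wakeup
Each mq file's inode is embedded in a struct mqueue_inode_info (MQUEUE_I() recovers it from the inode). This structure holds all the state used for mq communication, such as the attr attributes, the stored messages, and the information used to synchronize and wake threads.
Each mq keeps one wait list for senders and one for receivers (e_wait_q[SEND] and e_wait_q[RECV], lists of struct ext_wait_queue). When thread 1 calls mq_receive in blocking mode and goes to sleep, a struct ext_wait_queue is added to the receive list of struct mqueue_inode_info. When thread 2 calls mq_send to send a message to thread 1, it finds thread 1 through the receive list in the shared struct mqueue_inode_info and wakes it up; once woken, thread 1 receives the message.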
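To make this relationship concrete, here is a simplified user-space model of the per-direction waiter lists. The type names mirror the kernel's, but they are trimmed stand-ins (mqueue_info_model and struct task are hypothetical), and while the kernel's wq_add orders waiters by task priority, this sketch simply appends in FIFO order. A blocked receiver adds an ext_wait_queue that lives on its own stack to e_wait_q[RECV], and a sender peeks at the head of that list.

```c
#include <stddef.h>

enum { SEND = 0, RECV = 1 };

/* Hypothetical stand-in for task_struct. */
struct task {
	const char *name;
	int woken;
};

/* Trimmed model of struct ext_wait_queue. */
struct ext_wait_queue {
	struct task *task;              /* the sleeping thread */
	void *msg;                      /* filled in by the peer */
	int state;                      /* 0 = STATE_NONE, 1 = STATE_READY */
	struct ext_wait_queue *next;
};

/* Model of the wait-list part of struct mqueue_inode_info. */
struct mqueue_info_model {
	struct ext_wait_queue *e_wait_q[2];   /* [SEND], [RECV] */
};

/* Like wq_add: link the caller's on-stack waiter into a list
 * (FIFO here; the kernel orders by task priority). */
static void wq_add(struct mqueue_info_model *info, int sr,
		   struct ext_wait_queue *w)
{
	struct ext_wait_queue **pp = &info->e_wait_q[sr];

	while (*pp)
		pp = &(*pp)->next;
	w->next = NULL;
	*pp = w;
}

/* Like wq_get_first_waiter: head of the list, or NULL if nobody waits. */
static struct ext_wait_queue *wq_get_first_waiter(
		struct mqueue_info_model *info, int sr)
{
	return info->e_wait_q[sr];
}
```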
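Kernel send/receive flow
Figure 3: simplified mq send/wakeup flow
The figure above shows the simplified send/receive flow, which relies on the shared struct mqueue_inode_info. When mq_receive finds the queue empty, it sets up a struct ext_wait_queue (a local variable on its stack) and adds it to the receive list of struct mqueue_inode_info. When thread 2 sends a message to thread 1, it takes the struct ext_wait_queue that thread 1 queued earlier from struct mqueue_inode_info, gets thread 1's task_struct from it, stores the outgoing msg in that struct ext_wait_queue, and then calls wake_up_q to wake thread 1. After waking, thread 1 reads msg and copies it to the user-space receive buffer.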
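The handoff described above can be modelled in a few lines. This is a single-threaded sketch with hypothetical names (model_send, model_receive_after_wakeup, struct waiter, struct mq_model): setting task->woken stands in for wake_q_add plus wake_up_q, and locking is omitted. It follows the order of checks in the kernel's send path: a full queue is handled first, then a send is either handed directly to the first sleeping receiver (the message never enters the queue) or stored when nobody is waiting.

```c
#include <stddef.h>

enum { STATE_NONE = 0, STATE_READY = 1 };

struct task { int woken; };             /* hypothetical task stand-in */

struct waiter {                         /* models struct ext_wait_queue */
	struct task *task;
	void *msg;
	int state;
	struct waiter *next;
};

struct mq_model {                       /* models struct mqueue_inode_info */
	struct waiter *recv_waiters;    /* e_wait_q[RECV] */
	void *queue[8];                 /* stored messages, no wrap-around */
	int curmsgs;
};

/* Returns -1 if the queue is full (the kernel would sleep or return
 * -EAGAIN), 1 for a direct handoff (pipelined_send), 0 if the message was
 * stored (msg_insert). */
static int model_send(struct mq_model *mq, void *msg)
{
	struct waiter *r;

	if (mq->curmsgs == (int)(sizeof mq->queue / sizeof mq->queue[0]))
		return -1;

	r = mq->recv_waiters;
	if (r) {
		r->msg = msg;                   /* receiver->msg = message */
		mq->recv_waiters = r->next;     /* list_del(&receiver->list) */
		r->state = STATE_READY;
		r->task->woken = 1;             /* wake_q_add + wake_up_q */
		return 1;
	}
	mq->queue[mq->curmsgs++] = msg;
	return 0;
}

/* After wakeup the receiver takes the message from its own waiter
 * (msg_ptr = wait.msg) instead of from the queue. */
static void *model_receive_after_wakeup(const struct waiter *w)
{
	return w->state == STATE_READY ? w->msg : NULL;
}
```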
Send code:
```c
static int do_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr,
		size_t msg_len, unsigned int msg_prio,
		struct timespec64 *ts)
{
	struct fd f;
	struct inode *inode;
	struct ext_wait_queue wait;
	struct ext_wait_queue *receiver;
	struct msg_msg *msg_ptr;
	struct mqueue_inode_info *info;
	ktime_t expires, *timeout = NULL;
	struct posix_msg_tree_node *new_leaf = NULL;
	int ret = 0;
	DEFINE_WAKE_Q(wake_q);

	if (unlikely(msg_prio >= (unsigned long) MQ_PRIO_MAX))
		return -EINVAL;

	if (ts) {
		expires = timespec64_to_ktime(*ts);
		timeout = &expires;
	}

	audit_mq_sendrecv(mqdes, msg_len, msg_prio, ts);

	f = fdget(mqdes);
	if (unlikely(!f.file)) {
		ret = -EBADF;
		goto out;
	}

	inode = file_inode(f.file);
	if (unlikely(f.file->f_op != &mqueue_file_operations)) {
		ret = -EBADF;
		goto out_fput;
	}
	info = MQUEUE_I(inode);
	audit_file(f.file);

	if (unlikely(!(f.file->f_mode & FMODE_WRITE))) {
		ret = -EBADF;
		goto out_fput;
	}

	if (unlikely(msg_len > info->attr.mq_msgsize)) {
		ret = -EMSGSIZE;
		goto out_fput;
	}

	/* First try to allocate memory, before doing anything with
	 * existing queues. */
	msg_ptr = load_msg(u_msg_ptr, msg_len);
	if (IS_ERR(msg_ptr)) {
		ret = PTR_ERR(msg_ptr);
		goto out_fput;
	}
	msg_ptr->m_ts = msg_len;
	msg_ptr->m_type = msg_prio;

	/*
	 * msg_insert really wants us to have a valid, spare node struct so
	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
	 * fall back to that if necessary.
	 */
	if (!info->node_cache)
		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);

	spin_lock(&info->lock);

	if (!info->node_cache && new_leaf) {
		/* Save our speculative allocation into the cache */
		INIT_LIST_HEAD(&new_leaf->msg_list);
		info->node_cache = new_leaf;
		new_leaf = NULL;
	} else {
		kfree(new_leaf);
	}

	if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
		if (f.file->f_flags & O_NONBLOCK) {
			ret = -EAGAIN;
		} else {
			wait.task = current;
			wait.msg = (void *) msg_ptr;
			wait.state = STATE_NONE;
			ret = wq_sleep(info, SEND, timeout, &wait);
			/*
			 * wq_sleep must be called with info->lock held, and
			 * returns with the lock released
			 */
			goto out_free;
		}
	} else {
		receiver = wq_get_first_waiter(info, RECV);
		if (receiver) {
			pipelined_send(&wake_q, info, msg_ptr, receiver);
		} else {
			/* adds message to the queue */
			ret = msg_insert(msg_ptr, info);
			if (ret)
				goto out_unlock;
			__do_notify(info);
		}
		inode->i_atime = inode->i_mtime = inode->i_ctime =
				current_time(inode);
	}
out_unlock:
	spin_unlock(&info->lock);
	wake_up_q(&wake_q);
out_free:
	if (ret)
		free_msg(msg_ptr);
out_fput:
	fdput(f);
out:
	return ret;
}
```
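Walking through the code above, the possible error return values are as follows (glibc turns them into a -1 return with errno set to the corresponding positive value):

| Return value | Condition | Notes |
|---|---|---|
| -EINVAL | msg_prio >= MQ_PRIO_MAX (32768) | |
| -EBADF | 1. mqdes is an fd that has been closed; 2. mqdes is not the fd of an mq file; 3. the mq was opened without write access (the mq_open oflag contains neither O_WRONLY nor O_RDWR) | Example: thread 1's mq_open returns mqdes 10. While the program runs, thread 2 closes fd 10, and thread 3 opens a regular file that is assigned the recycled fd 10. When thread 1 then calls mq_send on mqdes 10, the call returns -EBADF. |
| -EMSGSIZE | msg_len is greater than attr.mq_msgsize, the maximum message size | |
| -ENOMEM | the kernel fails to allocate memory for msg | |
| -EFAULT | copy_from_user fails | |
| -EAGAIN | the mq was opened with O_NONBLOCK and the queue is full | |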
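The table can be read as a fixed order of checks. The function below is a hypothetical user-space restatement of that order (send_ctx, send_precheck and KERNEL_MQ_PRIO_MAX are illustrative names, with 32768 taken from the table). It returns the negative errno do_mq_timedsend would produce before sleeping, 1 where the real code would block in wq_sleep, and 0 when the send would proceed. Allocation failure (-ENOMEM) and copy_from_user failure (-EFAULT) are not modelled.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

#define KERNEL_MQ_PRIO_MAX 32768u       /* MQ_PRIO_MAX in the kernel */

/* Inputs of the checks in do_mq_timedsend, flattened into plain fields. */
struct send_ctx {
	unsigned int prio;
	bool fd_valid;          /* fdget() found a file */
	bool is_mqueue;         /* f_op == &mqueue_file_operations */
	bool writable;          /* f_mode & FMODE_WRITE */
	bool nonblock;          /* f_flags & O_NONBLOCK */
	size_t msg_len;
	long msgsize;           /* attr.mq_msgsize (positive) */
	long curmsgs, maxmsg;   /* attr.mq_curmsgs, attr.mq_maxmsg */
};

/* Checks in the same order as the kernel code. */
static int send_precheck(const struct send_ctx *c)
{
	if (c->prio >= KERNEL_MQ_PRIO_MAX)
		return -EINVAL;
	if (!c->fd_valid || !c->is_mqueue || !c->writable)
		return -EBADF;
	if (c->msg_len > (size_t)c->msgsize)
		return -EMSGSIZE;
	if (c->curmsgs == c->maxmsg)
		return c->nonblock ? -EAGAIN : 1;   /* 1: would sleep */
	return 0;
}
```

In the fd-reuse example from the table, fd 10 still refers to a valid file but not to an mqueue file, which corresponds to is_mqueue == false.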
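The key steps on the send side are:
- receiver = wq_get_first_waiter(info, RECV); gets the waiting struct ext_wait_queue.
- pipelined_send(&wake_q, info, msg_ptr, receiver); stores the msg pointer in the waiter, unlinks it from the list, and queues the receiver's task on the local wake_q (via wake_q_add).
- wake_up_q(&wake_q); wakes the sleeping thread after the spinlock has been released.
Receive code: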
```c
static int do_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr,
		size_t msg_len, unsigned int __user *u_msg_prio,
		struct timespec64 *ts)
{
	ssize_t ret;
	struct msg_msg *msg_ptr;
	struct fd f;
	struct inode *inode;
	struct mqueue_inode_info *info;
	struct ext_wait_queue wait;
	ktime_t expires, *timeout = NULL;
	struct posix_msg_tree_node *new_leaf = NULL;

	if (ts) {
		expires = timespec64_to_ktime(*ts);
		timeout = &expires;
	}

	audit_mq_sendrecv(mqdes, msg_len, 0, ts);

	f = fdget(mqdes);
	if (unlikely(!f.file)) {
		ret = -EBADF;
		goto out;
	}

	inode = file_inode(f.file);
	if (unlikely(f.file->f_op != &mqueue_file_operations)) {
		ret = -EBADF;
		goto out_fput;
	}
	info = MQUEUE_I(inode);
	audit_file(f.file);

	if (unlikely(!(f.file->f_mode & FMODE_READ))) {
		ret = -EBADF;
		goto out_fput;
	}

	/* checks if buffer is big enough */
	if (unlikely(msg_len < info->attr.mq_msgsize)) {
		ret = -EMSGSIZE;
		goto out_fput;
	}

	/*
	 * msg_insert really wants us to have a valid, spare node struct so
	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
	 * fall back to that if necessary.
	 */
	if (!info->node_cache)
		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);

	spin_lock(&info->lock);

	if (!info->node_cache && new_leaf) {
		/* Save our speculative allocation into the cache */
		INIT_LIST_HEAD(&new_leaf->msg_list);
		info->node_cache = new_leaf;
	} else {
		kfree(new_leaf);
	}

	if (info->attr.mq_curmsgs == 0) {
		if (f.file->f_flags & O_NONBLOCK) {
			spin_unlock(&info->lock);
			ret = -EAGAIN;
		} else {
			wait.task = current;
			wait.state = STATE_NONE;
			ret = wq_sleep(info, RECV, timeout, &wait);
			msg_ptr = wait.msg;
		}
	} else {
		DEFINE_WAKE_Q(wake_q);

		msg_ptr = msg_get(info);

		inode->i_atime = inode->i_mtime = inode->i_ctime =
				current_time(inode);

		/* There is now free space in queue. */
		pipelined_receive(&wake_q, info);
		spin_unlock(&info->lock);
		wake_up_q(&wake_q);
		ret = 0;
	}
	if (ret == 0) {
		ret = msg_ptr->m_ts;

		if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) ||
			store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) {
			ret = -EFAULT;
		}
		free_msg(msg_ptr);
	}
out_fput:
	fdput(f);
out:
	return ret;
}
```
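The key steps on the receive side are:
1. ret = wq_sleep(info, RECV, timeout, &wait); queues the struct ext_wait_queue and puts the thread to sleep.
2. The thread waits to be woken by an mq sending thread.
3. store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts) copies the message to user space.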
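The branch structure of do_mq_timedreceive can be summarized the same way; recv_branch and enum recv_path are hypothetical names for this sketch. One detail worth noting in the code: the receive buffer must be at least attr.mq_msgsize bytes even if the pending message is shorter, otherwise the call fails with -EMSGSIZE before the queue is even examined.

```c
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

enum recv_path { RECV_ERROR, RECV_SLEEP, RECV_DEQUEUE };

/* Which branch do_mq_timedreceive takes for the given state.
 * *err receives the negative errno when RECV_ERROR is returned, else 0. */
static enum recv_path recv_branch(size_t buf_len, long msgsize,
				  long curmsgs, bool nonblock, int *err)
{
	*err = 0;
	if (buf_len < (size_t)msgsize) {        /* must fit a full-size msg */
		*err = -EMSGSIZE;
		return RECV_ERROR;
	}
	if (curmsgs == 0) {
		if (nonblock) {
			*err = -EAGAIN;
			return RECV_ERROR;
		}
		return RECV_SLEEP;              /* wq_sleep(info, RECV, ...) */
	}
	return RECV_DEQUEUE;                    /* msg_get + pipelined_receive */
}
```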
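Problem analysis
The mq problem we encountered in practice, mapped onto the application model in Figure 1, showed the following:
Observation 1: the message queue is full, receiver thread 3 gets no messages and is not scheduled, and its last call is in wq_sleep.
Observation 2: multiple threads or processes send to the same mq.
Observation 3: sending a signal gets thread 3 scheduled, and it then receives all the messages in the queue.
Observation 4: after the signal-triggered drain, threads 1 and 2 keep sending, thread 3 stops receiving again, and the number of queued messages grows until it reaches the maximum.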
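When reproducing observations like these, the queue state can also be watched from outside the process. Assuming the mqueue filesystem is mounted at /dev/mqueue (a common setup), reading a queue's file returns a one-line status of the form QSIZE:... NOTIFY:... SIGNO:... NOTIFY_PID:..., where QSIZE is the number of bytes of queued message data rather than a message count (see mq_overview(7)). The parser below is a sketch with hypothetical names (struct mq_status, parse_mq_status, read_mq_status); for the message count itself, mq_getattr's mq_curmsgs is the direct source.

```c
#include <stdio.h>

/* Fields of the status line returned when an mqueue file is read. */
struct mq_status {
	unsigned long qsize;    /* bytes of queued message data */
	int notify;             /* sigev_notify of a registered mq_notify */
	int signo;              /* signal number for SIGEV_SIGNAL */
	int notify_pid;         /* pid registered with mq_notify, 0 if none */
};

/* Parse one status line; returns 0 on success, -1 on a format mismatch.
 * The spaces in the format skip the kernel's padding. */
static int parse_mq_status(const char *line, struct mq_status *st)
{
	int n = sscanf(line, "QSIZE:%lu NOTIFY:%d SIGNO:%d NOTIFY_PID:%d",
		       &st->qsize, &st->notify, &st->signo, &st->notify_pid);
	return n == 4 ? 0 : -1;
}

/* Read and parse the status of a queue, e.g. "/dev/mqueue/<name>". */
static int read_mq_status(const char *path, struct mq_status *st)
{
	char line[128];
	FILE *f = fopen(path, "r");
	int ok;

	if (!f)
		return -1;
	ok = fgets(line, sizeof line, f) != NULL;
	fclose(f);
	return ok ? parse_mq_status(line, st) : -1;
}
```

With a queue stuck as in Observations 1 and 4, QSIZE would stay at its high value between reads while the senders keep trying.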
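The investigation went as follows:
- Check whether the e_wait_q[RECV] list (of struct ext_wait_queue) in the mq's struct mqueue_inode_info has any entries. It had none, which means that at the time of the fault a sender would find no receiving task and would wake nobody.
- Stop threads 1 and 2 from sending and use a signal to make thread 3 receive all queued messages; thread 3 then goes back to sleep waiting.
- Check e_wait_q[RECV] again. This time it has an entry, which means thread 3 correctly returned to the wait-for-wakeup state.
- Manually make thread 1 send a message. Thread 3 still does not receive it, and its schedule count does not increase.
- Check e_wait_q[RECV] again. It is empty, which means that thread 1, while sending, had already removed the struct ext_wait_queue entry seen in the third step but failed to wake thread 3.
- Following the flow in Figure 3, the problem is likely in the wakeup of thread 3, i.e. in wake_up_q, which is used in pairs with wake_q_add.
- Manually calling wake_up_process() does wake thread 3, so the wakeup itself works. The problem must therefore be in whether thread 3 is ever reached by the while (node != WAKE_Q_TAIL) loop of wake_up_q.
- Per the wake_q_add code, a task is appended to the local wake queue only if its wake_q.next is NULL, and wake_q_add atomically sets it to WAKE_Q_TAIL when doing so. If wake_q.next is already non-NULL (for example WAKE_Q_TAIL), wake_q_add assumes another execution flow has already queued the task and will wake it, so it returns without queueing it. The local queue then stays empty, the while loop in wake_up_q is never entered, and thread 3 is not woken.
- Inspecting thread 3's wake_q.next showed WAKE_Q_TAIL. In this state, no message sent to thread 3 will ever wake it.
wake_q_add and wake_up_q normally appear in pairs: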
```c
void wake_q_add(struct wake_q_head *head, struct task_struct *task)
{
	struct wake_q_node *node = &task->wake_q;

	/*
	 * Atomically grab the task, if ->wake_q is !nil already it means
	 * its already queued (either by us or someone else) and will get the
	 * wakeup due to that.
	 *
	 * In order to ensure that a pending wakeup will observe our pending
	 * state, even in the failed case, an explicit smp_mb() must be used.
	 */
	smp_mb__before_atomic();
	if (cmpxchg_relaxed(&node->next, NULL, WAKE_Q_TAIL))
		return;

	get_task_struct(task);

	/*
	 * The head is context local, there can be no concurrency.
	 */
	*head->lastp = node;
	head->lastp = &node->next;
}

void wake_up_q(struct wake_q_head *head)
{
	struct wake_q_node *node = head->first;

	while (node != WAKE_Q_TAIL) {
		struct task_struct *task;

		task = container_of(node, struct task_struct, wake_q);
		BUG_ON(!task);
		/* Task can safely be re-inserted now: */
		node = node->next;
		task->wake_q.next = NULL;

		/*
		 * wake_up_process() implies a wmb() to pair with the queueing
		 * in wake_q_add() so as not to miss wakeups.
		 */
		wake_up_process(task);
		put_task_struct(task);
	}
}
```
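At the start of do_mq_timedsend, a local wake_q is declared and initialized with DEFINE_WAKE_Q, so its first is WAKE_Q_TAIL. In the normal wake_q_add path, the waiting task's wake_q.next is set to WAKE_Q_TAIL and the local wake_q's first points to the waiting task's wake_q node. wake_up_q then walks the list starting from the local wake_q's first: as long as the node is not WAKE_Q_TAIL, it resets the waiting task's wake_q.next to NULL and wakes the task.
So the conclusion is that thread 3's task_struct->wake_q.next had been set to WAKE_Q_TAIL, but the matching reset to NULL in wake_up_q was never executed: the two steps did not run as a pair.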
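The lost wakeup can be reproduced in a single-threaded user-space model of the two functions. This is a simplified sketch, not kernel code: cmpxchg becomes a plain compare-and-set, reference counting (get_task_struct/put_task_struct) and memory barriers are dropped, struct task is a hypothetical stand-in for task_struct, and incrementing task->wakeups stands in for wake_up_process. It shows the normal paired case, where wake_up_q resets wake_q.next to NULL, and the stuck case, where wake_q.next is already WAKE_Q_TAIL, so wake_q_add queues nothing and wake_up_q never enters its loop.

```c
#include <stddef.h>

struct wake_q_node { struct wake_q_node *next; };
#define WAKE_Q_TAIL ((struct wake_q_node *)0x01)

struct wake_q_head {
	struct wake_q_node *first;
	struct wake_q_node **lastp;
};

#define DEFINE_WAKE_Q(name) \
	struct wake_q_head name = { WAKE_Q_TAIL, &name.first }

/* Hypothetical stand-in for task_struct. */
struct task {
	struct wake_q_node wake_q;
	int wakeups;                    /* counts simulated wake_up_process */
};

/* Model of wake_q_add: queue the task only if wake_q.next is NULL. */
static void wake_q_add(struct wake_q_head *head, struct task *task)
{
	struct wake_q_node *node = &task->wake_q;

	if (node->next != NULL)         /* "already queued": skip it */
		return;
	node->next = WAKE_Q_TAIL;
	*head->lastp = node;
	head->lastp = &node->next;
}

/* Model of wake_up_q: walk until WAKE_Q_TAIL, reset next, wake. */
static void wake_up_q(struct wake_q_head *head)
{
	struct wake_q_node *node = head->first;

	while (node != WAKE_Q_TAIL) {
		struct task *task = (struct task *)((char *)node -
					offsetof(struct task, wake_q));
		node = node->next;
		task->wake_q.next = NULL;   /* task can be queued again */
		task->wakeups++;            /* stands in for wake_up_process */
	}
}
```

This matches what was observed on the target: once wake_q.next is stuck at WAKE_Q_TAIL, every later sender's wake_q_add returns early for thread 3, so none of them wakes it.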
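Possible reasons why setting task_struct->wake_q.next to WAKE_Q_TAIL and resetting it to NULL did not run as a pair:
- During mq_send, thread 1 set thread 3's task_struct->wake_q.next to WAKE_Q_TAIL, but it was preempted and never got to the step in wake_up_q that resets it to NULL.
- Some other code path set thread 3's task_struct->wake_q.next to WAKE_Q_TAIL without ever resetting it to NULL, i.e. a wake_q_add without a matching wake_up_q.
- The atomicity of cmpxchg_relaxed(&node->next, NULL, WAKE_Q_TAIL) was broken; I have not yet figured out how this could happen.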