IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> Linux mq在内核的发送、唤醒流程简介 -> 正文阅读

[系统运维]Linux mq在内核的发送、唤醒流程简介

简介

mq是linux系统的一种进程、线程间通信方式。它的效率还可以,一般会在mq发送时设定消息内容为内存地址进行通信,这样效率更高。如进程间mq通信,就发送共享内存的地址,进程内mq通信,就发送进程内普通地址。

在实际的应用中,遇到mq消息队列满的,接收方mq不接收,接收方任务不被唤醒的情况。 由于应用软件实现未知,所以需要根据现象反推应用软件的行为。要从底层反推上层的行为,就需要了解底层的流程。

应用模型

图 1 mq发送接收应用模型?

应用模型如上图所示,线程3阻塞休眠等待消息,线程1和线程2会不间断的发送消息给线程3。

源码查看

Linux mq的实现源码在ipc/mqueue.c中

系统调用

mq_send

glibc源码mq_send或mq_timedsend最终会经过linux内核系统调用mq_timedsend进入内核。

系统调用源码如下:

SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
		size_t, msg_len, unsigned int, msg_prio,
		const struct __kernel_timespec __user *, u_abs_timeout)
{
	struct timespec64 ts, *p = NULL;
	if (u_abs_timeout) {
		int res = prepare_timeout(u_abs_timeout, &ts);
		if (res)
			return res;
		p = &ts;
	}
	return do_mq_timedsend(mqdes, u_msg_ptr, msg_len, msg_prio, p);
}

由上面可知,mq发送的参数主要有mqdes(其实就是一个fd)、u_msg_ptr内容指针、msg_len内容长度、msg_prio消息优先级、u_abs_timeout超时时间。

其中,在glibc中,调用mq_timedsend系统调用的有两个接口:mq_send和mq_timedsend。

其中这两个函数的区别在于mq_send的u_abs_timeout参数为NULL。

上面会产生一个错误情况,当u_abs_timeout的地址不可用时返回-EFAULT,当超时值非法时,会返回-EINVAL。

mq_receive

glibc源码mq_receive或mq_timedreceive最终会经过linux内核系统调用mq_timedreceive进入内核。

SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
		size_t, msg_len, unsigned int __user *, u_msg_prio,
		const struct __kernel_timespec __user *, u_abs_timeout)
{
	struct timespec64 ts, *p = NULL;
	if (u_abs_timeout) {
		int res = prepare_timeout(u_abs_timeout, &ts);
		if (res)
			return res;
		p = &ts;
	}
	return do_mq_timedreceive(mqdes, u_msg_ptr, msg_len, u_msg_prio, p);
}

数据结构

?图2 mq发送、唤醒关键数据结构

每一个mq文件的i_node数据中都有一个struct mqueue_inode_info,这个结构中存储着mq通信的所有信息,如attr属性、msg存储、线程同步唤醒信息。

每一个mq为发送和接收都准备了struct ext_wait_queue,当线程1调用阻塞模式mq_receive休眠等待是,会添加一个struct ext_wait_queue的结构到struct mqueue_inode_info的接收队列中。当线程2调用mq_send发送消息到线程1时,会通过共享的struct mqueue_inode_info中的接收队列获取到线程1的信息,并唤醒线程1,线程1唤醒后收到消息。

内核发送接收流程

图3 mq发送、唤醒流程简图?

上图为mq发送、接收流程简图,发送接收主要依托于mq的公共结构struct mqueue_inode_info,mq_receive的时候会创建一个struct ext_wait_queue结构,并挂入struct mqueue_inode_info的接收队列中。当线程2发送消息到线程1时,会从struct mqueue_inode_info中获取之前线程1挂入的struct ext_wait_queue结构,并从中获取线程1的task_struct,并把发送的消息msg也填入struct ext_wait_queue结构,然后调用wake_up_q唤醒线程1,线程1被唤醒后,读取msg,并copy到接收的用户态地址中。

发送代码:

static int do_mq_timedsend(mqd_t mqdes, const char __user *u_msg_ptr,
		size_t msg_len, unsigned int msg_prio,
		struct timespec64 *ts)
{
	struct fd f;
	struct inode *inode;
	struct ext_wait_queue wait;
	struct ext_wait_queue *receiver;
	struct msg_msg *msg_ptr;
	struct mqueue_inode_info *info;
	ktime_t expires, *timeout = NULL;
	struct posix_msg_tree_node *new_leaf = NULL;
	int ret = 0;
	DEFINE_WAKE_Q(wake_q);

	if (unlikely(msg_prio >= (unsigned long) MQ_PRIO_MAX))
		return -EINVAL;

	if (ts) {
		expires = timespec64_to_ktime(*ts);
		timeout = &expires;
	}

	audit_mq_sendrecv(mqdes, msg_len, msg_prio, ts);

	f = fdget(mqdes);
	if (unlikely(!f.file)) {
		ret = -EBADF;
		goto out;
	}

	inode = file_inode(f.file);
	if (unlikely(f.file->f_op != &mqueue_file_operations)) {
		ret = -EBADF;
		goto out_fput;
	}
	info = MQUEUE_I(inode);
	audit_file(f.file);

	if (unlikely(!(f.file->f_mode & FMODE_WRITE))) {
		ret = -EBADF;
		goto out_fput;
	}

	if (unlikely(msg_len > info->attr.mq_msgsize)) {
		ret = -EMSGSIZE;
		goto out_fput;
	}

	/* First try to allocate memory, before doing anything with
	 * existing queues. */
	msg_ptr = load_msg(u_msg_ptr, msg_len);
	if (IS_ERR(msg_ptr)) {
		ret = PTR_ERR(msg_ptr);
		goto out_fput;
	}
	msg_ptr->m_ts = msg_len;
	msg_ptr->m_type = msg_prio;

	/*
	 * msg_insert really wants us to have a valid, spare node struct so
	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
	 * fall back to that if necessary.
	 */
	if (!info->node_cache)
		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);

	spin_lock(&info->lock);

	if (!info->node_cache && new_leaf) {
		/* Save our speculative allocation into the cache */
		INIT_LIST_HEAD(&new_leaf->msg_list);
		info->node_cache = new_leaf;
		new_leaf = NULL;
	} else {
		kfree(new_leaf);
	}

	if (info->attr.mq_curmsgs == info->attr.mq_maxmsg) {
		if (f.file->f_flags & O_NONBLOCK) {
			ret = -EAGAIN;
		} else {
			wait.task = current;
			wait.msg = (void *) msg_ptr;
			wait.state = STATE_NONE;
			ret = wq_sleep(info, SEND, timeout, &wait);
			/*
			 * wq_sleep must be called with info->lock held, and
			 * returns with the lock released
			 */
			goto out_free;
		}
	} else {
		receiver = wq_get_first_waiter(info, RECV);
		if (receiver) {
			pipelined_send(&wake_q, info, msg_ptr, receiver);
		} else {
			/* adds message to the queue */
			ret = msg_insert(msg_ptr, info);
			if (ret)
				goto out_unlock;
			__do_notify(info);
		}
		inode->i_atime = inode->i_mtime = inode->i_ctime =
				current_time(inode);
	}
out_unlock:
	spin_unlock(&info->lock);
	wake_up_q(&wake_q);
out_free:
	if (ret)
		free_msg(msg_ptr);
out_fput:
	fdput(f);
out:
	return ret;
}

走读上面代码,返回错误值的情况如下:

返回值

出现条件

备注

-EINVAL

msg_prio >= MQ_PRIO_MAX 32768

-EBADF

1. mqdes是被关闭的fd

2. mqdes不是mq文件的fd

3. mq_open时,mode不包含可写

如:线程1 mq_open的mqdes为10,在软件运行过程中,线程2 close 10,线程3打开一个普通的文件,重新分配到fd 10,这时线程1使用mqdes为10 ,mq_send 发送消息,这时就会返回-EBADF

-EMSGSIZE

发送mq的内容长度大于attr.mq_msgsize消息最大长度

-ENOMEM

内核分配msg内存失败时

-EFAULT

copy_from_user 失败时

-EAGAIN

mq属于O_NONBLOCK,且消息队列满时

发送关键流程如下:

  1. receiver = wq_get_first_waiter(info, RECV);获取等待的struct ext_wait_queue
  2. pipelined_send(&wake_q, info, msg_ptr, receiver);摘取链表,并把msg指针填入
  3. wake_up_q(&wake_q);唤醒休眠的线程

接收代码?

?

static int do_mq_timedreceive(mqd_t mqdes, char __user *u_msg_ptr,
		size_t msg_len, unsigned int __user *u_msg_prio,
		struct timespec64 *ts)
{
	ssize_t ret;
	struct msg_msg *msg_ptr;
	struct fd f;
	struct inode *inode;
	struct mqueue_inode_info *info;
	struct ext_wait_queue wait;
	ktime_t expires, *timeout = NULL;
	struct posix_msg_tree_node *new_leaf = NULL;

	if (ts) {
		expires = timespec64_to_ktime(*ts);
		timeout = &expires;
	}

	audit_mq_sendrecv(mqdes, msg_len, 0, ts);

	f = fdget(mqdes);
	if (unlikely(!f.file)) {
		ret = -EBADF;
		goto out;
	}

	inode = file_inode(f.file);
	if (unlikely(f.file->f_op != &mqueue_file_operations)) {
		ret = -EBADF;
		goto out_fput;
	}
	info = MQUEUE_I(inode);
	audit_file(f.file);

	if (unlikely(!(f.file->f_mode & FMODE_READ))) {
		ret = -EBADF;
		goto out_fput;
	}

	/* checks if buffer is big enough */
	if (unlikely(msg_len < info->attr.mq_msgsize)) {
		ret = -EMSGSIZE;
		goto out_fput;
	}

	/*
	 * msg_insert really wants us to have a valid, spare node struct so
	 * it doesn't have to kmalloc a GFP_ATOMIC allocation, but it will
	 * fall back to that if necessary.
	 */
	if (!info->node_cache)
		new_leaf = kmalloc(sizeof(*new_leaf), GFP_KERNEL);

	spin_lock(&info->lock);

	if (!info->node_cache && new_leaf) {
		/* Save our speculative allocation into the cache */
		INIT_LIST_HEAD(&new_leaf->msg_list);
		info->node_cache = new_leaf;
	} else {
		kfree(new_leaf);
	}

	if (info->attr.mq_curmsgs == 0) {
		if (f.file->f_flags & O_NONBLOCK) {
			spin_unlock(&info->lock);
			ret = -EAGAIN;
		} else {
			wait.task = current;
			wait.state = STATE_NONE;
			ret = wq_sleep(info, RECV, timeout, &wait);
			msg_ptr = wait.msg;
		}
	} else {
		DEFINE_WAKE_Q(wake_q);

		msg_ptr = msg_get(info);

		inode->i_atime = inode->i_mtime = inode->i_ctime =
				current_time(inode);

		/* There is now free space in queue. */
		pipelined_receive(&wake_q, info);
		spin_unlock(&info->lock);
		wake_up_q(&wake_q);
		ret = 0;
	}
	if (ret == 0) {
		ret = msg_ptr->m_ts;

		if ((u_msg_prio && put_user(msg_ptr->m_type, u_msg_prio)) ||
			store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)) {
			ret = -EFAULT;
		}
		free_msg(msg_ptr);
	}
out_fput:
	fdput(f);
out:
	return ret;
}

接收关键流程如下:

1.????? ret = wq_sleep(info, RECV, timeout, &wait); 挂入struct ext_wait_queue结构,进程休眠

2.????? 等待mq发送线程的唤醒

3.????? store_msg(u_msg_ptr, msg_ptr, msg_ptr->m_ts)接收消息。

问题分析

在实际的应用中,遇到mq问题,结合图1的mq发送接收应用模型

情况1:消息队列满,接收方线程3 mq接收不到消息,线程3不调度,最后一次调用在wq_sleep

情况2:会有多个线程、进程给同一个mq发送的情况

情况3:可以使用信号触发线程3任务被调度,并接收完队列中的消息

情况4:使用信号触发接收完消息后,线程1、线程2持续发送消息,线程3停止接收消息,消息队列消息数增长直到最大值

排查如下:

  1. 排查mq对应的struct mqueue_inode_info的struct ext_wait_queue e_wait_q[RECV]中是否有链入的成员,发现没有,说明出问题时,发送不会找到接收任务,不会去唤醒
  2. 停止线程1、线程2发送消息,信号触发线程3接收所有消息后,线程3重新进入休眠等待状态
  3. 再次查一下struct mqueue_inode_info的struct ext_wait_queue e_wait_q[RECV]中是否有链入的成员,发现有,说明线程3状态正常回到了等待唤醒状态
  4. 手动触发线程1发送消息,发现线程3还时没有收到消息,线程3调度次数不增加
  5. 再次查一下struct mqueue_inode_info的struct ext_wait_queue e_wait_q[RECV]中是否有链入的成员,发现没有,说明线程1在发送消息时,已经把步骤3中的struct ext_wait_queue成员摘取,但是没有成功唤醒线程3
  6. 结合图3的流程图,发现问题可能出在唤醒线程3的操作wake_up_q,wake_up_q与wake_q_add是成对出现的。
  7. 尝试手动调用wake_up_process(),可以唤醒线程3,说明唤醒本身没有问题,那么问题就出在wake_up_q的那个while (node != WAKE_Q_TAIL)的判断中了。
  8. 根据wake_up_q代码,当线程3的wake_q->next为NULL时,wake_up_q的那个while才会进去,同时会把wake_q->next设置为WAKE_Q_TAIL。当线程3的wake_q->next为WAKE_Q_TAIL时,说明已经有正在唤醒等待任务的执行流,wake_up_q的那个while不会进去,唤醒线程3的操作就不会被执行。
  9. 查看线程3的wake_q->next,发现为WAKE_Q_TAIL,这样的话,每次发往线程3的消息都不会被唤醒。

?wake_up_q与wake_q_add一般是成对出现的。

void wake_q_add(struct wake_q_head *head, struct task_struct *task)
{
	struct wake_q_node *node = &task->wake_q;

	/*
	 * Atomically grab the task, if ->wake_q is !nil already it means
	 * its already queued (either by us or someone else) and will get the
	 * wakeup due to that.
	 *
	 * In order to ensure that a pending wakeup will observe our pending
	 * state, even in the failed case, an explicit smp_mb() must be used.
	 */
	smp_mb__before_atomic();
	if (cmpxchg_relaxed(&node->next, NULL, WAKE_Q_TAIL))
		return;

	get_task_struct(task);

	/*
	 * The head is context local, there can be no concurrency.
	 */
	*head->lastp = node;
	head->lastp = &node->next;
}

void wake_up_q(struct wake_q_head *head)
{
	struct wake_q_node *node = head->first;

	while (node != WAKE_Q_TAIL) {
		struct task_struct *task;

		task = container_of(node, struct task_struct, wake_q);
		BUG_ON(!task);
		/* Task can safely be re-inserted now: */
		node = node->next;
		task->wake_q.next = NULL;

		/*
		 * wake_up_process() implies a wmb() to pair with the queueing
		 * in wake_q_add() so as not to miss wakeups.
		 */
		wake_up_process(task);
		put_task_struct(task);
	}
}

?

在do_mq_timedsend开始时,声明了局部变量wake_q并初始化,在wake_q_add的正常流程中,等待任务的wake_q的next会被设置为WAKE_Q_TAIL,局部变量wake_q的first会指向等待任务的wake_q。在wake_up_q函数中,会判断局部变量wake_q的first是否为WAKE_Q_TAIL,如果是会唤醒等待任务,并设置等待任务的wake_q的next会被设置为NULL。

所以上面问题的结论是等待任务线程3 的task_struct->wake_q->next被设置为WAKE_Q_TAIL,但是没有执行wake_up_q中的task_struct->wake_q->next设置为NULL,没有成对执行。

可能导致上面task_struct->wake_q->next被设置为WAKE_Q_TAIL和设置为NULL没有被成对执行的原因可能有:

  1. 线程1在mq_send过程中已经设置线程3的task_struct->wake_q->next被设置为WAKE_Q_TAIL,但是线程1被抢占,没有机会执行下面的task_struct->wake_q->next被设置为NULL
  2. 其他流程中设置线程3的task_struct->wake_q->next被设置为WAKE_Q_TAIL,但是没有给其设置NULL,wake_up_q与wake_q_add没有成对出现的。
  3. cmpxchg_relaxed(&node->next, NULL, WAKE_Q_TAIL)的原子性出错了,这种情况我还没想通

?

?

?

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-10-08 21:17:28  更:2022-10-08 21:21:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 18:37:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码