IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> Linux 中断子系统(四)-Workqueue -> 正文阅读

[系统运维]Linux 中断子系统(四)-Workqueue

1、数据结构


????????1)Workqueue?工作队列是利用内核线程来异步执行工作任务的通用机制;
????????2)Workqueue?工作队列可以用作中断处理的Bottom-half机制,利用进程上下文来执行中断处理中耗时的任务,因此它允许睡眠,而Softirg和Tasklet在处理任务时不能睡眠;
????????3)在中断处理过程中,或者其他子系统中,调用workqueue的调度或入队接口后,通过建立好的链接关系图逐级找到合适的worker,最终完成工作任务的执行
????????4)关键的数据结构:
????????????????work_struct:工作队列调度的最小单位,work?item;
????????????????workqueue_struct:工作队列,work?item都挂入到工作队列中;
????????????????worker:work?item的处理者,每个worker对应一个内核线程;
????????????????worker_pool:worker池(内核线程池),是一个共享资源池,提供不同的worker来对work?item进行处理;
????????????????pool_workqueue:充当桥梁纽带的作用,用于连接workqueue和worker_pool,建立链接关系


2、workqueue子系统初始化


workqueue?子系统的初始化分成两步来完成的:workqueue_init_early和workqueue_init


2.1.1?workqueue_init_early()

workqueue_init_early()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //? ? ?kernel\Workqueue.c

?????????----→KMEM_CACHE(pool_workqueue,SLAB_PANIC)?//? 为pool_workqueue?分配?slab?缓存

????????----→init_worker_pool(pool)? ? ? ? ? ? ? ? ?//? ? ?为每个cpu?创建?worker_pool

????????----→worker_pool_assign_id(pool)? ? ? ? ? ? //? ? ? 为worker_pool?分配id

????????----→alloc_workqueue_attrs()? ? ? ? ? ? ? ? //? ? ?为?unbound?工作队列创建默认属性

????????----→system_wq?=alloc_workqueue();?????????//? ? ?kernel?创建默认的消息队列? ? ? ? ?????????????system_highpri_wq=alloc_workqueue();
?????????????system_long_wq=alloc_workqueue();
?????????????system_unbound_wq=alloc_workqueue(();
?????????????system_freezable_wq?=alloc_workqueue();


alloc_workqueue()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??//? ? include\linux\Workqueue.h

alloc_workqueue()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//? ? include\linux\Workqueue.h

????????----→kzalloc(sizeof(*wq)+tbl_size,GFP_KERNEL)? //? ? 分配内存

????????----→wq->flags=flags;? ? ? ? ? ? ? ? ? ? ? ? ? //? ? 初始化wq

????????????????wq->saved_max_active?=?max_active;?

????????----→alloc_and_link_pwqs(wq)? ? ? ? ? ? ? ?//? ? 分配?pool_workqueue?并建立连接

????????----→!(wq->flags?&?WQ_UNBOUND)

? ? ? ? ????????---Y--→alloc_percpu(struct?pool_workqueue)? ?//? ? 为每个CPU分配?pool_workqueue

????????????????------→init_pwq(pwq,wq,&cpu_pools[highpri])? ?//? ? 初始化pool_workqueue
????????????????------→link_pwq(pwq)                   //? ? 链接?pool_workqueue?和wq
                --N--→apply_workqueue_attrs(wq,ordered_wq_attrs(highpri])
                        //? ? 将新的?workqueue_attrs?应用于未绑定的工作队列 
 ???????????????    -------→apply_workqueue_attrs_locked(wq,attrs)
 ???????????????        -------→apply_wqattrs_prepare(wq,attrs)
                            //? ? 分配attrs和pwq(pool_workqueue)
 ???????????????            -------→alloc_unbound_pwq(wq,tmp_attrs)
                                //? ? 获取匹配@attr?的池并创建一个将池和@wq?相关联的pwq
                                -------→get_unbound_pool(attrs)
                                -------→init_pwq(pwq,wq,pool)
 ???????????????        -------→apply_wqattrs_commit(ctx)
                            //? ? 设置wq熟悉,并将wq和pwq建立连接
 ???????????????            -------→numa_pwq_tbl_install(ctx->wq,node,ctx->pwq_tbl[node])
                                -------→link_pwq(pwq)
????????----→init_rescuer(wq)
                //? ? wq?内存紧张时的处理逻辑
????????----→list_add_tail_rcu(&wq->list,&workqueues);
                //? ? 将创建的wq添加到全局链表中

alloc_workqueue?完成的主要工作包括:
????????1)首先当然是要分配一个?struct?workqueue_struct的数据结构,并且对该结构中的字段进行初始化操作;
????????2)workqueue?最终需要和worker_pool?关联起来,而这个纽带就是?pool_workqueue,alloc_and_link_pwqs函数就是完成这个功能:

????????????????A)如果工作队列是绑定到CPU上的,则为每个CPU都分配pool_workqueue?并且初始化,通过link_pwq将工作队列与pool_workqueue?建立连接;

????????????????B)如果工作队列不绑定到CPU上,则按内存节点(NUMA)来分配pool_workqueue,用?get_unbound_pool来实现,它会根据wq属性先去查找,如果没有找到相同的就创建一个新的pool_workqueue,并且添加到?unbound_pool_hash?哈希表中,最后也会调用link_pwq来建立连接;
????????????????C)创建工作队列时,如果设置了WQ_MEM_RECLAIM标志,则会新建?rescuer?worker,对应?rescuer_thread内核线程。当内存紧张时,新创建worker可能会失败,这时候由rescuer来处理这种情况;
????????????????D)最终将新建好的工作队列添加到全局链表workqueues中;


2.1.2?workqueue_init()

workqueue_init()                    //  让wq子系统完全在线-kernel\Workqueue.c
    ---→wq_numa_init()             //   对于numa系统,初始化?cpumask和wq属性
    ---→list_for_each_entry(wq,&workqueues,list)
        wq_update_unbound_numa(wq,smp_processor_id(),true)    //    更新?numa?affinity
    ---→for_each_online_cpu(cpu)
            for_each_cpu_worker_pool(pool,cpu)
                create_worker(pool)                  //    为worker_pool?创建初始?worker
    ---→hash_for_each(unbound_pool_hash,bkt,pool,hash_node)
        create_worker(pool)        //    create?a?new?workqueue?worker
            ----→alloc_worker(pool->node)        //    分配?worker?结构体
            ----→kthread_create_on_node(worker_thread,...)//    为worker?创建内核线程
            ----→worker_attach_to_pool(worker,pool)
                //   将?worker?添加到?worker_pool?中
            ----→worker_enter_idle(worker)
                //   worker?进入IDLE?状态
    ---→wq_watchdog_init()
                //   初始化工作队列监视程序

????????create?worker函数中,创建的内核线程名字为?kworker/xxcYY?或者kworker/uxx:YY,其中xx表示worker_pool的编号,YY表示worker的编号,u表示?unbound;workqueue子系统初始化完成后,基本就已经将数据结构的关联建立好了,当有work来进行调度的时候,就可以进行处理了


3、work?调度


3.1?schedule_work

schedule_work                        //  将work?task提交到全局?workqueue
    ----→queue_work(system_wq,work)
        ----→queue_work_on
                 //  queue?work?on?specific?cpu-kernel\Workqueue.c
            ----→!test_and_set_bit(WORK_STRUCT_PENDING_BIT,work_data_bits(work))
                //  判断work是否添加到队列中
                 _queue_work(cpu,wq,work)

            ----→wq->flags?&?WQ_UNBOUND         //  根据工作队列类型来获取pwq
                 unbound_pwq_by_node(wq,cpu_to_node(cpu))
                 per_cpu_ptr(wq->cpu_pwqs,cpu)

            ----→get_work_pool(work)        //  返回work?对应的work_pool

            ----→last_pool?&&?last_pool?!=pwq->pool //  判断上次的pwq和这次pwq是否为同一个
            find_worker_executing_work(last_pool,work)//  寻找?worker?在执行的work

            ----→insert_work(pwq,work,worklist,work_flags)?
                        //  将work?插入到worklist?中
                ----→set_work_pwq(work,?pwq,extra_flags)
                        //  设置work的data字段
                ----→wake_up_worker(pool)
                        //  唤醒?worker?内核线程


????????A)schedule_work?完成的工作是将work添加到对应的链表中,而在添加的过程中,首先是需要确定?pool_workqueue;
????????B)?pool_workqueue?对应一个worker_pool,因此确定了?pool_workqueue?也就确定了worker_pool,进而可以将work添加到工作链表中;
????????C)pool_workqueue的确定分为三种情况:

????????????????1)bound类型的工作队列,直接根据CPU号获取;

????????????????2)unbound类型的工作队列,根据node号获取,针对unbound类型工作队列,pool_workqueue的释放是异步执行的,需要判断refcnt的计数值,因此在获取?pool_workqueue?时可能要多次retry;

????????????????3)根据缓存热度,优先选择正在被执行的worker_pool;


3.2?worker?thread

worker_thread(void*_worker)                //  worker?thread?function-kernel\Workqueue.c
    ---→set_pf_worker(true)               //  告诉调度器这是?workqueue?worker
    ---→woke_up:
            unlikely(worker->flags?&WORKER_DIE)         //  判断work?是否消亡
                --Y--→ida_simple_remove(&pool->worker_ida,worker->id);
                    worker_detach_from_pool(worker);    //  若消亡则进行清理
                --N--→worker_leave_idle(worker)        //  work?离开IDLE状态

    ---→recheck:
            list_first_entry(&pool->worklist,struct?work_struct,entry)
                process_one_work(worker,work)    //  遍历?worklist,执行work的回调函数
    ---→sleep:
            set_current_state(TASK_IDLE)             //  设置内核状态
            schedule()                               //  让出CPU
----→
----

在创建worker时,创建内核线程,执行函数为worker_thread:
????????woker_thread都PF?WQWORKER,调度器在进行调度处理时会对task进行判断,针对workerqueue worker有特殊处理;
????????worker?对应的内核线程,在没有处理work的时候是睡眠状态,当被唤醒的时候,跳转到woke_up开始执行;
????????woke_up?之后,如果此时?worker是需要销毁的,那就进行清理工作并返回。否则,离开IDLE状态,并进入recheck模块执行;
????????recheck部分,首先判断是否需要更多的worker来处理,如果没有任务处理,跳转到sleep?地方进行睡眠。有任务需要处理时,会判断是否有空闲内核线程以及是否需要动态创建,再清除掉worker的标志位,然后遍历工作链表,对链表中的每个节点调用?process_one_worker来处理;
????????sleep?部分比较好理解,没有任务处理时,worker进入空闲状态,并将当前的内核线程设置成睡眠状态,让出CPU;


work的执行函数为?process_one_worker:

process_one_work(worker,work)
    ---→find_worker_executing_work(pool,work)       //  找到正在执行work的worker
    ---→move_linked_works(work,&collision->scheduled,NULL)
                //  将正在执行的work移动到scheduled?链表中
    ---→hash_add(pool->busy_hash,&worker->hentry,(unsigned?long)work);
            worker->current_work?=work;            //  设置?worker?字段
    ---→if(need_more_worker(pool))?wake_up_worker(pool);
                                                   //  若需要更多worker,则唤醒idle?work
    ---→set_work_pool_and_clear_pending(work,pool->id)     //  清除?pending?状态
    ---→worker->current_func(work);            //  执行work的回调函数
    ---→hash_del(&worker->hentry);            //  执行完后的清理工作
        worker->current_work?=NULL;

3.3?worker?动态管理

3.3.1?worker?状态机变化

????????1)worker_pool?通过?nr_running?字段来在不同的状态机之间进行切换;
????????2)worker_pool?中有work需要处理时,需要至少保证有一个运行状态的worker,当nr_running大于1时,将多余的worker进入IDLE状态,没有work需要处理时,所有的worker都会进入IDLE状态;
????????3)执行work时,如果回调函数阻塞运行,那么会让?worker?进入睡眠状态,此时调度器会进行判断是否需要唤醒另一个worker;
????????4)IDLE状态的worker?都存放在idle_list链表中,如果空闲时间超过了300秒,则会将其进行销毁;


1、Worker?Running---→Suspend
_schedule()
????????//? ? 当worker进入睡眠状态时,如果该worker_pool?没有其他的worker处于运行状态,那么是需要唤醒一个空闲的worker来维持并发处理的能力;


2、Worker?Suspend---→Running

wake_up_worker                               //  唤醒idle?worker-\kernelWorkqueue.c
    ---→wake_up_process(worker->task)
        ---→try_to_wake_up(p,TASK_NORMAL,0)  //  唤醒一个线程-Kernel\sched\Core.c
            ---→ttwu_queue(p,cpu,?wake_flags)
                ---→ttwu_do_activate(rq,?p,?wake_flags,&rf)

????????睡眠状态可以通过?wake_up_worker?来进行唤醒处理,最终判断如果该worker?不在运行状态,则增加worker_pool的nr_running?值


3.3.2?worker的动态添加和删除


1、动态删除

init_worker_pool            //  初始化一个worker_pool     ————kernel\Workqueue.c
    ---→timer_setup(&pool->idle_timer,idle_worker_timeout,TIMER_DEFERRABLE)
        ---→while?(too_many_workers(pool))
                list_entry(pool->idle_list.prev,struct?worker,?entry)
                destroy_worker(worker)
                 //  遍历?idle?空闲链表,若worker?空闲时间超过IDLE_WORKER_TIMEOUT?则进行销毁

????????worker_pool?初始化时,注册了?timer回调函数,用于定时对空闲链表上的worker进行处理,如果worker太多,且空闲时间太长,超过了5分钟,那么就直接进行销毁处理了。


2、动态添加

Worker_thread
    ---→manage_workers(worker)                //  管理?worker?pool
    ---→maybe_create_worker(pool)            //  创建更多的worker

????????内核线程执行worker_thread函数时,如果没有空闲的worker,会调用manage_workers接口来创建更多的worker来处理工作


https://www.cnblogs.com/wahaha02/p/6341095.html


APIC介绍:
????????APIC相较于PIC来说,最大的优点是能适用于MP平台,当然,管脚多是它另一个优点。APIC由两部分组成,一个称为LAPIC(Local?APIC,本地高级中断控制器),一个称IOAPIC?(I/O?APCI,I/O高级中断控制器)。前者位于CPU中,在MP平台,每个CPU都有一个自己的LAPIC。后者通常位于南桥上,像PIC一样,连接各个产生中断的设备。在一个典型的具有多个处理器的PC平台,通常有一个IOAPIC和多个LAPIC,它们相互配合,形成一个中断的分发网络。


????????图中的中断控制器通信总线,是IOAPIC和LAPIC通信的桥梁,在Intel的P6架构和Pentium系列CPU中,它是一条单独的APIC总线。时代在进步,Pentium4和Xeon系列CPU出现后APIC?Bus已经不存在,系统的前端总线代替了它。


????????话分两头,让我们先来看看IOAPIC。和PIC对比,IOAPIC最大的作用在于中断分发。根据其内部的PRT(Programmable?Redirection?Table)表,IOAPIC可以格式化出一条中断消息,发送给某个CPU的LAPIC,由LAPIC通知CPU进行处理。目前典型的IOAPIC具有24个中断管脚,每个管脚对应一个RTE(Redirection?Table?Entry,PRT表项)。与PIC不同的是,IOAPIC的管脚没有优先级,也就是说,连接在管脚上的设备是平等的。


PCIE之?MSI/MSI-X中断:
????????https://blog.csdn.net/yhb1047818384/article/details/106676560
????????https://www.cnblogs.com/dream397/p/13614666.html

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-01-03 16:30:34  更:2022-01-03 16:30:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/10 12:05:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码