1、数据结构
????????1)Workqueue?工作队列是利用内核线程来异步执行工作任务的通用机制; ????????2)Workqueue?工作队列可以用作中断处理的Bottom-half机制,利用进程上下文来执行中断处理中耗时的任务,因此它允许睡眠,而Softirg和Tasklet在处理任务时不能睡眠; ????????3)在中断处理过程中,或者其他子系统中,调用workqueue的调度或入队接口后,通过建立好的链接关系图逐级找到合适的worker,最终完成工作任务的执行 ????????4)关键的数据结构: ????????????????work_struct:工作队列调度的最小单位,work?item; ????????????????workqueue_struct:工作队列,work?item都挂入到工作队列中; ????????????????worker:work?item的处理者,每个worker对应一个内核线程; ????????????????worker_pool:worker池(内核线程池),是一个共享资源池,提供不同的worker来对work?item进行处理; ????????????????pool_workqueue:充当桥梁纽带的作用,用于连接workqueue和worker_pool,建立链接关系
2、workqueue子系统初始化
workqueue?子系统的初始化分成两步来完成的:workqueue_init_early和workqueue_init
2.1.1?workqueue_init_early()
workqueue_init_early()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? //? ? ?kernel\Workqueue.c
?????????----→KMEM_CACHE(pool_workqueue,SLAB_PANIC)?//? 为pool_workqueue?分配?slab?缓存
????????----→init_worker_pool(pool)? ? ? ? ? ? ? ? ?//? ? ?为每个cpu?创建?worker_pool
????????----→worker_pool_assign_id(pool)? ? ? ? ? ? //? ? ? 为worker_pool?分配id
????????----→alloc_workqueue_attrs()? ? ? ? ? ? ? ? //? ? ?为?unbound?工作队列创建默认属性
????????----→system_wq?=alloc_workqueue();?????????//? ? ?kernel?创建默认的消息队列? ? ? ? ?????????????system_highpri_wq=alloc_workqueue();
?????????????system_long_wq=alloc_workqueue();
?????????????system_unbound_wq=alloc_workqueue(();
?????????????system_freezable_wq?=alloc_workqueue();
alloc_workqueue()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??//? ? include\linux\Workqueue.h
alloc_workqueue()? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?//? ? include\linux\Workqueue.h
????????----→kzalloc(sizeof(*wq)+tbl_size,GFP_KERNEL)? //? ? 分配内存
????????----→wq->flags=flags;? ? ? ? ? ? ? ? ? ? ? ? ? //? ? 初始化wq
????????????????wq->saved_max_active?=?max_active;?
????????----→alloc_and_link_pwqs(wq)? ? ? ? ? ? ? ?//? ? 分配?pool_workqueue?并建立连接
????????----→!(wq->flags?&?WQ_UNBOUND)
? ? ? ? ????????---Y--→alloc_percpu(struct?pool_workqueue)? ?//? ? 为每个CPU分配?pool_workqueue
????????????????------→init_pwq(pwq,wq,&cpu_pools[highpri])? ?//? ? 初始化pool_workqueue
????????????????------→link_pwq(pwq) //? ? 链接?pool_workqueue?和wq
--N--→apply_workqueue_attrs(wq,ordered_wq_attrs(highpri])
//? ? 将新的?workqueue_attrs?应用于未绑定的工作队列
??????????????? -------→apply_workqueue_attrs_locked(wq,attrs)
??????????????? -------→apply_wqattrs_prepare(wq,attrs)
//? ? 分配attrs和pwq(pool_workqueue)
??????????????? -------→alloc_unbound_pwq(wq,tmp_attrs)
//? ? 获取匹配@attr?的池并创建一个将池和@wq?相关联的pwq
-------→get_unbound_pool(attrs)
-------→init_pwq(pwq,wq,pool)
??????????????? -------→apply_wqattrs_commit(ctx)
//? ? 设置wq熟悉,并将wq和pwq建立连接
??????????????? -------→numa_pwq_tbl_install(ctx->wq,node,ctx->pwq_tbl[node])
-------→link_pwq(pwq)
????????----→init_rescuer(wq)
//? ? wq?内存紧张时的处理逻辑
????????----→list_add_tail_rcu(&wq->list,&workqueues);
//? ? 将创建的wq添加到全局链表中
alloc_workqueue?完成的主要工作包括: ????????1)首先当然是要分配一个?struct?workqueue_struct的数据结构,并且对该结构中的字段进行初始化操作; ????????2)workqueue?最终需要和worker_pool?关联起来,而这个纽带就是?pool_workqueue,alloc_and_link_pwqs函数就是完成这个功能:
????????????????A)如果工作队列是绑定到CPU上的,则为每个CPU都分配pool_workqueue?并且初始化,通过link_pwq将工作队列与pool_workqueue?建立连接;
????????????????B)如果工作队列不绑定到CPU上,则按内存节点(NUMA)来分配pool_workqueue,用?get_unbound_pool来实现,它会根据wq属性先去查找,如果没有找到相同的就创建一个新的pool_workqueue,并且添加到?unbound_pool_hash?哈希表中,最后也会调用link_pwq来建立连接; ????????????????C)创建工作队列时,如果设置了WQ_MEM_RECLAIM标志,则会新建?rescuer?worker,对应?rescuer_thread内核线程。当内存紧张时,新创建worker可能会失败,这时候由rescuer来处理这种情况; ????????????????D)最终将新建好的工作队列添加到全局链表workqueues中;
2.1.2?workqueue_init()
workqueue_init() // 让wq子系统完全在线-kernel\Workqueue.c
---→wq_numa_init() // 对于numa系统,初始化?cpumask和wq属性
---→list_for_each_entry(wq,&workqueues,list)
wq_update_unbound_numa(wq,smp_processor_id(),true) // 更新?numa?affinity
---→for_each_online_cpu(cpu)
for_each_cpu_worker_pool(pool,cpu)
create_worker(pool) // 为worker_pool?创建初始?worker
---→hash_for_each(unbound_pool_hash,bkt,pool,hash_node)
create_worker(pool) // create?a?new?workqueue?worker
----→alloc_worker(pool->node) // 分配?worker?结构体
----→kthread_create_on_node(worker_thread,...)// 为worker?创建内核线程
----→worker_attach_to_pool(worker,pool)
// 将?worker?添加到?worker_pool?中
----→worker_enter_idle(worker)
// worker?进入IDLE?状态
---→wq_watchdog_init()
// 初始化工作队列监视程序
????????create?worker函数中,创建的内核线程名字为?kworker/xxcYY?或者kworker/uxx:YY,其中xx表示worker_pool的编号,YY表示worker的编号,u表示?unbound;workqueue子系统初始化完成后,基本就已经将数据结构的关联建立好了,当有work来进行调度的时候,就可以进行处理了
3、work?调度
3.1?schedule_work
schedule_work // 将work?task提交到全局?workqueue
----→queue_work(system_wq,work)
----→queue_work_on
// queue?work?on?specific?cpu-kernel\Workqueue.c
----→!test_and_set_bit(WORK_STRUCT_PENDING_BIT,work_data_bits(work))
// 判断work是否添加到队列中
_queue_work(cpu,wq,work)
----→wq->flags?&?WQ_UNBOUND // 根据工作队列类型来获取pwq
unbound_pwq_by_node(wq,cpu_to_node(cpu))
per_cpu_ptr(wq->cpu_pwqs,cpu)
----→get_work_pool(work) // 返回work?对应的work_pool
----→last_pool?&&?last_pool?!=pwq->pool // 判断上次的pwq和这次pwq是否为同一个
find_worker_executing_work(last_pool,work)// 寻找?worker?在执行的work
----→insert_work(pwq,work,worklist,work_flags)?
// 将work?插入到worklist?中
----→set_work_pwq(work,?pwq,extra_flags)
// 设置work的data字段
----→wake_up_worker(pool)
// 唤醒?worker?内核线程
????????A)schedule_work?完成的工作是将work添加到对应的链表中,而在添加的过程中,首先是需要确定?pool_workqueue; ????????B)?pool_workqueue?对应一个worker_pool,因此确定了?pool_workqueue?也就确定了worker_pool,进而可以将work添加到工作链表中; ????????C)pool_workqueue的确定分为三种情况:
????????????????1)bound类型的工作队列,直接根据CPU号获取;
????????????????2)unbound类型的工作队列,根据node号获取,针对unbound类型工作队列,pool_workqueue的释放是异步执行的,需要判断refcnt的计数值,因此在获取?pool_workqueue?时可能要多次retry;
????????????????3)根据缓存热度,优先选择正在被执行的worker_pool;
3.2?worker?thread
worker_thread(void*_worker) // worker?thread?function-kernel\Workqueue.c
---→set_pf_worker(true) // 告诉调度器这是?workqueue?worker
---→woke_up:
unlikely(worker->flags?&WORKER_DIE) // 判断work?是否消亡
--Y--→ida_simple_remove(&pool->worker_ida,worker->id);
worker_detach_from_pool(worker); // 若消亡则进行清理
--N--→worker_leave_idle(worker) // work?离开IDLE状态
---→recheck:
list_first_entry(&pool->worklist,struct?work_struct,entry)
process_one_work(worker,work) // 遍历?worklist,执行work的回调函数
---→sleep:
set_current_state(TASK_IDLE) // 设置内核状态
schedule() // 让出CPU
----→
----
在创建worker时,创建内核线程,执行函数为worker_thread: ????????woker_thread都PF?WQWORKER,调度器在进行调度处理时会对task进行判断,针对workerqueue worker有特殊处理; ????????worker?对应的内核线程,在没有处理work的时候是睡眠状态,当被唤醒的时候,跳转到woke_up开始执行; ????????woke_up?之后,如果此时?worker是需要销毁的,那就进行清理工作并返回。否则,离开IDLE状态,并进入recheck模块执行; ????????recheck部分,首先判断是否需要更多的worker来处理,如果没有任务处理,跳转到sleep?地方进行睡眠。有任务需要处理时,会判断是否有空闲内核线程以及是否需要动态创建,再清除掉worker的标志位,然后遍历工作链表,对链表中的每个节点调用?process_one_worker来处理; ????????sleep?部分比较好理解,没有任务处理时,worker进入空闲状态,并将当前的内核线程设置成睡眠状态,让出CPU;
work的执行函数为?process_one_worker:
process_one_work(worker,work)
---→find_worker_executing_work(pool,work) // 找到正在执行work的worker
---→move_linked_works(work,&collision->scheduled,NULL)
// 将正在执行的work移动到scheduled?链表中
---→hash_add(pool->busy_hash,&worker->hentry,(unsigned?long)work);
worker->current_work?=work; // 设置?worker?字段
---→if(need_more_worker(pool))?wake_up_worker(pool);
// 若需要更多worker,则唤醒idle?work
---→set_work_pool_and_clear_pending(work,pool->id) // 清除?pending?状态
---→worker->current_func(work); // 执行work的回调函数
---→hash_del(&worker->hentry); // 执行完后的清理工作
worker->current_work?=NULL;
3.3?worker?动态管理
3.3.1?worker?状态机变化
????????1)worker_pool?通过?nr_running?字段来在不同的状态机之间进行切换; ????????2)worker_pool?中有work需要处理时,需要至少保证有一个运行状态的worker,当nr_running大于1时,将多余的worker进入IDLE状态,没有work需要处理时,所有的worker都会进入IDLE状态; ????????3)执行work时,如果回调函数阻塞运行,那么会让?worker?进入睡眠状态,此时调度器会进行判断是否需要唤醒另一个worker; ????????4)IDLE状态的worker?都存放在idle_list链表中,如果空闲时间超过了300秒,则会将其进行销毁;
1、Worker?Running---→Suspend _schedule() ????????//? ? 当worker进入睡眠状态时,如果该worker_pool?没有其他的worker处于运行状态,那么是需要唤醒一个空闲的worker来维持并发处理的能力;
2、Worker?Suspend---→Running
wake_up_worker // 唤醒idle?worker-\kernelWorkqueue.c
---→wake_up_process(worker->task)
---→try_to_wake_up(p,TASK_NORMAL,0) // 唤醒一个线程-Kernel\sched\Core.c
---→ttwu_queue(p,cpu,?wake_flags)
---→ttwu_do_activate(rq,?p,?wake_flags,&rf)
????????睡眠状态可以通过?wake_up_worker?来进行唤醒处理,最终判断如果该worker?不在运行状态,则增加worker_pool的nr_running?值
3.3.2?worker的动态添加和删除
1、动态删除
init_worker_pool // 初始化一个worker_pool ————kernel\Workqueue.c
---→timer_setup(&pool->idle_timer,idle_worker_timeout,TIMER_DEFERRABLE)
---→while?(too_many_workers(pool))
list_entry(pool->idle_list.prev,struct?worker,?entry)
destroy_worker(worker)
// 遍历?idle?空闲链表,若worker?空闲时间超过IDLE_WORKER_TIMEOUT?则进行销毁
????????worker_pool?初始化时,注册了?timer回调函数,用于定时对空闲链表上的worker进行处理,如果worker太多,且空闲时间太长,超过了5分钟,那么就直接进行销毁处理了。
2、动态添加
Worker_thread
---→manage_workers(worker) // 管理?worker?pool
---→maybe_create_worker(pool) // 创建更多的worker
????????内核线程执行worker_thread函数时,如果没有空闲的worker,会调用manage_workers接口来创建更多的worker来处理工作
https://www.cnblogs.com/wahaha02/p/6341095.html
APIC介绍: ????????APIC相较于PIC来说,最大的优点是能适用于MP平台,当然,管脚多是它另一个优点。APIC由两部分组成,一个称为LAPIC(Local?APIC,本地高级中断控制器),一个称IOAPIC?(I/O?APCI,I/O高级中断控制器)。前者位于CPU中,在MP平台,每个CPU都有一个自己的LAPIC。后者通常位于南桥上,像PIC一样,连接各个产生中断的设备。在一个典型的具有多个处理器的PC平台,通常有一个IOAPIC和多个LAPIC,它们相互配合,形成一个中断的分发网络。
????????图中的中断控制器通信总线,是IOAPIC和LAPIC通信的桥梁,在Intel的P6架构和Pentium系列CPU中,它是一条单独的APIC总线。时代在进步,Pentium4和Xeon系列CPU出现后APIC?Bus已经不存在,系统的前端总线代替了它。
????????话分两头,让我们先来看看IOAPIC。和PIC对比,IOAPIC最大的作用在于中断分发。根据其内部的PRT(Programmable?Redirection?Table)表,IOAPIC可以格式化出一条中断消息,发送给某个CPU的LAPIC,由LAPIC通知CPU进行处理。目前典型的IOAPIC具有24个中断管脚,每个管脚对应一个RTE(Redirection?Table?Entry,PRT表项)。与PIC不同的是,IOAPIC的管脚没有优先级,也就是说,连接在管脚上的设备是平等的。
PCIE之?MSI/MSI-X中断: ????????https://blog.csdn.net/yhb1047818384/article/details/106676560 ????????https://www.cnblogs.com/dream397/p/13614666.html
|