DPDK Interrupt Management
The NICs that DPDK drives are PCI devices, and most of their interrupts are PCI MSI-X interrupts.
The full delivery path of an interrupt is: kernel uio driver uio_interrupt -> the callback registered by igb_uio, igbuio_pci_irqhandler -> kernel uio driver uio_event_notify -> DPDK interrupt thread -> the callback registered by the NIC driver (rte_intr_callback_register) -> the callback registered by the user (rte_eth_dev_callback_register). The key connection point between kernel and user space: the file descriptor of the opened /dev/uioX device.
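To make that connection point concrete, here is a minimal, self-contained sketch (not DPDK code; the device path /dev/uio0 and the error handling are purely illustrative) of how a user-space process consumes uio interrupts: each blocking read() on the fd returns a 32-bit event count and completes once per interrupt.

/* Minimal sketch: consume interrupts from a uio device.
 * Assumes the NIC is bound to igb_uio and exposed as /dev/uio0. */
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
	int fd = open("/dev/uio0", O_RDWR);
	if (fd < 0) {
		perror("open /dev/uio0");
		return 1;
	}
	for (;;) {
		uint32_t event_count;
		/* blocks until uio_event_notify() wakes us up */
		if (read(fd, &event_count, sizeof(event_count)) !=
				sizeof(event_count))
			break;
		printf("interrupt #%u\n", event_count);
		/* re-arm: the written value reaches the driver's
		 * irqcontrol hook (igbuio_pci_irqcontrol below) */
		uint32_t enable = 1;
		if (write(fd, &enable, sizeof(enable)) != sizeof(enable))
			break;
	}
	close(fd);
	return 0;
}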
Creation of igb_uio
The following excerpt shows part of the igb_uio probe path.
static int
igbuio_pci_probe(struct pci_dev *dev, const struct pci_device_id *id)
{
	/* excerpt: device allocation, BAR mapping and sysfs setup omitted */
	udev->info.name = "igb_uio";
	udev->info.version = "0.1";
	udev->info.handler = igbuio_pci_irqhandler;
	udev->info.irqcontrol = igbuio_pci_irqcontrol;

	switch (igbuio_intr_mode_preferred) {
	case RTE_INTR_MODE_MSIX:
		/* only one MSI-X vector is needed (link status changes) */
		msix_entry.entry = 0;
		if (pci_enable_msix(dev, &msix_entry, 1) == 0) {
			dev_dbg(&dev->dev, "using MSI-X");
			udev->info.irq = msix_entry.vector;
			udev->mode = RTE_INTR_MODE_MSIX;
			break;
		}
		/* fall through: MSI-X unavailable, try legacy INTx */
	case RTE_INTR_MODE_LEGACY:
		if (pci_intx_mask_supported(dev)) {
			dev_dbg(&dev->dev, "using INTX");
			udev->info.irq_flags = IRQF_SHARED;
			udev->info.irq = dev->irq;
			udev->mode = RTE_INTR_MODE_LEGACY;
			break;
		}
		/* fall through: INTx masking unsupported */
	default:
		dev_err(&dev->dev, "invalid IRQ mode %u",
			igbuio_intr_mode_preferred);
		err = -EINVAL;
		goto fail_release_iomem;
	}

	/* hand the device over to the kernel uio framework */
	err = uio_register_device(&dev->dev, &udev->info);
	if (err != 0)
		goto fail_remove_group;

	dev_info(&dev->dev, "uio device registered with irq %lx\n",
		 udev->info.irq);
	return 0;
}
This registers a uio device whose interrupt handler is igbuio_pci_irqhandler: whenever the PCI device raises an interrupt, that function is called.
static irqreturn_t
igbuio_pci_irqhandler(int irq, struct uio_info *info)
{
	struct rte_uio_pci_dev *udev = igbuio_get_uio_pci_dev(info);

	/* legacy (INTx) mode: mask the line and claim the interrupt only
	 * if this device actually asserted the shared INTx line */
	if (udev->mode == RTE_INTR_MODE_LEGACY &&
	    !pci_check_and_mask_intx(udev->pdev))
		return IRQ_NONE;

	/* message-signalled interrupts (MSI/MSI-X) are never shared */
	return IRQ_HANDLED;
}
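The probe function also registered an irqcontrol hook, igbuio_pci_irqcontrol. The kernel invokes it when user space writes a 32-bit value to /dev/uioX, enabling or disabling the interrupt. An abridged sketch, paraphrased from the igb_uio source of the same era (details vary across DPDK versions):

static int
igbuio_pci_irqcontrol(struct uio_info *info, s32 irq_state)
{
	struct rte_uio_pci_dev *udev = igbuio_get_uio_pci_dev(info);
	struct pci_dev *pdev = udev->pdev;

	pci_cfg_access_lock(pdev);
	if (udev->mode == RTE_INTR_MODE_LEGACY)
		/* mask/unmask the legacy INTx line */
		pci_intx(pdev, !!irq_state);
	else if (udev->mode == RTE_INTR_MODE_MSIX) {
		struct msi_desc *desc;
		/* mask/unmask every MSI-X vector of the device */
		list_for_each_entry(desc, &pdev->msi_list, list)
			igbuio_msix_mask_irq(desc, irq_state);
	}
	pci_cfg_access_unlock(pdev);

	return 0;
}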
Creation of the uio device in Linux
First, look at the uio device registration function, in drivers/uio/uio.c:
#define uio_register_device(parent, info) \
__uio_register_device(THIS_MODULE, parent, info)
int __uio_register_device(struct module *owner,
struct device *parent,
struct uio_info *info)
{
struct uio_device *idev;
int ret = 0;
if (!parent || !info || !info->name || !info->version)
return -EINVAL;
info->uio_dev = NULL;
idev = devm_kzalloc(parent, sizeof(*idev), GFP_KERNEL);
if (!idev) {
return -ENOMEM;
}
idev->owner = owner;
idev->info = info;
init_waitqueue_head(&idev->wait);
atomic_set(&idev->event, 0);
ret = uio_get_minor(idev);
if (ret)
return ret;
idev->dev = device_create(&uio_class, parent,
MKDEV(uio_major, idev->minor), idev,
"uio%d", idev->minor);
if (IS_ERR(idev->dev)) {
printk(KERN_ERR "UIO: device register failed\n");
ret = PTR_ERR(idev->dev);
goto err_device_create;
}
ret = uio_dev_add_attributes(idev);
if (ret)
goto err_uio_dev_add_attributes;
info->uio_dev = idev;
	if (info->irq && (info->irq != UIO_IRQ_CUSTOM)) {
		/* the real interrupt registration: one shared handler,
		 * uio_interrupt, for every uio device */
		ret = request_irq(info->irq, uio_interrupt,
				  info->irq_flags, info->name, idev);
		if (ret)
			goto err_request_irq;
	}
return 0;
err_request_irq:
uio_dev_del_attributes(idev);
err_uio_dev_add_attributes:
device_destroy(&uio_class, MKDEV(uio_major, idev->minor));
err_device_create:
uio_free_minor(idev);
return ret;
}
Notice that it is only here, inside the kernel, that request_irq() is actually called to register the interrupt! The handler is uio_interrupt, and it is the same function for every uio device. Also note that device_create() is what makes the /dev/uioX node appear (via udev), which DPDK later opens.
static irqreturn_t uio_interrupt(int irq, void *dev_id)
{
	struct uio_device *idev = (struct uio_device *)dev_id;
	/* dispatch to the device-specific handler,
	 * e.g. igbuio_pci_irqhandler for igb_uio */
	irqreturn_t ret = idev->info->handler(irq, idev->info);

	if (ret == IRQ_HANDLED)
		uio_event_notify(idev->info);

	return ret;
}
So the shared uio interrupt handler dispatches to the specific device's handler — in our case igbuio_pci_irqhandler registered by igb_uio — and, if the interrupt was handled, publishes an event through uio_event_notify.
Kernel event notification
Event notification is kernel-side; the source is as follows:
void uio_event_notify(struct uio_info *info)
{
	struct uio_device *idev = info->uio_dev;

	atomic_inc(&idev->event);                        /* bump event count */
	wake_up_interruptible(&idev->wait);              /* wake readers/pollers */
	kill_fasync(&idev->async_queue, SIGIO, POLL_IN); /* async SIGIO path */
}
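wake_up_interruptible is a thin macro around __wake_up, which (via __wake_up_common) walks the wait queue and invokes each waiter's wakeup callback: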
#define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}
This walks the waiters registered on the wait queue and invokes each one's wakeup callback. The waiter we care about was put there by DPDK: its interrupt thread blocks in epoll_wait() on the /dev/uioX fd, and epoll attached itself to idev->wait through the uio driver's poll method.
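Here is that poll method, abridged from drivers/uio/uio.c (the return type and poll flag names vary slightly across kernel versions). poll_wait() is what enqueues the epoll waiter on idev->wait, so the wake_up_interruptible() above is exactly what makes a blocked epoll_wait() return:

static unsigned int uio_poll(struct file *filep, poll_table *wait)
{
	struct uio_listener *listener = filep->private_data;
	struct uio_device *idev = listener->dev;

	if (!idev->info->irq)
		return -EIO;

	/* register this waiter on idev->wait — the very queue that
	 * wake_up_interruptible() in uio_event_notify() wakes up */
	poll_wait(filep, &idev->wait, wait);

	/* new interrupts since this listener last read? -> readable */
	if (listener->event_count != atomic_read(&idev->event))
		return POLLIN | POLLRDNORM;
	return 0;
}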
Creation of DPDK's interrupt thread and its epoll loop
The interrupt thread is created during EAL initialization:
int
rte_eal_intr_init(void)
{
	int ret = 0;

	/* init the global list of interrupt sources */
	TAILQ_INIT(&intr_sources);

	/* the pipe lets other threads poke the epoll loop whenever
	 * the set of interrupt sources changes */
	if (pipe(intr_pipe.pipefd) < 0)
		return -1;

	/* create the host thread that waits for and handles interrupts */
	ret = pthread_create(&intr_thread, NULL,
			     eal_intr_thread_main, NULL);
	if (ret != 0)
		RTE_LOG(ERR, EAL,
			"Failed to create thread for interrupt handling\n");

	return -ret;
}
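For context, intr_pipe is (in the DPDK sources of this era) a small union that gives the two pipe ends readable names, so pipefd[0]/pipefd[1] can be referred to as readfd/writefd:

union intr_pipefds {
	struct {
		int pipefd[2];
	};
	struct {
		int readfd;  /* watched by the epoll loop    */
		int writefd; /* written by registering code  */
	};
};

static union intr_pipefds intr_pipe;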
The thread entry function:
static __attribute__((noreturn)) void *
eal_intr_thread_main(__rte_unused void *arg)
{
struct epoll_event ev;
	for (;;) {
		/* the epoll set is rebuilt on every iteration, so that newly
		 * (un)registered interrupt sources are picked up */
		static struct epoll_event pipe_event = {
			.events = EPOLLIN | EPOLLPRI,
		};
		struct rte_intr_source *src;
		unsigned numfds = 0;

		/* create epoll fd */
		int pfd = epoll_create(1);
		if (pfd < 0)
			rte_panic("Cannot create epoll instance\n");

		/* the pipe's read end is always watched: writing to the pipe
		 * forces epoll_wait() to return so the fd set can be rebuilt
		 * (see rte_intr_callback_register below) */
		pipe_event.data.fd = intr_pipe.readfd;
		if (epoll_ctl(pfd, EPOLL_CTL_ADD, intr_pipe.readfd,
			      &pipe_event) < 0) {
			rte_panic("Error adding fd to %d epoll_ctl, %s\n",
				  intr_pipe.readfd, strerror(errno));
		}
		numfds++;
		rte_spinlock_lock(&intr_lock);
		TAILQ_FOREACH(src, &intr_sources, next) {
			if (src->callbacks.tqh_first == NULL)
				continue; /* skip sources with no callback */
			/* watch the source's fd, e.g. an open /dev/uioX */
			ev.events = EPOLLIN | EPOLLPRI;
			ev.data.fd = src->intr_handle.fd;
if (epoll_ctl(pfd, EPOLL_CTL_ADD,
src->intr_handle.fd, &ev) < 0){
rte_panic("Error adding fd %d epoll_ctl, %s\n",
src->intr_handle.fd, strerror(errno));
}
else
numfds++;
}
rte_spinlock_unlock(&intr_lock);
eal_intr_handle_interrupts(pfd, numfds);
close(pfd);
}
}
static void
eal_intr_handle_interrupts(int pfd, unsigned totalfds)
{
struct epoll_event events[totalfds];
int nfds = 0;
for(;;) {
nfds = epoll_wait(pfd, events, totalfds,
EAL_INTR_EPOLL_WAIT_FOREVER);
if (nfds < 0) {
if (errno == EINTR)
continue;
RTE_LOG(ERR, EAL,
"epoll_wait returns with fail\n");
return;
}
else if (nfds == 0)
continue;
if (eal_intr_process_interrupts(events, nfds) < 0)
return;
}
}
static int
eal_intr_process_interrupts(struct epoll_event *events, int nfds)
{
int n, bytes_read;
struct rte_intr_source *src;
struct rte_intr_callback *cb;
union rte_intr_read_buffer buf;
struct rte_intr_callback active_cb;
	for (n = 0; n < nfds; n++) {
		/* if the pipe fd is readable, a source was (un)registered:
		 * return -1 so the outer loop rebuilds the epoll set */
		if (events[n].data.fd == intr_pipe.readfd){
			int r = read(intr_pipe.readfd, buf.charbuf,
				     sizeof(buf.charbuf));
			RTE_SET_USED(r);
			return -1;
		}
rte_spinlock_lock(&intr_lock);
TAILQ_FOREACH(src, &intr_sources, next)
if (src->intr_handle.fd ==
events[n].data.fd)
break;
if (src == NULL){
rte_spinlock_unlock(&intr_lock);
continue;
}
src->active = 1;
rte_spinlock_unlock(&intr_lock);
		/* how much to read depends on the handle type:
		 * a uio fd yields a 4-byte event counter */
		switch (src->intr_handle.type) {
		case RTE_INTR_HANDLE_UIO:
			bytes_read = sizeof(buf.uio_intr_count);
			break;
case RTE_INTR_HANDLE_ALARM:
bytes_read = sizeof(buf.timerfd_num);
break;
#ifdef VFIO_PRESENT
case RTE_INTR_HANDLE_VFIO_MSIX:
case RTE_INTR_HANDLE_VFIO_MSI:
case RTE_INTR_HANDLE_VFIO_LEGACY:
bytes_read = sizeof(buf.vfio_intr_count);
break;
#endif
default:
bytes_read = 1;
break;
}
		/* consume the event counter from the fd */
		bytes_read = read(events[n].data.fd, &buf, bytes_read);
		if (bytes_read > 0) {
			/* finally, call every callback on this source */
			TAILQ_FOREACH(cb, &src->callbacks, next) {
				/* copy the callback before invoking it */
				active_cb = *cb;
				active_cb.cb_fn(&src->intr_handle,
						active_cb.cb_arg);
			}
		}
	}

	return 0;
}
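The buf union above is what sizes the read per handle type. In the DPDK sources of this era it looks roughly like this (field order may differ between versions):

union rte_intr_read_buffer {
	int uio_intr_count;       /* for uio device      */
#ifdef VFIO_PRESENT
	uint64_t vfio_intr_count; /* for vfio device     */
#endif
	uint64_t timerfd_num;     /* for timerfd (alarm) */
	char charbuf[16];         /* for others          */
};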
The interrupt callback registered by the NIC driver
At this layer the NIC driver is a user-space driver (a PMD); its init function is as follows:
static int
eth_igb_dev_init(__attribute__((unused)) struct eth_driver *eth_drv,
		 struct rte_eth_dev *eth_dev)
{
	int error = 0;

	pci_dev = eth_dev->pci_dev;
	eth_dev->dev_ops = &eth_igb_ops;
	eth_dev->rx_pkt_burst = &eth_igb_recv_pkts;
	eth_dev->tx_pkt_burst = &eth_igb_xmit_pkts;

	/* for secondary processes, only the rx function may need fixing up */
	if (rte_eal_process_type() != RTE_PROC_PRIMARY){
		if (eth_dev->data->scattered_rx)
			eth_dev->rx_pkt_burst = &eth_igb_recv_scattered_pkts;
		return 0;
	}

	/* excerpt: hw lookup and MAC initialization omitted */
	hw->hw_addr = (void *)pci_dev->mem_resource[0].addr;

	/* hook this PMD's handler into the DPDK interrupt thread */
	rte_intr_callback_register(&(pci_dev->intr_handle),
		eth_igb_interrupt_handler, (void *)eth_dev);

	/* enable uio interrupts (writes to the /dev/uioX fd) */
	rte_intr_enable(&(pci_dev->intr_handle));

	/* enable interrupt sources in the NIC itself */
	igb_intr_enable(eth_dev);

	return 0;
}
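rte_intr_enable() is worth a quick look: for a uio handle it simply writes a 32-bit 1 to the fd, and the kernel's uio_write forwards that value to the driver's irqcontrol hook — igbuio_pci_irqcontrol shown earlier. Abridged (only the uio case is shown; vfio and other handle types are omitted):

int
rte_intr_enable(struct rte_intr_handle *intr_handle)
{
	const int value = 1;

	if (!intr_handle || intr_handle->fd < 0)
		return -1;

	switch (intr_handle->type) {
	/* write to the uio fd to enable the interrupt;
	 * uio_write() passes the value to info->irqcontrol() */
	case RTE_INTR_HANDLE_UIO:
		if (write(intr_handle->fd, &value, sizeof(value)) < 0) {
			RTE_LOG(ERR, EAL,
				"Error enabling interrupts for fd %d\n",
				intr_handle->fd);
			return -1;
		}
		break;
	/* other handle types omitted */
	default:
		break;
	}

	return 0;
}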
int
rte_intr_callback_register(struct rte_intr_handle *intr_handle,
rte_intr_callback_fn cb, void *cb_arg)
{
int ret, wake_thread;
struct rte_intr_source *src;
struct rte_intr_callback *callback;
wake_thread = 0;
callback = rte_zmalloc("interrupt callback list",
sizeof(*callback), 0);
if (callback == NULL) {
RTE_LOG(ERR, EAL, "Can not allocate memory\n");
return -ENOMEM;
}
callback->cb_fn = cb;
callback->cb_arg = cb_arg;
rte_spinlock_lock(&intr_lock);
TAILQ_FOREACH(src, &intr_sources, next) {
if (src->intr_handle.fd == intr_handle->fd) {
if (TAILQ_EMPTY(&src->callbacks))
wake_thread = 1;
TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
ret = 0;
break;
}
}
if (src == NULL) {
if ((src = rte_zmalloc("interrupt source list",
sizeof(*src), 0)) == NULL) {
RTE_LOG(ERR, EAL, "Can not allocate memory\n");
rte_free(callback);
ret = -ENOMEM;
} else {
src->intr_handle = *intr_handle;
TAILQ_INIT(&src->callbacks);
TAILQ_INSERT_TAIL(&(src->callbacks), callback, next);
TAILQ_INSERT_TAIL(&intr_sources, src, next);
wake_thread = 1;
ret = 0;
}
}
rte_spinlock_unlock(&intr_lock);
	/* poke the interrupt thread through the pipe so that it rebuilds
	 * its epoll set and starts watching the newly added fd */
	if (wake_thread)
		if (write(intr_pipe.writefd, "1", 1) < 0)
			return -EPIPE;
return (ret);
}
static void
eth_igb_interrupt_handler(__rte_unused struct rte_intr_handle *handle,
			  void *param)
{
	struct rte_eth_dev *dev = (struct rte_eth_dev *)param;

	eth_igb_interrupt_get_status(dev); /* read and latch the ICR  */
	eth_igb_interrupt_action(dev);     /* act on the latched flags */
}
static int
eth_igb_interrupt_get_status(struct rte_eth_dev *dev)
{
uint32_t icr;
struct e1000_hw *hw =
E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);
struct e1000_interrupt *intr =
E1000_DEV_PRIVATE_TO_INTR(dev->data->dev_private);
igb_intr_disable(hw);
icr = E1000_READ_REG(hw, E1000_ICR);
intr->flags = 0;
if (icr & E1000_ICR_LSC) {
intr->flags |= E1000_FLAG_NEED_LINK_UPDATE;
}
if (icr & E1000_ICR_VMMB)
intr->flags |= E1000_FLAG_MAILBOX;
return 0;
}
static int
eth_igb_interrupt_action(struct rte_eth_dev *dev)
{
struct e1000_hw *hw =
E1000_DEV_PRIVATE_TO_HW(dev->data->dev_private);
struct e1000_interrupt *intr =
E1000_DEV_PRIVATE_TO_INTR(dev->data->dev_private);
uint32_t tctl, rctl;
struct rte_eth_link link;
int ret;
if (intr->flags & E1000_FLAG_MAILBOX) {
igb_pf_mbx_process(dev);
intr->flags &= ~E1000_FLAG_MAILBOX;
}
igb_intr_enable(dev);
rte_intr_enable(&(dev->pci_dev->intr_handle));
if (intr->flags & E1000_FLAG_NEED_LINK_UPDATE) {
intr->flags &= ~E1000_FLAG_NEED_LINK_UPDATE;
hw->mac.get_link_status = 1;
ret = eth_igb_link_update(dev, 0);
if (ret < 0)
return 0;
memset(&link, 0, sizeof(link));
rte_igb_dev_atomic_read_link_status(dev, &link);
if (link.link_status) {
PMD_INIT_LOG(INFO,
" Port %d: Link Up - speed %u Mbps - %s",
dev->data->port_id,
(unsigned)link.link_speed,
link.link_duplex == ETH_LINK_FULL_DUPLEX ?
"full-duplex" : "half-duplex");
} else {
PMD_INIT_LOG(INFO, " Port %d: Link Down",
dev->data->port_id);
}
PMD_INIT_LOG(INFO, "PCI Address: %04d:%02d:%02d:%d",
dev->pci_dev->addr.domain,
dev->pci_dev->addr.bus,
dev->pci_dev->addr.devid,
dev->pci_dev->addr.function);
tctl = E1000_READ_REG(hw, E1000_TCTL);
rctl = E1000_READ_REG(hw, E1000_RCTL);
if (link.link_status) {
tctl |= E1000_TCTL_EN;
rctl |= E1000_RCTL_EN;
} else {
tctl &= ~E1000_TCTL_EN;
rctl &= ~E1000_RCTL_EN;
}
E1000_WRITE_REG(hw, E1000_TCTL, tctl);
E1000_WRITE_REG(hw, E1000_RCTL, rctl);
E1000_WRITE_FLUSH(hw);
_rte_eth_dev_callback_process(dev, RTE_ETH_EVENT_INTR_LSC);
}
return 0;
}
void
_rte_eth_dev_callback_process(struct rte_eth_dev *dev,
enum rte_eth_event_type event)
{
struct rte_eth_dev_callback *cb_lst;
struct rte_eth_dev_callback dev_cb;
rte_spinlock_lock(&rte_eth_dev_cb_lock);
TAILQ_FOREACH(cb_lst, &(dev->callbacks), next) {
if (cb_lst->cb_fn == NULL || cb_lst->event != event)
continue;
		/* copy the callback, mark it active and drop the lock, so
		 * the user callback can itself (un)register callbacks */
		dev_cb = *cb_lst;
		cb_lst->active = 1;
		rte_spinlock_unlock(&rte_eth_dev_cb_lock);
dev_cb.cb_fn(dev->data->port_id, dev_cb.event,
dev_cb.cb_arg);
rte_spinlock_lock(&rte_eth_dev_cb_lock);
cb_lst->active = 0;
}
rte_spinlock_unlock(&rte_eth_dev_cb_lock);
}
In other words, the NIC driver's interrupt callback merely refreshes the link status; the final step, _rte_eth_dev_callback_process, invokes whatever callbacks the user registered. On the application side (this pair of functions is essentially DPDK's link_status_interrupt example):
int main(int argc, char **argv)
{
	/* ... EAL, mempool and port initialization omitted ... */

	/* ask to be called back on Link Status Change for this port */
	rte_eth_dev_callback_register(portid,
		RTE_ETH_EVENT_INTR_LSC, lsi_event_callback, NULL);

	/* ... main packet loop omitted ... */
}
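For completeness, the registration side mirrors _rte_eth_dev_callback_process: it appends an entry to the per-device callback list under the same lock. An abridged sketch of rte_eth_dev_callback_register from the rte_ether code of this era (error paths trimmed; exact wording differs between versions):

int
rte_eth_dev_callback_register(uint8_t port_id,
			      enum rte_eth_event_type event,
			      rte_eth_dev_cb_fn cb_fn, void *cb_arg)
{
	struct rte_eth_dev *dev;
	struct rte_eth_dev_callback *user_cb;

	if (!cb_fn)
		return -EINVAL;
	if (port_id >= nb_ports) {
		PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
		return -EINVAL;
	}

	dev = &rte_eth_devices[port_id];
	rte_spinlock_lock(&rte_eth_dev_cb_lock);

	/* reuse an identical registration if one already exists */
	TAILQ_FOREACH(user_cb, &(dev->callbacks), next) {
		if (user_cb->cb_fn == cb_fn &&
		    user_cb->cb_arg == cb_arg &&
		    user_cb->event == event)
			break;
	}

	/* otherwise create a new callback entry */
	if (user_cb == NULL &&
	    (user_cb = rte_zmalloc("INTR_USER_CALLBACK",
				   sizeof(*user_cb), 0)) != NULL) {
		user_cb->cb_fn = cb_fn;
		user_cb->cb_arg = cb_arg;
		user_cb->event = event;
		TAILQ_INSERT_TAIL(&(dev->callbacks), user_cb, next);
	}

	rte_spinlock_unlock(&rte_eth_dev_cb_lock);
	return (user_cb == NULL) ? -ENOMEM : 0;
}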
static void
lsi_event_callback(uint8_t port_id, enum rte_eth_event_type type, void *param)
{
struct rte_eth_link link;
RTE_SET_USED(param);
printf("\n\nIn registered callback...\n");
printf("Event type: %s\n", type == RTE_ETH_EVENT_INTR_LSC ? "LSC interrupt" : "unknown event");
rte_eth_link_get_nowait(port_id, &link);
if (link.link_status) {
printf("Port %d Link Up - speed %u Mbps - %s\n\n",
port_id, (unsigned)link.link_speed,
(link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
("full-duplex") : ("half-duplex"));
} else
printf("Port %d Link Down\n\n", port_id);
}
void
rte_eth_link_get_nowait(uint8_t port_id, struct rte_eth_link *eth_link)
{
struct rte_eth_dev *dev;
if (port_id >= nb_ports) {
PMD_DEBUG_TRACE("Invalid port_id=%d\n", port_id);
return;
}
dev = &rte_eth_devices[port_id];
if (dev->data->dev_conf.intr_conf.lsc != 0)
rte_eth_dev_atomic_read_link_status(dev, eth_link);
else {
FUNC_PTR_OR_RET(*dev->dev_ops->link_update);
(*dev->dev_ops->link_update)(dev, 0);
*eth_link = dev->data->dev_link;
}
}
Working through all of this, two questions remained:
In the thread DPDK creates, how does the epoll mechanism hook into uio_event_notify, and what triggers the epoll_wait() blocked in the kernel to return (other than its built-in timeout)? The uio_poll sketch above answers both: epoll registers a waiter on idev->wait via poll_wait(), so the wake_up_interruptible() in uio_event_notify() wakes that waiter and epoll_wait() returns with the /dev/uioX fd readable.