分类
linux file system

linux epoll 机制分析–使用流程

这篇本来应该在poll机制分析之后写的,但是由于中间跟朋友一起在学习trace代码,耽搁了。
闲话少说。我们先看看poll机制有哪些弊端。
1.每次调用poll都要将全部文件描述符传到内核中,这是性能弊端。
2.在内核中poll是一个轮询方式。对传入的所有文件描述符进行轮询,查看是否有事件发生。如果都没有事件发生,睡眠一会再来次。这存在两个问题。第一cpu并没有空闲,一直在忙查。第二个,事件处理不及时,必须等全部轮询一遍以后才会返回。
基于以上弊端,epoll做了以下改进。
1.句柄集合下发与等待事件分开。先下发,然后再等待。等待可以多次。
2.利用系统的等待队列唤醒机制。当有事件发生时。执行特定函数将相关文件句柄放入就绪列表中。

总结:在我看来 poll和epoll就像没有中断和有中断的外设与系统通讯一样。之前需要cpu忙查现在只需要等待通知即可。
直接代码分析。首先上关键数据结构 eventpoll,这个结构贯穿始终。主要用来保存等待的fd集合以及就绪fd集合。

struct eventpoll {
        /* Protect the access to this structure */
        spinlock_t lock;

        /*
         * This mutex is used to ensure that files are not removed
         * while epoll is using them. This is held during the event
         * collection loop, the file cleanup path, the epoll file exit
         * code and the ctl operations.
         */
        struct mutex mtx;

        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;

        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;

        /* List of ready file descriptors */
        struct list_head rdllist;           //已有事件发生的fd列表

        /* RB tree root used to store monitored fd structs */
        struct rb_root_cached rbr;              //所有需要等待的fd红黑树。

        /*                       
         * This is a single linked list that chains all the "struct epitem" that
         * happened while transferring ready events to userspace w/out
         * holding ->lock.
         */
        struct epitem *ovflist;

        /* wakeup_source used when ep_scan_ready_list is running */
        struct wakeup_source *ws;

        /* The user that created the eventpoll descriptor */
        struct user_struct *user;

        struct file *file;

        /* used to optimize loop detection check */
        int visited;
        struct list_head visited_list_link;

#ifdef CONFIG_NET_RX_BUSY_POLL
        /* used to track busy poll napi_id */
        unsigned int napi_id;
#endif
};

使用epoll 第一步调用的系统调用是 epoll_create(内核中由 epoll_create1 实现),代码如下

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
        int error, fd;
        struct eventpoll *ep = NULL;
        struct file *file;

        /* Check the EPOLL_* constant for consistency.  */
        BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

        if (flags & ~EPOLL_CLOEXEC)
                return -EINVAL;
        /*
         * Create the internal data structure ("struct eventpoll").
         */
        error = ep_alloc(&ep);  //分配eventpoll数据结构。
        if (error < 0)
                return error;
        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure and a free file descriptor.
         */
        fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
        if (fd < 0) {
                error = fd;
                goto out_free_ep;
        }
        file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                                 O_RDWR | (flags & O_CLOEXEC)); //创建eventpoll匿名文件,私有数据时eventpoll结构
        if (IS_ERR(file)) {
                error = PTR_ERR(file);
                goto out_free_fd;
        }
        ep->file = file;
        fd_install(fd, file);
        return fd;      //返回文件句柄。

out_free_fd:
        put_unused_fd(fd);
out_free_ep:
        ep_free(ep);
        return error;
}

注意 eventpoll_fops 数据。这个数据的值如下

static const struct file_operations eventpoll_fops = {
#ifdef CONFIG_PROC_FS
        .show_fdinfo    = ep_show_fdinfo,
#endif
        .release        = ep_eventpoll_release,
        .poll           = ep_eventpoll_poll,
        .llseek         = noop_llseek,
};

由于该文件不是用来读写的 所以没有读写函数,有一个release函数在关闭句柄时释放资源。有一个poll函数。由此可猜测epoll可以进行嵌套。查了一下ep_eventpoll_poll函数,的确是可以嵌套。而且嵌套时,只要下层epoll句柄中的文件句柄集合有就绪文件句柄。上层epoll会把下层epoll文件句柄放入就绪列表中。
epoll文件句柄创建好了。接下来会使用epoll_ctl系统调用向句柄中添加等待事件的文件句柄。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                struct epoll_event __user *, event)
{
        int error;
        int full_check = 0;
        struct fd f, tf;
        struct eventpoll *ep;
        struct epitem *epi;
        struct epoll_event epds;
        struct eventpoll *tep = NULL;

        error = -EFAULT;
        if (ep_op_has_event(op) &&          //检查操作是否为删除
            copy_from_user(&epds, event, sizeof(struct epoll_event)))
                goto error_return;

        error = -EBADF;
        f = fdget(epfd);
        if (!f.file)            //获取epoll文件句柄
                goto error_return;

        /* Get the "struct file *" for the target file */
        tf = fdget(fd);
        if (!tf.file)           //获取目标文件句柄
                goto error_fput;

        /* The target file descriptor must support poll */
        error = -EPERM;
        if (!tf.file->f_op->poll)       //判断目标文件是否有poll函数
                goto error_tgt_fput;

        /* Check if EPOLLWAKEUP is allowed */
        if (ep_op_has_event(op))
                ep_take_care_of_epollwakeup(&epds);

        /*
         * We have to check that the file structure underneath the file descriptor
         * the user passed to us _is_ an eventpoll file. And also we do not permit
         * adding an epoll file descriptor inside itself.
         */
        error = -EINVAL;
        if (f.file == tf.file || !is_file_epoll(f.file))    //判断epoll文件句柄是否是epoll类型文件
                goto error_tgt_fput;

        /*
         * epoll adds to the wakeup queue at EPOLL_CTL_ADD time only,
         * so EPOLLEXCLUSIVE is not allowed for a EPOLL_CTL_MOD operation.
         * Also, we do not currently supported nested exclusive wakeups.
         */
        if (ep_op_has_event(op) && (epds.events & EPOLLEXCLUSIVE)) {
                if (op == EPOLL_CTL_MOD)
                        goto error_tgt_fput;
                if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
                                (epds.events & ~EPOLLEXCLUSIVE_OK_BITS)))
                        goto error_tgt_fput;
        }

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;
        /*
         * When we insert an epoll file descriptor, inside another epoll file
         * descriptor, there is the change of creating closed loops, which are
         * better be handled here, than in more critical paths. While we are
         * checking for loops we also determine the list of files reachable
         * and hang them on the tfile_check_list, so we can check that we
         * haven't created too many possible wakeup paths.
         *
         * We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
         * the epoll file descriptor is attaching directly to a wakeup source,
         * unless the epoll file descriptor is nested. The purpose of taking the
         * 'epmutex' on add is to prevent complex toplogies such as loops and
         * deep wakeup paths from forming in parallel through multiple
         * EPOLL_CTL_ADD operations.
         */
        mutex_lock_nested(&ep->mtx, 0);
        if (op == EPOLL_CTL_ADD) {
                if (!list_empty(&f.file->f_ep_links) ||
                                                is_file_epoll(tf.file)) {
                        full_check = 1;
                        mutex_unlock(&ep->mtx);
                        mutex_lock(&epmutex);
                        if (is_file_epoll(tf.file)) {
                                error = -ELOOP;
                                if (ep_loop_check(ep, tf.file) != 0) {
                                        clear_tfile_check_list();
                                        goto error_tgt_fput;
                                }
                        } else
                                list_add(&tf.file->f_tfile_llink,
                                                        &tfile_check_list);
                        mutex_lock_nested(&ep->mtx, 0);
                        if (is_file_epoll(tf.file)) {
                                tep = tf.file->private_data;
                                mutex_lock_nested(&tep->mtx, 1);
                        }
                }
        }               //如果是添加类型操作,判断是否有环。

        /*
         * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
         * above, we can be sure to be able to use the item looked up by
         * ep_find() till we release the mutex.
         */
        epi = ep_find(ep, tf.file, fd);

        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD: //添加操作
                if (!epi) {
                        epds.events |= EPOLLERR | EPOLLHUP;
                        error = ep_insert(ep, &epds, tf.file, fd, full_check);
                } else
                        error = -EEXIST;
                if (full_check)
                        clear_tfile_check_list();
                break;
        case EPOLL_CTL_DEL: //删除操作
                if (epi)
                        error = ep_remove(ep, epi);
                else
                        error = -ENOENT;
                break;
        case EPOLL_CTL_MOD: //修改等待的类型。
                if (epi) {
                        if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                                epds.events |= EPOLLERR | EPOLLHUP;
                                error = ep_modify(ep, epi, &epds);
                        }
                } else
                        error = -ENOENT;
                break;
        }
        if (tep != NULL)
                mutex_unlock(&tep->mtx);
        mutex_unlock(&ep->mtx);

error_tgt_fput:
        if (full_check)
                mutex_unlock(&epmutex);

        fdput(tf);
error_fput:
        fdput(f);
error_return:

        return error;

最后使用epoll_wait系统调用获取有事件发生的句柄。代码如下

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)       //maxevents 最多等待事件数,timeout超时时间
{
        int error;
        struct fd f;
        struct eventpoll *ep;

        /* The maximum number of event must be greater than zero */
        if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
                return -EINVAL;

        /* Verify that the area passed by the user is writeable */
        if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
                return -EFAULT;

        /* Get the "struct file *" for the eventpoll file */
        f = fdget(epfd);
        if (!f.file)
                return -EBADF;

        /*
         * We have to check that the file structure underneath the fd
         * the user passed to us _is_ an eventpoll file.
         */
        error = -EINVAL;
        if (!is_file_epoll(f.file))
                goto error_fput;

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;

        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);    //前面都是做安全检查,真正做事的函数是ep_poll

error_fput:
        fdput(f);
        return error;
}

ep_poll代码

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        int res = 0, eavail, timed_out = 0;
        unsigned long flags;
        u64 slack = 0;
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;

        if (timeout > 0) {
                struct timespec64 end_time = ep_set_mstimeout(timeout);

                slack = select_estimate_accuracy(&end_time);
                to = &expires;
                *to = timespec64_to_ktime(end_time);
        } else if (timeout == 0) {
                /*
                 * Avoid the unnecessary trip to the wait queue loop, if the
                 * caller specified a non blocking operation.
                 */
                timed_out = 1;
                spin_lock_irqsave(&ep->lock, flags);
                goto check_events;
        }       

fetch_events:   

        if (!ep_events_available(ep))
                ep_busy_loop(ep, timed_out);

        spin_lock_irqsave(&ep->lock, flags);

        if (!ep_events_available(ep)) {
                /*
                 * Busy poll timed out.  Drop NAPI ID for now, we can add
                 * it back in when we have moved a socket with a valid NAPI
                 * ID onto the ready list.
                 */
                ep_reset_busy_poll_napi_id(ep);

                /* 
                 * We don't have any available event to return to the caller.
                 * We need to sleep here, and we will be wake up by
                 * ep_poll_callback() when events will become available.
                 */
                init_waitqueue_entry(&wait, current);
                __add_wait_queue_exclusive(&ep->wq, &wait);

                for (;;) {
                        /*
                         * We don't want to sleep if the ep_poll_callback() sends us
                         * a wakeup in between. That's why we set the task state
                         * to TASK_INTERRUPTIBLE before doing the checks.
                         */
                        set_current_state(TASK_INTERRUPTIBLE);
                        /*
                         * Always short-circuit for fatal signals to allow
                         * threads to make a timely exit without the chance of
                         * finding more events available and fetching
                         * repeatedly.
                         */
                        if (fatal_signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }
                        if (ep_events_available(ep) || timed_out)
                                break;
                        if (signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }

                        spin_unlock_irqrestore(&ep->lock, flags);
                        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                                timed_out = 1;

                        spin_lock_irqsave(&ep->lock, flags);
                }

                __remove_wait_queue(&ep->wq, &wait);
                __set_current_state(TASK_RUNNING);
        }
check_events:
        /* Is it worth to try to dig for events ? */
        eavail = ep_events_available(ep);

        spin_unlock_irqrestore(&ep->lock, flags);

        /*
         * Try to transfer events to user space. In case we get 0 events and
         * there's still timeout left over, we go trying again in search of
         * more luck.
         */
        if (!res && eavail &&
            !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
                goto fetch_events;          //前面所有的代码都是进行检查和等待就绪队列中有句柄。最后调用ep_send_events将就绪队列中的数据拷贝到events中

        return res;
}

ep_send_events代码

static int ep_send_events(struct eventpoll *ep,
                          struct epoll_event __user *events, int maxevents)
{
        struct ep_send_events_data esed;

        esed.maxevents = maxevents;
        esed.events = events;

        ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
        return esed.res;
}

ep_scan_ready_list函数将rdllist中的句柄拷入esed结构体中,最后返回给用户态。在拷贝时,这个函数做了很多保护。
结束语: 这篇大概写了epoll系统调用的执行流程。但是没有写系统是怎么将等待红黑树中的句柄加入就绪队列的。将在下篇中详细说明。

发表评论

电子邮件地址不会被公开。 必填项已用*标注