分类
未分类

linux 内核select机制分析

刚开始接触select函数的时候,没有觉得 这个机制有多厉害,自从上次看了quagga源代码以后,第一次感觉到select 这么厉害,居然能够用单线程非常好的处理多源头消息(定时任务,网络消息,进程通讯消息等)。当然,唯一的要求就是 单次处理的消息时间不能太长,消息不能非常频繁,消息对时效性要求不高。当程序满足以上条件时,即可使用select加单线程来处理。既减少了线程同步的麻烦又能同时处理多种类型的消息。
系统调用接口分析

SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, //n所有句柄中最大值,最大不能超过1024,
                fd_set __user *, exp, struct timeval __user *, tvp) //inp监控读句柄,outp监控写句柄,exp监控异常句柄,tvp超时时间
{
        struct timespec64 end_time, *to = NULL;
        struct timeval tv;
        int ret;

        if (tvp) {
                if (copy_from_user(&tv, tvp, sizeof(tv)))
                        return -EFAULT;

                to = &end_time;
                if (poll_select_set_timeout(to,
                                tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
                                (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
                        return -EINVAL;
        }

        ret = core_sys_select(n, inp, outp, exp, to);       //调用系统调用,并将由消息的句柄重新回填到inp,outp,exp内存中。
        ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

        return ret;
}

core_sys_select函数,主要是对inp,outp,exp中的数据进行预处理,最后调用do_select函数,去轮询文件句柄对应的文件的状态。在用户态中,已经将文件句柄转换成bitmap。在内核中需要将这些bitmap拷贝到内核空间。并为select的结果分配空间。这中间的代码很简单。所谓有点意思是,在栈空间首先预分配了一个空间,只有当预分配的空间不够时,才会低啊用kvmalloc重新分配新的空间。可以看出内核在最求性能方面。一直在最求极致。最后当do_select函数执行完成。core_sys_select函数将do_select函数执行的结果。拷贝到用户态的内存中。返回给用户,同样使用的也是bitmap表示,有消息的句柄对应的bitmap被置位。
do_select函数分析,这个函数是,整个系统调用的核心,废话不多说,直接上代码

static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
        ktime_t expire, *to = NULL;
        struct poll_wqueues table;
        poll_table *wait;
        int retval, i, timed_out = 0;
        u64 slack = 0;
        unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;   //判断是否一直轮询,如果一直轮序则不休息。
        unsigned long busy_start = 0;

        rcu_read_lock();
        retval = max_select_fd(n, fds);
        rcu_read_unlock();

        if (retval < 0)
                return retval;
        n = retval;

        poll_initwait(&table);
        wait = &table.pt;
        if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
                wait->_qproc = NULL;
                timed_out = 1;
        }

        if (end_time && !timed_out)
                slack = select_estimate_accuracy(end_time);         //如果有设置超时计算超时时间。

        retval = 0;
        for (;;) {          //一直循环,直到退出,退出体检时,超时,信号中断,文件句柄有事件
                unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
                bool can_busy_loop = false;

                inp = fds->in; outp = fds->out; exp = fds->ex;
                rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

                for (i = 0; i < n; ++rinp, ++routp, ++rexp) {       //第二层循环,按bytes 轮询
                        unsigned long in, out, ex, all_bits, bit = 1, mask, j;
                        unsigned long res_in = 0, res_out = 0, res_ex = 0;

                        in = *inp++; out = *outp++; ex = *exp++;
                        all_bits = in | out | ex;
                        if (all_bits == 0) {
                                i += BITS_PER_LONG;
                                continue;
                        }

                        for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {       //第三层循环 按bit轮询
                                struct fd f;
                                if (i >= n)
                                        break;
                                if (!(bit & all_bits))
                                        continue;
                                f = fdget(i);
                                if (f.file) {
                                        const struct file_operations *f_op;
                                        f_op = f.file->f_op;
                                        mask = DEFAULT_POLLMASK;
                                        if (f_op->poll) {
                                                wait_key_set(wait, in, out,
                                                             bit, busy_flag);
                                                mask = (*f_op->poll)(f.file, wait); //查询对应的文件句柄是否有事件,wait->_qproc为空只是查询状态。
                                        }
                                        fdput(f);
                                        if ((mask & POLLIN_SET) && (in & bit)) {    //如果有事件,设置对应的bitmap
                                                res_in |= bit;
                                                retval++;
                                                wait->_qproc = NULL;
                                        }
                                        if ((mask & POLLOUT_SET) && (out & bit)) {  //如果有事件,设置对应的bitmap
                                                res_out |= bit;
                                                retval++;
                                                wait->_qproc = NULL;
                                        }
                                        if ((mask & POLLEX_SET) && (ex & bit)) {    //如果有事件,设置对应的bitmap
                                                res_ex |= bit;
                                                retval++;
                                                wait->_qproc = NULL;
                                        }
                                        /* got something, stop busy polling */
                                        if (retval) {
                                                can_busy_loop = false;
                                                busy_flag = 0;

                                        /*
                                         * only remember a returned
                                         * POLL_BUSY_LOOP if we asked for it
                                         */
                                        } else if (busy_flag & mask)
                                                can_busy_loop = true;

                                }
                        }
                        if (res_in)
                                *rinp = res_in;     //拷贝,句柄bitmap
                        if (res_out)
                                *routp = res_out;   //拷贝,句柄bitmap
                        if (res_ex)
                                *rexp = res_ex;     //拷贝,句柄bitmap
                        cond_resched();
                }
                wait->_qproc = NULL;
                if (retval || timed_out || signal_pending(current))
                        break;
                if (table.error) {
                        retval = table.error;
                        break;
                }

                /* only if found POLL_BUSY_LOOP sockets && not out of time */
                if (can_busy_loop && !need_resched()) {     //如果是一直忙等,判断是否需要放弃cpu,忙等时间过长,需要给别的进程cpu
                        if (!busy_start) {
                                busy_start = busy_loop_current_time();
                                continue;
                        }
                        if (!busy_loop_timeout(busy_start))
                                continue;
                }
                busy_flag = 0;

                /*
                 * If this is the first loop and we have a timeout
                 * given, then we convert to ktime_t and set the to
                 * pointer to the expiry value.
                 */
                if (end_time && !to) {
                        expire = timespec64_to_ktime(*end_time);
                        to = &expire;
                }

                if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
                                           to, slack))  //当设置没有忙等时,用这个函数来放弃cpu。这个函数,时钟中断都可以唤醒,如果没有超时,则不设置timed_out否则设置time_out并在轮询一次。
                        timed_out = 1;
        }

        poll_freewait(&table);
        return retval;
}

该函数填充fds数据结构中的res_* 变量,表示查询到已经有事件的文件句柄
core_sys_select函数最终将结果拷贝到用户态内存空间,供用户程序使用。

注意:select系统调用中限制最大文件句柄数量为1024个,文件句柄号最大值是1024,文件句柄号超过1024就不能被select,即使只有一个句柄,但是句柄号是1025,select也不会监控它。

发表评论

电子邮件地址不会被公开。 必填项已用*标注