刚开始接触select函数的时候,没有觉得 这个机制有多厉害,自从上次看了quagga源代码以后,第一次感觉到select 这么厉害,居然能够用单线程非常好的处理多源头消息(定时任务,网络消息,进程通讯消息等)。当然,唯一的要求就是 单次处理的消息时间不能太长,消息不能非常频繁,消息对时效性要求不高。当程序满足以上条件时,即可使用select加单线程来处理。既减少了线程同步的麻烦又能同时处理多种类型的消息。
系统调用接口分析
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, //n所有句柄中最大值,最大不能超过1024,
fd_set __user *, exp, struct timeval __user *, tvp) //inp监控读句柄,outp监控写句柄,exp监控异常句柄,tvp超时时间
{
struct timespec64 end_time, *to = NULL;
struct timeval tv;
int ret;
if (tvp) {
if (copy_from_user(&tv, tvp, sizeof(tv)))
return -EFAULT;
to = &end_time;
if (poll_select_set_timeout(to,
tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
return -EINVAL;
}
ret = core_sys_select(n, inp, outp, exp, to); //调用系统调用,并将由消息的句柄重新回填到inp,outp,exp内存中。
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
return ret;
}
core_sys_select函数,主要是对inp,outp,exp中的数据进行预处理,最后调用do_select函数,去轮询文件句柄对应的文件的状态。在用户态中,已经将文件句柄转换成bitmap。在内核中需要将这些bitmap拷贝到内核空间。并为select的结果分配空间。这中间的代码很简单。所谓有点意思是,在栈空间首先预分配了一个空间,只有当预分配的空间不够时,才会低啊用kvmalloc重新分配新的空间。可以看出内核在最求性能方面。一直在最求极致。最后当do_select函数执行完成。core_sys_select函数将do_select函数执行的结果。拷贝到用户态的内存中。返回给用户,同样使用的也是bitmap表示,有消息的句柄对应的bitmap被置位。
do_select函数分析,这个函数是,整个系统调用的核心,废话不多说,直接上代码
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
u64 slack = 0;
unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; //判断是否一直轮询,如果一直轮序则不休息。
unsigned long busy_start = 0;
rcu_read_lock();
retval = max_select_fd(n, fds);
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table);
wait = &table.pt;
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time); //如果有设置超时计算超时时间。
retval = 0;
for (;;) { //一直循环,直到退出,退出体检时,超时,信号中断,文件句柄有事件
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) { //第二层循环,按bytes 轮询
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { //第三层循环 按bit轮询
struct fd f;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
f = fdget(i);
if (f.file) {
const struct file_operations *f_op;
f_op = f.file->f_op;
mask = DEFAULT_POLLMASK;
if (f_op->poll) {
wait_key_set(wait, in, out,
bit, busy_flag);
mask = (*f_op->poll)(f.file, wait); //查询对应的文件句柄是否有事件,wait->_qproc为空只是查询状态。
}
fdput(f);
if ((mask & POLLIN_SET) && (in & bit)) { //如果有事件,设置对应的bitmap
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) { //如果有事件,设置对应的bitmap
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) { //如果有事件,设置对应的bitmap
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;
/*
* only remember a returned
* POLL_BUSY_LOOP if we asked for it
*/
} else if (busy_flag & mask)
can_busy_loop = true;
}
}
if (res_in)
*rinp = res_in; //拷贝,句柄bitmap
if (res_out)
*routp = res_out; //拷贝,句柄bitmap
if (res_ex)
*rexp = res_ex; //拷贝,句柄bitmap
cond_resched();
}
wait->_qproc = NULL;
if (retval || timed_out || signal_pending(current))
break;
if (table.error) {
retval = table.error;
break;
}
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) { //如果是一直忙等,判断是否需要放弃cpu,忙等时间过长,需要给别的进程cpu
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack)) //当设置没有忙等时,用这个函数来放弃cpu。这个函数,时钟中断都可以唤醒,如果没有超时,则不设置timed_out否则设置time_out并在轮询一次。
timed_out = 1;
}
poll_freewait(&table);
return retval;
}
该函数填充fds数据结构中的res_* 变量,表示查询到已经有事件的文件句柄
core_sys_select函数最终将结果拷贝到用户态内存空间,供用户程序使用。
注意:select系统调用中限制最大文件句柄数量为1024个,文件句柄号最大值是1024,文件句柄号超过1024就不能被select,即使只有一个句柄,但是句柄号是1025,select也不会监控它。