分类
linux file system

字节招聘







字节跳动系统技术与工程团队招聘

字节跳动系统技术与工程团队招聘


Linux内核高级研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/50971


职位描述:

1. 针对业务需求定制Linux内核,结合业务需求开发内核新功能;

2. 结合服务特点对服务器底层/Linux内核进行性能调优;

3. 负责操作系统/内核前沿新技术的研究和应用;

职位要求:

1. 精通Linux内核,至少阅读过2-3个主要模块(调度,文件系统,网络,I/O,内存管理等)的源代码;

2. 熟悉Linux平台上的C语言编程,熟悉多进程多线程编程,熟悉socket编程;

3. 具有复杂系统软件的设计、开发和调优能力;

4. 有丰富内核故障调试或内核社区补丁提交经验者优先;

5. 有多平台(x86/ARM/RISC-V)内核与系统研发经验者优先

6. 了解主流虚拟化技术(Xen/KVM等)的实现,阅读过相关源代码者优先

7. 良好的团队合作精神,较强的沟通能力

8. 优秀的分析问题和解决问题的能力,对解决具有挑战性 问题充满激情

Linux内核研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/50977


职位描述:

1. 针对业务需求定制Linux内核,结合业务需求开发内核新功能;

2. 结合服务特点对服务器底层/Linux内核进行性能调优;

3. 负责改进,维护公司内部内核,保障内核稳定且高效。

职位要求:

1. 精通Linux内核,至少阅读过1个主要模块(调度,文件系统,网络,I/O,内存管理等)的源代码;

2. 熟悉Linux平台上的C语言编程,熟悉多进程多线程编程,熟悉socket编程;

3. 善于学习新的知识,动手能力强,有进取心;

4. 有内核社区补丁提交经验者优先;

5. 有丰富内核故障调试经验者优先。

网络高级研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/50988


职位描述:

1. 结合业务对Linux内核协议栈/用户态协议栈/RDMA等进行优化;

2. 负责公司内部相关 SDN、NFV 的设计、优化和开发;

3. 参与高性能网络/网络虚拟化研发;

4. 参与分布式网关/ACL/网络QoS 研发。

职位要求:

1. 3~5年相关工作经验,熟悉Linux操作系统下开发,熟悉Linux操作系统工作机制,熟悉Linux常见网络模型;

2. 熟悉Linux操作系统下C/C++开发, 能运用常见工具定位调试问题代码;

3. 熟悉了解 P4 语言,了解 openflow 协议,有 barefoot P4 相关开发经验优先;

4. 熟悉Linux kernel 或 FreeBSD 相关网络协议栈优先;

5. 熟悉路由器/交换机工作原理,熟悉NAT、VPN、负载均衡、SDN、NFV、overlay等网络技术

6. 熟悉 openstack neutron、openvswitch 等开源技术优先。

网络研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/50991


职位描述:

1. 结合业务对Linux内核协议栈等进行优化;

2. 参与高性能网络/网络虚拟化研发;

3. 参与分布式网关/ACL 研发。

职位要求:

1. 熟悉Linux操作系统下开发,熟悉Linux操作系统工作机制,熟悉Linux常见网络模型;

2. 熟悉Linux操作系统下C/C++开发, 能运用常见工具定位调试问题代码;

3. 熟悉Linux kernel 或 FreeBSD 相关网络协议栈优先;

4. 熟悉路由器/交换机工作原理,熟悉NAT、VPN等网络技术。

虚拟化高级研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/51032


职位描述:

1. 负责优化虚拟化技术的性能和稳定性;

2. 负责系统通用虚拟化技术前沿探索,不断满足业务需求;

3. 负责研究轻量虚拟化/安全容器等serverless场景下单机系统技术,满足业务需求。

岗位要求:

1. 精通C/C++编程和多线程性能优化,对系统编程有深入的理解;

2. 熟悉x86体系架构,深入理解操作系统原理及Linux内核,熟悉内核开发;

3. 有KVM/Xen,QEMU,Libvirt相关开发经验者优先;

4. 对Kata Containers、firecracker、crosvm、rust-vmm等有深入研究者优先;

5. 有较强硬件虚拟化研发经验者优先。

虚拟化研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/51039


职位描述:

1. 负责优化虚拟化技术的性能和稳定性;

2. 开发虚拟化性能分析工具和监控工具;

3. 参与系统通用虚拟化技术前沿探索,不断满足业务需求。

岗位要求:

1. 精通C/C++编程和多线程性能优化,对系统编程有深入的理解;

2. 熟悉x86体系架构,深入理解操作系统原理及Linux内核;

3. 有KVM/Xen,QEMU,Libvirt相关开发经验者优先;

4. 对Kata Containers、firecracker、crosvm、rust-vmm等有深入研究者优先;

5. 有较强硬件虚拟化研发经验者优先。

系统软件工程师

工作地点:北京

投递地址:

https://job.toutiao.com/job/detail/51040


职位描述:

1. 优化系统级基础软件设施,包括操作系统及其组件、基础库性能等;

2. 改进和优化系统监控、软件布署及其升级的自动化运维实现;

3. 优化巨量服务器的自动化管理及其维护方式;

4. 基本的系统软硬件性能瓶颈分析、故障trouble shooting排查。

职位要求:

1. 熟悉操作系统其及组件,对Linux内核有基本的了解和认识;

2. 熟悉C或者C++,并掌握其它语言如Python/Golang/Rust/Java的至少一种;

3. 熟悉常用的数据结构和算法、熟悉多线程编程及多线程程序性能优化;

4. 能够团队合作完成中大型系统软件的设计实现,代码健壮性好,有初步的代码性能优化经验;

5. 至少熟悉网络、存储、内存管理、进程调度、或者服务器硬件架构的其中一项。

系统数据平台研发工程师

工作地点:北京

投递地址:

https://job.toutiao.com/job/detail/51041


职位描述:

1. 负责超大规模数据中心系统和网络监控数据平台的设计和研发;

2. 设计和构建系统网络和监控大数据分析技术架构;

3. 应用机器学习技术提升系统和网络核心监控能力。

职位要求:

https://job.toutiao.com/job/detail/51094


1. 熟悉操作系统原理,有良好的数据结构和算法基础;

2. 熟悉至少一个数据处理平台,如 Map Reduce、Flink、Spark, 熟悉至少一种大数据数据库如;TSDB, Druid, ClickHouse,InfluxDB。PB级或以上数据处理经验优先;

3. 熟练掌握至少 Python/Go/Java 等编程语言之一,具备优秀的编码能力;

4. 自我驱动,具备良好的沟通技能,对新技术有强烈的学习热情;

5. 有统计或机器学习(TensorFlow/pytorch/MxNet)工作背景者优先;

6. 有相关开源社区工作经验者优先。

SRE工程师-系统技术方向

工作地点:北京

投递地址:

岗位职责:

1. 推进系统技术现有平台化服务的迭代和优化(系统压测平台、发布平台、系统滚动平台等服务/平台);

2. 推进新系统技术服务化、平台化、自动化研发落地;

3. 对海量数据进行数据清洗和整合,并对相关数据进行分析,构建健康预测能力和监控指标;

4. 构建底层技术的 CI/CD 机制,保证系统稳定性。

岗位要求:

1. 掌握常用开发语言 C/C++/Python/Golang/Shell;

2. 熟悉 Linux 操作系统,熟悉各种网络协议,对 linux kernel 有了解或者对底层技术有浓厚兴趣;

3. 有较强的学习能力,能够熟练阅读涉及产品和技术的英文文档;

4. 能够独立完成工作,具有较强的综合分析问题及解决问题的能力;

5. 有良好的工作文档习惯,及时按要求撰写更新工作流程及技术文档;

6. 有 AIOPS 项目经验者优先;有开源项目贡献者或开源项目领导者优先;

编译器高级研发工程师

工作地点:北京

投递地址:

https://job.toutiao.com/job/detail/51042


工作职责:

1. 负责开发与维护公司编译器工具链;

2. 负责对大型工程的代码生成以及编译器优化分析调优;

3. 深入研究编译器,优化落地并贡献开源社区。

任职要求:

1. 熟悉编译器架构与算法,有设计与实现编译各阶段的经验,包括语言处理,编译器优化和代码生成等;

2. 较强的C/C++开发能力,有丰富的问题分析定位与调试经验;

3. 熟悉业界主流的编译器的设计与开发,如LLVM,GCC;

4. 对编译器中间语言表达机制有深入的理解,有相关设计与优化经验;熟练阅读,解析,修改,优化IR指令;

5. 熟悉ARM汇编优先;对Buck、Bazel、Blade Build等编译构建系统有深入研究或者深度使用者优先。

编译器研发工程师

工作地点:北京

投递地址:

https://job.toutiao.com/job/detail/51043


工作职责:

1. 参与开发与维护公司编译器工具链;

2. 针对大型工程的代码生成以及编译器优化分析调优;

3. 深入研究编译器新技术,优化落地并贡献社区。

任职要求:

1. 熟练使用Linux;熟练使用git;熟练使用make/cmake等开发工具;

2. 较强的C/C++开发能力,有丰富的问题分析定位与调试经验;

3. 熟悉数据结构及算法;

4. 深入掌握基本的编译原理知识;

5. 熟悉业界至少1个主流的编译器的设计与开发,如LLVM/GCC等。

服务器固件研发工程师

工作地点:北京/上海/杭州

投递地址:

https://job.toutiao.com/job/detail/51044


职位描述:

1. 负责海量服务器硬件组件的各固件统一化定制(与OEM、ODM协同研发);

2. 负责下一代服务器、及自研硬件的固件软件开发及上线。

职位要求:

1. 两年以上相关工作经验,有熟练阅读英文技术规范、硬件手册的能力;

2. 熟悉C或者C++开发、具有良好的数据结构算法能力;

3. 符合以下要求其中两条或以上:

4. 熟悉Intel微处理器架构、或者Intel PCH芯片架构(或者ARM服务器对应的技术),并有相关固件代码经验;

5. 熟悉Legacy BIOS、UEFI、EDK II至少其一,熟悉ACPI标准,并有相关组件的代码开发经验;

6. 熟悉商用BMC或者OpenBMC的实现,并在此方向上有丰富的嵌入式C/C++编程经验;

7. 熟悉硬件故障处理的OS与固件的交互逻辑,并有相关代码开发经验;

8. 有TPU、GPU、高速网卡、SSL / Zip加速器其中之一的固件开发经验。

分类
linux file system linux network

linux socket REUSEADDR 内核代码分析

第一次接触REUSEADDR是在一年前,那个时候写了一个网络服务程序。写了一个监控脚本监控网络服务,当服务挂了,立马拉起来。但是经常会遇到拉不起来的情况,说端口已经被占用。但是那个端口自有我自己在用。怎么会被占用呢。后来查到说进程挂了,但是内核中连接还在。所以不能立马用。加上REUSEADDR就可以解决问题。今天挖掘一下内核代码。分析一下REUSEADDR实现。
创建socket

int __sys_socket(int family, int type, int protocol)
{
        int retval;
        struct socket *sock;
        int flags;

        /* Check the SOCK_* constants for consistency.  */
        BUILD_BUG_ON(SOCK_CLOEXEC != O_CLOEXEC);
        BUILD_BUG_ON((SOCK_MAX | SOCK_TYPE_MASK) != SOCK_TYPE_MASK);
        BUILD_BUG_ON(SOCK_CLOEXEC & SOCK_TYPE_MASK);
        BUILD_BUG_ON(SOCK_NONBLOCK & SOCK_TYPE_MASK);

        flags = type & ~SOCK_TYPE_MASK;
        if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
                return -EINVAL;
        type &= SOCK_TYPE_MASK;

        if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
                flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;

        retval = sock_create(family, type, protocol, &sock);            //根据用户传入的参数选择对应的协议创建对应的socket
        if (retval < 0)
                return retval;

        return sock_map_fd(sock, flags & (O_CLOEXEC | O_NONBLOCK));     //将socket和文件描述符绑定。
}

SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
        return __sys_socket(family, type, protocol);
}

在linux中一切皆文件,sock_map_fd函数将sock和文件关联起来,返回给用户一个文件描述符。

static int sock_map_fd(struct socket *sock, int flags)
{
        struct file *newfile;
        int fd = get_unused_fd_flags(flags);
        if (unlikely(fd < 0)) {
                sock_release(sock); 
                return fd;
        }

        newfile = sock_alloc_file(sock, flags, NULL);           //创建文件,
        if (likely(!IS_ERR(newfile))) {
                fd_install(fd, newfile);            //将文件和文件描述符关联
                return fd;                          //返回文件描述符
        }

        put_unused_fd(fd);
        return PTR_ERR(newfile);
}

sock_alloc_file是创建sock匿名文件系统的文件。代码如下

struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
{
        struct qstr name = { .name = "" };
        struct path path;
        struct file *file;

        if (dname) {
                name.name = dname; 
                name.len = strlen(name.name);
        } else if (sock->sk) {
                name.name = sock->sk->sk_prot_creator->name;
                name.len = strlen(name.name);
        }       
        path.dentry = d_alloc_pseudo(sock_mnt->mnt_sb, &name);
        if (unlikely(!path.dentry)) {
                sock_release(sock);
                return ERR_PTR(-ENOMEM);
        }       
        path.mnt = mntget(sock_mnt);

        d_instantiate(path.dentry, SOCK_INODE(sock));

        file = alloc_file(&path, FMODE_READ | FMODE_WRITE,
                  &socket_file_ops);            //创建文件,文件的读写函数是socket_file_ops指定。
        if (IS_ERR(file)) {
                /* drop dentry, keep inode for a bit */
                ihold(d_inode(path.dentry));
                path_put(&path);
                /* ... and now kill it properly */
                sock_release(sock);
                return file;
        }

        sock->file = file;
        file->f_flags = O_RDWR | (flags & O_NONBLOCK);
        file->private_data = sock;
        return file;
}       

创建问socket 后使用setsockopt设置REUSEADDR。代码如下

static int __sys_setsockopt(int fd, int level, int optname,
                            char __user *optval, int optlen)
{       
        int err, fput_needed;
        struct socket *sock;

        if (optlen < 0)
                return -EINVAL;

        sock = sockfd_lookup_light(fd, &err, &fput_needed);
        if (sock != NULL) {
                err = security_socket_setsockopt(sock, level, optname);
                if (err)
                        goto out_put;

                if (level == SOL_SOCKET)            //第二个参数时这个 调用sock_setsockopt函数
                        err =
                            sock_setsockopt(sock, level, optname, optval,
                                            optlen);            //调用该函数设置 REUSEADDR。
                else
                        err =
                            sock->ops->setsockopt(sock, level, optname, optval,
                                                  optlen);
out_put:
                fput_light(sock->file, fput_needed);
        }
        return err;
}               

SYSCALL_DEFINE5(setsockopt, int, fd, int, level, int, optname,
                char __user *, optval, int, optlen)
{               
        return __sys_setsockopt(fd, level, optname, optval, optlen);
}  

sock_setsockopt函数非常长,因为包含很多参数,所以函数很长,但是逻辑简单,关键代码如下

        case SO_REUSEADDR:
                sk->sk_reuse = (valbool ? SK_CAN_REUSE : SK_NO_REUSE);      //如果使用REUSEADDR 则sk->sk_reuse置位
                break;

最后看bind 系统调用

int __sys_bind(int fd, struct sockaddr __user *umyaddr, int addrlen)
{
        struct socket *sock;    
        struct sockaddr_storage address;
        int err, fput_needed;

        sock = sockfd_lookup_light(fd, &err, &fput_needed);
        if (sock) {
                err = move_addr_to_kernel(umyaddr, addrlen, &address);
                if (err >= 0) {
                        err = security_socket_bind(sock,
                                                   (struct sockaddr *)&address,
                                                   addrlen);
                        if (!err)
                                err = sock->ops->bind(sock,
                                                      (struct sockaddr *)
                                                      &address, addrlen);       //最后调用4层协议的bind函数,接下来以udp为例分析
                }
                fput_light(sock->file, fput_needed);
        }
        return err;
}

SYSCALL_DEFINE3(bind, int, fd, struct sockaddr __user *, umyaddr, int, addrlen)
{
        return __sys_bind(fd, umyaddr, addrlen);
}

以 IPV4 udp 为例,可以得知 sock->ops->bind最终调用的函数是inet_bind函数,函数在net/ipv4/af_inet.c文件中。inet_bind 经过一些安全检查调用 __inet_bind函数, __inet_bind函数中会调用协议对应的get_port函数,判断绑定的端口是否能够被使用。

        if (snum || !(inet->bind_address_no_port ||
                      force_bind_address_no_port)) {
                if (sk->sk_prot->get_port(sk, snum)) {          //如果返回值非零,表示地址已经被使用
                        inet->inet_saddr = inet->inet_rcv_saddr = 0;
                        err = -EADDRINUSE;
                        goto out_release_sock;
                }
                err = BPF_CGROUP_RUN_PROG_INET4_POST_BIND(sk);
                if (err) {
                        inet->inet_saddr = inet->inet_rcv_saddr = 0;
                        goto out_release_sock;
                }
        }

以ipv4 udp为例子,get_port调用的是udp_v4_get_port 文件位置是net/ipv4/udp.c
udp_v4_get_port函数最终调用udp_lib_get_port函数判断地址是否被使用。udp_lib_get_port调用udp_lib_lport_inuse2判断地址是否被使用。
udp_lib_lport_inuse2函数

static int udp_lib_lport_inuse2(struct net *net, __u16 num, 
                                struct udp_hslot *hslot2,
                                struct sock *sk) 
{
        struct sock *sk2;
        kuid_t uid = sock_i_uid(sk);
        int res = 0; 

        spin_lock(&hslot2->lock);
        udp_portaddr_for_each_entry(sk2, &hslot2->head) {
                if (net_eq(sock_net(sk2), net) &&
                    sk2 != sk &&
                    (udp_sk(sk2)->udp_port_hash == num) &&
                    (!sk2->sk_reuse || !sk->sk_reuse) &&                //如果都设置了reuse,则跳过sk2 继续查找下一个。
                    (!sk2->sk_bound_dev_if || !sk->sk_bound_dev_if ||
                     sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
                    inet_rcv_saddr_equal(sk, sk2, true)) {
                        if (sk2->sk_reuseport && sk->sk_reuseport &&
                            !rcu_access_pointer(sk->sk_reuseport_cb) &&
                            uid_eq(uid, sock_i_uid(sk2))) {
                                res = 0; 
                        } else {
                                res = 1; 
                        }    
                        break;
                }    
        }    
        spin_unlock(&hslot2->lock);
        return res; 
}

由上面的代码可以看出,在udp模式下,允许两个socket共享一个地址。而不报错。

进一步分析代码,我们可以发现,只是使用REUSEADDR,虽然在同一个地址上创建多个socket,但是只有最后一个socket是收包的,其他socket不收包,做不到多个socket 负载均衡的功能,要想使用负载均衡的功能需要使用REUSEPORT参数。在上面代码里面已经隐隐约约能看到REUSEPORT参数的代码。我们下街分析。

分类
linux file system

linux epoll 机制分析–如何就绪

在上一节中,我们讨论了如何使用epoll。这一节,我们将讨论,epoll是如何由不就绪变成就绪状态。
在上一节中,我们已经知道如果使用epoll监控某个文件状态改变,需要调用epoll_ctl系统调用将文件加入监控集合中。分析代码发现,再加入时是使用ep_insert函数完成一些初始操作。因此,状态改变的方式一定跟ep_insert的操作有关系。
首先查看重要的数据结构 struct epitem.结构如下

struct epitem {
        union {
                /* RB tree node links this structure to the eventpoll RB tree */
                struct rb_node rbn;
                /* Used to free the struct epitem */
                struct rcu_head rcu;
        };          //还没有发生事件时,红黑树。

        /* List header used to link this structure to the eventpoll ready list */
        struct list_head rdllink;       //如果就绪,放入epoll就序列表

        /*
         * Works together "struct eventpoll"->ovflist in keeping the
         * single linked chain of items.
         */
        struct epitem *next;

        /* The file descriptor information this item refers to */
        struct epoll_filefd ffd;        //存放等待事件发生的文件

        /* Number of active wait queue attached to poll operations */
        int nwait;      

        /* List containing poll wait queues */
        struct list_head pwqlist;       //等待列表

        /* The "container" of this item */
        struct eventpoll *ep;           //表明属于哪个epoll句柄

        /* List header used to link this item to the "struct file" items list */
        struct list_head fllink;

        /* wakeup_source used when EPOLLWAKEUP is set */
        struct wakeup_source __rcu *ws;

        /* The structure that describe the interested events and the source fd */
        struct epoll_event event;           //响应的事件
};

这个结构体,就是被监控文件在epoll中的一个实例。每次增加监控,多增加一个这样的结构。
分析ep_insert中的三处关关键代码。其他代码都是错误检查初始化等就不分析了。
第一处关键代码

        INIT_LIST_HEAD(&epi->rdllink);
        INIT_LIST_HEAD(&epi->fllink);
        INIT_LIST_HEAD(&epi->pwqlist);
        epi->ep = ep;
        ep_set_ffd(&epi->ffd, tfile, fd);
        epi->event = *event;
        epi->nwait = 0;
        epi->next = EP_UNACTIVE_PTR;
        if (epi->event.events & EPOLLWAKEUP) {
                error = ep_create_wakeup_source(epi);
                if (error)
                        goto error_create_wakeup_source;
        } else {
                RCU_INIT_POINTER(epi->ws, NULL);
        }           //以上都是初始化epi结构体

        /* Initialize the poll table using the queue callback */
        epq.epi = epi;
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);       //初始化回调函数,这个回调函数会在目标文件的poll执行时回调。目的是把epi注册到目标文件的事件通知链中。

init_poll_funcptr 函数代码如下

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
        pt->_qproc = qproc;
        pt->_key   = ~(__poll_t)0; /* all events enabled */
}

ep_table_queue_proc 函数代码如下

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                                 poll_table *pt)
{
        struct epitem *epi = ep_item_from_epqueue(pt);
        struct eppoll_entry *pwq;       //等待队列实体

        if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
                init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);        //注册事件通知时需要执行的函数
                pwq->whead = whead;
                pwq->base = epi;
                if (epi->event.events & EPOLLEXCLUSIVE)
                        add_wait_queue_exclusive(whead, &pwq->wait);
                else
                        add_wait_queue(whead, &pwq->wait);  //添加到事件列表中。
                list_add_tail(&pwq->llink, &epi->pwqlist);
                epi->nwait++;
        } else {
                /* We have to signal that an error occurred */
                epi->nwait = -1;
        }
}

ep_poll_callback 函数是epoll的核心函数,当监控句柄有事件发生时,这个函数被执行,判断是否为关心事件,如果是关心事件,则将epi结构体放入ep就绪列表中。最后我们回分析ep_poll_callback函数。
epoll_insert第二处关键代码

        revents = ep_item_poll(epi, &epq.pt, 1);        //这处代码是注册事件

        /*
         * We have to check if something went wrong during the poll wait queue
         * install process. Namely an allocation for a wait queue failed due
         * high memory pressure.
         */
        error = -ENOMEM;
        if (epi->nwait < 0)
                goto error_unregister;

        /* Add the current item to the list of active epoll hook for this file */
        spin_lock(&tfile->f_lock);
        list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
        spin_unlock(&tfile->f_lock);

ep_item_poll函数 代码如下:

static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
                                 int depth)
{       
        struct eventpoll *ep;
        bool locked;

        pt->_key = epi->event.events;
        if (!is_file_epoll(epi->ffd.file))          //如果被监控文件也是epoll则走下面逻辑。否则执行被监控文件的poll函数。
                return epi->ffd.file->f_op->poll(epi->ffd.file, pt) &
                       epi->event.events;

        ep = epi->ffd.file->private_data;
        poll_wait(epi->ffd.file, &ep->poll_wait, pt);
        locked = pt && (pt->_qproc == ep_ptable_queue_proc);

        return ep_scan_ready_list(epi->ffd.file->private_data,
                                  ep_read_events_proc, &depth, depth,
                                  locked) & epi->event.events;
} 

到这里我们需要看被监控文件的poll函数。这里使用eventfd模块的poll函数作为例子分析。

static __poll_t eventfd_poll(struct file *file, poll_table *wait)
{
        struct eventfd_ctx *ctx = file->private_data;
        __poll_t events = 0;
        u64 count;

        poll_wait(file, &ctx->wqh, wait);       //select 和poll这个函数都返回空,epoll会执行这个函数。
        count = READ_ONCE(ctx->count);

        if (count > 0)
                events |= EPOLLIN;
        if (count == ULLONG_MAX)
                events |= EPOLLERR;
        if (ULLONG_MAX - 1 > count)
                events |= EPOLLOUT;

        return events;
}

poll_wait函数代码。

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) 
{
        if (p && p->_qproc && wait_address)
                p->_qproc(filp, wait_address, p); 
}

_qproc这里就是ep_ptable_queue_proc 函数。这样就将 epoll的epi挂在到目标文件的事件通知链上了。如果事件发生,则会执行ep_poll_callback函数代码。接下来查看ep_poll_callback代码。如果对事件发生时会执行ep_poll_callback代码有疑问的可以看一下前面写的eventfd的分析。
ep_poll_callback 执行了一系列检查以后,将文件放入就绪列表中。至此,目标文件状态改变,epoll也能监控到。 ep_poll_callback中还加入唤醒ep_wait的操作,让ep_wait执行检查,ep_wait在上一节中介绍。
到这里,我们已经了解epoll的主要执行流程,其他细节不同的内核版本可能不一样,可以通过看代码详细了解。

分类
linux file system

linux epoll 机制分析–使用流程

这篇本来应该在poll机制分析之后写的,但是由于中间跟朋友一起在学习trace代码,耽搁了。
闲话少说。我们先看看poll机制有哪些弊端。
1.每次调用poll都要将全部文件描述符传到内核中,这是性能弊端。
2.在内核中poll是一个轮询方式。对传入的所有文件描述符进行轮询,查看是否有事件发生。如果都没有事件发生,睡眠一会再来次。这存在两个问题。第一cpu并没有空闲,一直在忙查。第二个,事件处理不及时,必须等全部轮询一遍以后才会返回。
基于以上弊端,epoll做了以下改进。
1.句柄集合下发与等待事件分开。先下发,然后在等待。等待可以多次。
2.利用系统的等待队列唤醒机制。当有事件发生时。执行特定函数将相关文件句柄放入就绪列表中。

总结:在我看来 poll和epoll就像没有中断和有中断的外设与系统通讯一样。之前需要cpu忙查现在只需要等待通知即可。
直接代码分析。首先上关键数据结构 eventpoll,这个结构贯穿始终。主要是用来保存,等待的fd集合以及就绪fd集合。

struct eventpoll {
        /* Protect the access to this structure */
        spinlock_t lock;

        /*
         * This mutex is used to ensure that files are not removed
         * while epoll is using them. This is held during the event
         * collection loop, the file cleanup path, the epoll file exit
         * code and the ctl operations.
         */
        struct mutex mtx;

        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;

        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;

        /* List of ready file descriptors */
        struct list_head rdllist;           //已有事件发生的fd列表

        /* RB tree root used to store monitored fd structs */
        struct rb_root_cached rbr;              //所有需要等待的fd红黑树。

        /*                       
         * This is a single linked list that chains all the "struct epitem" that
         * happened while transferring ready events to userspace w/out
         * holding ->lock.
         */
        struct epitem *ovflist;

        /* wakeup_source used when ep_scan_ready_list is running */
        struct wakeup_source *ws;

        /* The user that created the eventpoll descriptor */
        struct user_struct *user;

        struct file *file;

        /* used to optimize loop detection check */
        int visited;
        struct list_head visited_list_link;

#ifdef CONFIG_NET_RX_BUSY_POLL
        /* used to track busy poll napi_id */
        unsigned int napi_id;
#endif
};

使用epoll 第一步调用的系统调用时 epoll_create,代码如下

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
        int error, fd;
        struct eventpoll *ep = NULL;
        struct file *file;

        /* Check the EPOLL_* constant for consistency.  */
        BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

        if (flags & ~EPOLL_CLOEXEC)
                return -EINVAL;
        /*
         * Create the internal data structure ("struct eventpoll").
         */
        error = ep_alloc(&ep);  //分配eventpoll数据结构。
        if (error < 0)
                return error;
        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure and a free file descriptor.
         */
        fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
        if (fd < 0) {
                error = fd;
                goto out_free_ep;
        }
        file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                                 O_RDWR | (flags & O_CLOEXEC)); //创建eventpoll匿名文件,私有数据时eventpoll结构
        if (IS_ERR(file)) {
                error = PTR_ERR(file);
                goto out_free_fd;
        }
        ep->file = file;
        fd_install(fd, file);
        return fd;      //返回文件句柄。

out_free_fd:
        put_unused_fd(fd);
out_free_ep:
        ep_free(ep);
        return error;
}

注意 eventpoll_fops 数据。这个数据的值如下

static const struct file_operations eventpoll_fops = {
#ifdef CONFIG_PROC_FS
        .show_fdinfo    = ep_show_fdinfo,
#endif
        .release        = ep_eventpoll_release,
        .poll           = ep_eventpoll_poll,
        .llseek         = noop_llseek,
};

由于该文件不是用来读写的 所以没有读写函数,有一个release函数在关闭句柄时释放资源。有一个poll函数。由此可猜测epoll可以进行嵌套。查了一下ep_eventpoll_poll函数,的确是可以嵌套。而且嵌套时,只要下层epoll句柄中的文件句柄集合有就绪文件句柄。上层epoll会把下层epoll文件句柄放入就绪列表中。
epoll文件句柄创建好了。接下来会使用epoll_ctl系统调用向句柄中添加,等待事件的文件句柄。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                struct epoll_event __user *, event)
{
        int error;
        int full_check = 0;
        struct fd f, tf;
        struct eventpoll *ep;
        struct epitem *epi;
        struct epoll_event epds;
        struct eventpoll *tep = NULL;

        error = -EFAULT;
        if (ep_op_has_event(op) &&          //检查操作是否为删除
            copy_from_user(&epds, event, sizeof(struct epoll_event)))
                goto error_return;

        error = -EBADF;
        f = fdget(epfd);
        if (!f.file)            //获取epoll文件句柄
                goto error_return;

        /* Get the "struct file *" for the target file */
        tf = fdget(fd);
        if (!tf.file)           //获取目标文件句柄
                goto error_fput;

        /* The target file descriptor must support poll */
        error = -EPERM;
        if (!tf.file->f_op->poll)       //判断目标文件是否有poll函数
                goto error_tgt_fput;

        /* Check if EPOLLWAKEUP is allowed */
        if (ep_op_has_event(op))
                ep_take_care_of_epollwakeup(&epds);

        /*
         * We have to check that the file structure underneath the file descriptor
         * the user passed to us _is_ an eventpoll file. And also we do not permit
         * adding an epoll file descriptor inside itself.
         */
        error = -EINVAL;
        if (f.file == tf.file || !is_file_epoll(f.file))    //判断epoll文件句柄是否是epoll类型文件
                goto error_tgt_fput;

        /*
         * epoll adds to the wakeup queue at EPOLL_CTL_ADD time only,
         * so EPOLLEXCLUSIVE is not allowed for a EPOLL_CTL_MOD operation.
         * Also, we do not currently supported nested exclusive wakeups.
         */
        if (ep_op_has_event(op) && (epds.events & EPOLLEXCLUSIVE)) {
                if (op == EPOLL_CTL_MOD)
                        goto error_tgt_fput;
                if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
                                (epds.events & ~EPOLLEXCLUSIVE_OK_BITS)))
                        goto error_tgt_fput;
        }

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;
        /*
         * When we insert an epoll file descriptor, inside another epoll file
         * descriptor, there is the change of creating closed loops, which are
         * better be handled here, than in more critical paths. While we are
         * checking for loops we also determine the list of files reachable
         * and hang them on the tfile_check_list, so we can check that we
         * haven't created too many possible wakeup paths.
         *
         * We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
         * the epoll file descriptor is attaching directly to a wakeup source,
         * unless the epoll file descriptor is nested. The purpose of taking the
         * 'epmutex' on add is to prevent complex toplogies such as loops and
         * deep wakeup paths from forming in parallel through multiple
         * EPOLL_CTL_ADD operations.
         */
        mutex_lock_nested(&ep->mtx, 0);
        if (op == EPOLL_CTL_ADD) {
                if (!list_empty(&f.file->f_ep_links) ||
                                                is_file_epoll(tf.file)) {
                        full_check = 1;
                        mutex_unlock(&ep->mtx);
                        mutex_lock(&epmutex);
                        if (is_file_epoll(tf.file)) {
                                error = -ELOOP;
                                if (ep_loop_check(ep, tf.file) != 0) {
                                        clear_tfile_check_list();
                                        goto error_tgt_fput;
                                }
                        } else
                                list_add(&tf.file->f_tfile_llink,
                                                        &tfile_check_list);
                        mutex_lock_nested(&ep->mtx, 0);
                        if (is_file_epoll(tf.file)) {
                                tep = tf.file->private_data;
                                mutex_lock_nested(&tep->mtx, 1);
                        }
                }
        }               //如果是添加类型操作,判断是否有环。

        /*
         * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
         * above, we can be sure to be able to use the item looked up by
         * ep_find() till we release the mutex.
         */
        epi = ep_find(ep, tf.file, fd);

        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD: //添加操作
                if (!epi) {
                        epds.events |= EPOLLERR | EPOLLHUP;
                        error = ep_insert(ep, &epds, tf.file, fd, full_check);
                } else
                        error = -EEXIST;
                if (full_check)
                        clear_tfile_check_list();
                break;
        case EPOLL_CTL_DEL: //删除操作
                if (epi)
                        error = ep_remove(ep, epi);
                else
                        error = -ENOENT;
                break;
        case EPOLL_CTL_MOD: //修改等待的类型。
                if (epi) {
                        if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                                epds.events |= EPOLLERR | EPOLLHUP;
                                error = ep_modify(ep, epi, &epds);
                        }
                } else
                        error = -ENOENT;
                break;
        }
        if (tep != NULL)
                mutex_unlock(&tep->mtx);
        mutex_unlock(&ep->mtx);

error_tgt_fput:
        if (full_check)
                mutex_unlock(&epmutex);

        fdput(tf);
error_fput:
        fdput(f);
error_return:

        return error;

最后使用epoll_wait系统调用获取,有事件发生的句柄。代码如下

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)       //maxevents 最多等待事件数,timeout超时时间
{
        int error;
        struct fd f;
        struct eventpoll *ep;

        /* The maximum number of event must be greater than zero */
        if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
                return -EINVAL;

        /* Verify that the area passed by the user is writeable */
        if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
                return -EFAULT;

        /* Get the "struct file *" for the eventpoll file */
        f = fdget(epfd);
        if (!f.file)
                return -EBADF;

        /*
         * We have to check that the file structure underneath the fd
         * the user passed to us _is_ an eventpoll file.
         */
        error = -EINVAL;
        if (!is_file_epoll(f.file))
                goto error_fput;

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;

        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);    //前面都是做安全检查,真正做事的函数是ep_poll

error_fput:
        fdput(f);
        return error;
}

ep_poll代码

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        int res = 0, eavail, timed_out = 0;
        unsigned long flags;
        u64 slack = 0;
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;

        if (timeout > 0) {
                struct timespec64 end_time = ep_set_mstimeout(timeout);

                slack = select_estimate_accuracy(&end_time);
                to = &expires;
                *to = timespec64_to_ktime(end_time);
        } else if (timeout == 0) {
                /*
                 * Avoid the unnecessary trip to the wait queue loop, if the
                 * caller specified a non blocking operation.
                 */
                timed_out = 1;
                spin_lock_irqsave(&ep->lock, flags);
                goto check_events;
        }       

fetch_events:   

        if (!ep_events_available(ep))
                ep_busy_loop(ep, timed_out);

        spin_lock_irqsave(&ep->lock, flags);

        if (!ep_events_available(ep)) {
                /*
                 * Busy poll timed out.  Drop NAPI ID for now, we can add
                 * it back in when we have moved a socket with a valid NAPI
                 * ID onto the ready list.
                 */
                ep_reset_busy_poll_napi_id(ep);

                /* 
                 * We don't have any available event to return to the caller.
                 * We need to sleep here, and we will be wake up by
                 * ep_poll_callback() when events will become available.
                 */
                init_waitqueue_entry(&wait, current);
                __add_wait_queue_exclusive(&ep->wq, &wait);

                for (;;) {
                        /*
                         * We don't want to sleep if the ep_poll_callback() sends us
                         * a wakeup in between. That's why we set the task state
                         * to TASK_INTERRUPTIBLE before doing the checks.
                         */
                        set_current_state(TASK_INTERRUPTIBLE);
                        /*
                         * Always short-circuit for fatal signals to allow
                         * threads to make a timely exit without the chance of
                         * finding more events available and fetching
                         * repeatedly.
                         */
                        if (fatal_signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }
                        if (ep_events_available(ep) || timed_out)
                                break;
                        if (signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }

                        spin_unlock_irqrestore(&ep->lock, flags);
                        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                                timed_out = 1;

                        spin_lock_irqsave(&ep->lock, flags);
                }

                __remove_wait_queue(&ep->wq, &wait);
                __set_current_state(TASK_RUNNING);
        }
check_events:
        /* Is it worth to try to dig for events ? */
        eavail = ep_events_available(ep);

        spin_unlock_irqrestore(&ep->lock, flags);

        /*
         * Try to transfer events to user space. In case we get 0 events and
         * there's still timeout left over, we go trying again in search of
         * more luck.
         */
        if (!res && eavail &&
            !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
                goto fetch_events;          //前面所有的代码都是进行检查和等待就绪队列中有句柄。最后调用ep_send_events将就绪队列中的数据拷贝到events中

        return res;
}

ep_send_events代码

static int ep_send_events(struct eventpoll *ep,
                          struct epoll_event __user *events, int maxevents)
{
        struct ep_send_events_data esed;

        esed.maxevents = maxevents;
        esed.events = events;

        ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
        return esed.res;
}

ep_scan_ready_list函数将rdlist中的句柄考入esed结构体中,最后返回给用户态。在拷贝时,这个函数做了很多保护。
结束语: 这篇大概写了epoll系统调用的执行流程。但是没有写,系统是怎么讲等待红黑树中的句柄加入 就绪队列中的。将在下篇中详细说明。

分类
linux file system

linux eventfd详解

最开始听说eventfd的时候是在virtio vhost驱动那块听说的。以为很高深的一个技术,没有细看。
最近在了解select poll epoll的时候有看到这个东西,于是决定好好看看eventfd这个模块的代码。
代码路径:fs/eventfd.c
系统调用:eventfd1 eventfd2
使用逻辑:首先程序调用eventfd*系统调用,创建eventfd文件描述符。然后通过对文件描述符的read和write来做进程间通讯。由于eventfd模块的文件操作指针中有poll函数因此可以使用select poll epoll来等待。逻辑简单。具体看代码分析。
关键数据结构

struct eventfd_ctx {
        struct kref kref;           //对象引用计数
        wait_queue_head_t wqh;      //等待队列首地址
        /*
         * Every time that a write(2) is performed on an eventfd, the
         * value of the __u64 being written is added to "count" and a
         * wakeup is performed on "wqh". A read(2) will return the "count"
         * value to userspace, and will reset "count" to zero. The kernel
         * side eventfd_signal() also, adds to the "count" counter and
         * issue a wakeup.
         */
        __u64 count;                //read write 操作的计数
        unsigned int flags;         //eventfd 文件特性描述
};

创建eventfd文件描述符

SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{
        int fd, error;
        struct file *file;

        error = get_unused_fd_flags(flags & EFD_SHARED_FCNTL_FLAGS);    //获取空闲描述符
        if (error < 0)
                return error;
        fd = error;

        file = eventfd_file_create(count, flags);           //创建eventfd文件对象
        if (IS_ERR(file)) {            
                error = PTR_ERR(file);
                goto err_put_unused_fd;
        }
        fd_install(fd, file);                           //文件描述符和eventfd文件对象关联

        return fd;                              

err_put_unused_fd:
        put_unused_fd(fd);

        return error;
}

struct file *eventfd_file_create(unsigned int count, int flags)
{
        struct file *file;
        struct eventfd_ctx *ctx;

        /* Check the EFD_* constants for consistency.  */
        BUILD_BUG_ON(EFD_CLOEXEC != O_CLOEXEC);
        BUILD_BUG_ON(EFD_NONBLOCK != O_NONBLOCK);

        if (flags & ~EFD_FLAGS_SET)
                return ERR_PTR(-EINVAL); 

        ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);        //分配eventfd文件私有数据结构内存。
        if (!ctx)
                return ERR_PTR(-ENOMEM);

        kref_init(&ctx->kref);
        init_waitqueue_head(&ctx->wqh);
        ctx->count = count;
        ctx->flags = flags;

        file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx,
                                  O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));//创建匿名inode 文件,即创建一个文件,但是这个文件的挂载点是匿名的。表明这个文件只是内存中存在,硬盘中不存在。一个虚拟文件系统
        if (IS_ERR(file))
                eventfd_free_ctx(ctx);

        return file;
}

eventfd_fops数据如下

static const struct file_operations eventfd_fops = {
#ifdef CONFIG_PROC_FS
        .show_fdinfo    = eventfd_show_fdinfo,
#endif
        .release        = eventfd_release,
        .poll           = eventfd_poll,         //文件poll操作,给select epoll使用
        .read           = eventfd_read,         //文件读操作
        .write          = eventfd_write,        //文件写操作
        .llseek         = noop_llseek,
};

至此,虚拟evnetfd 文件已经创建完,对应的文件描述符也返回给用户程序。

现在分析 对改文件的读写操作。首先是写操作

static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
                             loff_t *ppos)
{
        struct eventfd_ctx *ctx = file->private_data;
        ssize_t res;
        __u64 ucnt;
        DECLARE_WAITQUEUE(wait, current);               //初始化等待变量

        if (count < sizeof(ucnt))
                return -EINVAL;
        if (copy_from_user(&ucnt, buf, sizeof(ucnt)))   //拷贝要写入的值大小
                return -EFAULT;
        if (ucnt == ULLONG_MAX)
                return -EINVAL;
        spin_lock_irq(&ctx->wqh.lock);                  //自旋锁,禁止中断
        res = -EAGAIN;
        if (ULLONG_MAX - ctx->count > ucnt)
                res = sizeof(ucnt);
        else if (!(file->f_flags & O_NONBLOCK)) {       //判断是否阻塞,如果不阻塞跳过等待可写状态代码。
                __add_wait_queue(&ctx->wqh, &wait);     //将wait添加到等待队列中
                for (res = 0;;) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        if (ULLONG_MAX - ctx->count > ucnt) {
                                res = sizeof(ucnt);
                                break;
                        }
                        if (signal_pending(current)) {
                                res = -ERESTARTSYS;
                                break;
                        }
                        spin_unlock_irq(&ctx->wqh.lock);
                        schedule();                     //切换到其他进程执行,当前进程挂起
                        spin_lock_irq(&ctx->wqh.lock);
                }
                __remove_wait_queue(&ctx->wqh, &wait);  //进程不需要等待,醒来。
                __set_current_state(TASK_RUNNING);      //设置当前进程为运行状态
        }
        if (likely(res > 0)) {
                ctx->count += ucnt;                     //写入数据
                if (waitqueue_active(&ctx->wqh))        //判断等待队列是否为空
                        wake_up_locked_poll(&ctx->wqh, POLLIN);     //唤醒等待读的进程。
        }
        spin_unlock_irq(&ctx->wqh.lock);                //解除自旋锁

        return res;             //返回写入的字节数
}

wake_up_locked_poll 函数函数体

static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
                        int nr_exclusive, int wake_flags, void *key,
                        wait_queue_entry_t *bookmark)
{
        wait_queue_entry_t *curr, *next;
        int cnt = 0;

        if (bookmark && (bookmark->flags & WQ_FLAG_BOOKMARK)) {
                curr = list_next_entry(bookmark, entry);

                list_del(&bookmark->entry);
                bookmark->flags = 0;
        } else
                curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

        if (&curr->entry == &wq_head->head)
                return nr_exclusive;

        list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
                unsigned flags = curr->flags;
                int ret;

                if (flags & WQ_FLAG_BOOKMARK)
                        continue;

                ret = curr->func(curr, mode, wake_flags, key);  //执行相应的唤醒函数,进程阻塞和使用epoll执行的这个函数不同。决定是唤醒进程还是,只是将描述符加入就绪队列。
                if (ret < 0)
                        break;
                if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)  //决定唤醒的进程数量,防止兽群效应。
                        break;

                if (bookmark && (++cnt > WAITQUEUE_WALK_BREAK_CNT) &&
                                (&next->entry != &wq_head->head)) {
                        bookmark->flags = WQ_FLAG_BOOKMARK;
                        list_add_tail(&bookmark->entry, &next->entry);
                        break;
                }
        }
        return nr_exclusive;
}

eventfd read函数

static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
                            loff_t *ppos)
{
        struct eventfd_ctx *ctx = file->private_data;
        ssize_t res;
        __u64 cnt;

        if (count < sizeof(cnt))
                return -EINVAL;
        res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt);//关键函数,读取count至
        if (res < 0)
                return res;

        return put_user(cnt, (__u64 __user *) buf) ? -EFAULT : sizeof(cnt);
}

ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{       
        ssize_t res;
        DECLARE_WAITQUEUE(wait, current);

        spin_lock_irq(&ctx->wqh.lock);
        *cnt = 0;
        res = -EAGAIN; 
        if (ctx->count > 0)
                res = 0;
        else if (!no_wait) {            //阻塞模式,则执行阻塞代码
                __add_wait_queue(&ctx->wqh, &wait);             //添加到等待队列
                for (;;) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        if (ctx->count > 0) {
                                res = 0;
                                break;
                        }
                        if (signal_pending(current)) {
                                res = -ERESTARTSYS;
                                break;
                        }
                        spin_unlock_irq(&ctx->wqh.lock);
                        schedule();
                        spin_lock_irq(&ctx->wqh.lock);
                }
                __remove_wait_queue(&ctx->wqh, &wait);
                __set_current_state(TASK_RUNNING);
        }
        if (likely(res == 0)) {
                eventfd_ctx_do_read(ctx, cnt);      //读取ctx->count数据,怎么读取有flags控制。
                if (waitqueue_active(&ctx->wqh))
                        wake_up_locked_poll(&ctx->wqh, POLLOUT);    //跟write一样,唤醒等待的进程
        }
        spin_unlock_irq(&ctx->wqh.lock);

        return res;
}
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{       
        *cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;   //根据设置的标志位,决定每次读一个还是全部读取
        ctx->count -= *cnt;
}

eventfd中的poll函数。
当前我发现poll函数在两个地方有执行。
fs/select.c文件中的do_select函数中执行,这个函数遍历文件描述符列表对应的文件的poll函数检查是否有事件发生。如果有事件发生则返回。
fs/eventpoll.c文件中ep_ctl在执行ep_insert时候执行,用来注册等待队列,当有事件发生,唤起等待队列时,可以执行fs/eventpoll.c中的一个函数,将文件描述符放到就绪列表中。

static unsigned int eventfd_poll(struct file *file, poll_table *wait)
{       
        struct eventfd_ctx *ctx = file->private_data;
        unsigned int events = 0;
        u64 count;

        poll_wait(file, &ctx->wqh, wait);           //在epoll中有用。在ep_insert时将poll_table挂在等待队列中

/*下面的代码是select时有用。select 遍历每个文件描述符,调用对应poll查询是否有事件发生*/
        count = READ_ONCE(ctx->count);

        if (count > 0) 
                events |= POLLIN;
        if (count == ULLONG_MAX)
                events |= POLLERR;
        if (ULLONG_MAX - 1 > count)
                events |= POLLOUT;

        return events;
}

eventfd 相当于进程间的一个通讯机制,能交互的数据时一个整数。更像是一个事件通知机制。所以virtio的vhost会用这个来做数据包通知。