分类
协议栈分析

TCP socket 创建

IPV4的TCP socket 创建和其他IPV4 socket 创建在调用inet_create之前流程都是一样的,直到在inet_create函数中,才会调用4层协议特有的init函数。
创建socket的系统调用始于SYSCALL_DEFINE3

SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
        return __sys_socket(family, type, protocol); //work function family 3层协议类型。type套接字类型,protocol 四层协议类型。
}

其中__sys_socket函数的作用主要是创建socket,并给socket分配文件描述符

int __sys_socket(int family, int type, int protocol)
{
        int retval;
        struct socket *sock;
        int flags;

        /* Check the SOCK_* constants for consistency.  */
        BUILD_BUG_ON(SOCK_CLOEXEC != O_CLOEXEC);
        BUILD_BUG_ON((SOCK_MAX | SOCK_TYPE_MASK) != SOCK_TYPE_MASK);
        BUILD_BUG_ON(SOCK_CLOEXEC & SOCK_TYPE_MASK);
        BUILD_BUG_ON(SOCK_NONBLOCK & SOCK_TYPE_MASK);

        flags = type & ~SOCK_TYPE_MASK;  //type除了传递socket类型还可以传递socket flags
        if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
                return -EINVAL;
        type &= SOCK_TYPE_MASK;

        if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
                flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;

        retval = sock_create(family, type, protocol, &sock); //创建socket 主函数
        if (retval < 0)
                return retval;

        return sock_map_fd(sock, flags & (O_CLOEXEC | O_NONBLOCK)); //将socket映射成一个文件描述符。通过文件的读写操作来操作套接字。
}

sock_create函数稍微复杂 后续分解。先分解 sock_map_fd函数

struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
{
        struct file *file;

        if (!dname)
                dname = sock->sk ? sock->sk->sk_prot_creator->name : "";

        file = alloc_file_pseudo(SOCK_INODE(sock), sock_mnt, dname,
                                O_RDWR | (flags & O_NONBLOCK),
                                &socket_file_ops); //创建文件结构,操作函数是socket_file_ops
        if (IS_ERR(file)) {
                sock_release(sock);
                return file;
        }

        sock->file = file;  //sock file 结构为该文件
        file->private_data = sock; //file的private_data指向sock。后续读写文件实质上操作 这个socket。具体查看socket_file_ops对应的函数。
        return file;
}
EXPORT_SYMBOL(sock_alloc_file);

static int sock_map_fd(struct socket *sock, int flags)
{
        struct file *newfile;
        int fd = get_unused_fd_flags(flags); // 获取空闲的文件描述符
        if (unlikely(fd < 0)) {
                sock_release(sock);
                return fd;
        }

        newfile = sock_alloc_file(sock, flags, NULL); //创建文件结构,上面的函数
        if (likely(!IS_ERR(newfile))) {
                fd_install(fd, newfile);  //将文件描述符和文件关联
                return fd;   //返回文件描述符
        }

        put_unused_fd(fd);
        return PTR_ERR(newfile);
}

sock_create函数最后调用__sock_create函数创建socket。__sock_create函数做了一些检查,创建 socket结构后,查找三层协议。最后调用三层协议的create函数,这个函数的过程简单,这里就不分析,对应IPV4 TCP套接字最后调用的是inet_create函数。
inet_create函数较长,前面部分主要是根据类型和协议查找四层协议的inet_protosw结构。下面列出该函数的关键代码

        sock->ops = answer->ops;            //type的操作函数
        answer_prot = answer->prot;         //proto结构体,创建sk使用
        answer_flags = answer->flags;       //四层协议的flags
        rcu_read_unlock();

        WARN_ON(!answer_prot->slab);

        err = -ENOBUFS;
        sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot, kern); //创建sk,这个函数很简单,就是创建sk,并将answer_prot赋值到sk的sk_prot变量中。
        if (!sk)
                goto out;
......
        if (sk->sk_prot->init) {  //判断四层协议是否有init函数
                err = sk->sk_prot->init(sk); //调用init函数,TCP中调用的是tcp_v4_init_sock函数 
                if (err) {
                        sk_common_release(sk);
                        goto out;
                }
        }

tcp_v4_init_sock函数只是对tcp_init_sock函数进行封装,并赋值icsk_af_ops为ipv4特有的操作。这个结构主要是tcp在ipv4协议中的一些特定操作(发包 socket option 创建ip头等 ),ipv6中也有一个类似的结构。
tcp_init_sock函数才是tcp socket 初始化的核心函数。代码如下

void tcp_init_sock(struct sock *sk)
{
        struct inet_connection_sock *icsk = inet_csk(sk); //初始化 icsk 套接字
        struct tcp_sock *tp = tcp_sk(sk);   //初始化tcp套接字

        tp->out_of_order_queue = RB_ROOT;
        sk->tcp_rtx_queue = RB_ROOT;
        tcp_init_xmit_timers(sk);    //初始化tcp套接字定时器,retran delay_ack keepalive等 
        INIT_LIST_HEAD(&tp->tsq_node);
        INIT_LIST_HEAD(&tp->tsorted_sent_queue);

        icsk->icsk_rto = TCP_TIMEOUT_INIT;
        tp->mdev_us = jiffies_to_usecs(TCP_TIMEOUT_INIT);
        minmax_reset(&tp->rtt_min, tcp_jiffies32, ~0U);

        /* So many TCP implementations out there (incorrectly) count the
         * initial SYN frame in their delayed-ACK and congestion control
         * algorithms that we must have the following bandaid to talk
         * efficiently to them.  -DaveM
         */
        tp->snd_cwnd = TCP_INIT_CWND; //tcp默认发送窗口 居然是10 好小 

        /* There's a bubble in the pipe until at least the first ACK. */
        tp->app_limited = ~0U;

        /* See draft-stevens-tcpca-spec-01 for discussion of the
         * initialization of these values.
         */
        tp->snd_ssthresh = TCP_INFINITE_SSTHRESH;
        tp->snd_cwnd_clamp = ~0;
        tp->mss_cache = TCP_MSS_DEFAULT;

        tp->reordering = sock_net(sk)->ipv4.sysctl_tcp_reordering;
        tcp_assign_congestion_control(sk);  //这个是拥塞算法的初始化函数

        tp->tsoffset = 0;
        tp->rack.reo_wnd_steps = 1;

        sk->sk_state = TCP_CLOSE;       //创建socket的初始状态 TCP_CLOSE状态

        sk->sk_write_space = sk_stream_write_space; //唤醒阻塞在socket上的写进程
        sock_set_flag(sk, SOCK_USE_WRITE_QUEUE);

        icsk->icsk_sync_mss = tcp_sync_mss;  //协商 mss 的函数 通过pmtu获取貌似

        sk->sk_sndbuf = sock_net(sk)->ipv4.sysctl_tcp_wmem[1]; //这个应该是发送buf
        sk->sk_rcvbuf = sock_net(sk)->ipv4.sysctl_tcp_rmem[1]; //这个应该是接受buf

        sk_sockets_allocated_inc(sk);  //貌似是做统计用的
        sk->sk_route_forced_caps = NETIF_F_GSO; //暂时不知道干嘛用的 ,大概跟GSO有关系
}
EXPORT_SYMBOL(tcp_init_sock); 

tcp_assign_congestion_control函数是拥塞控制算法的初始化函数,这个里面会选取默认的拥塞控制算法并初始化。代码如下

void tcp_assign_congestion_control(struct sock *sk)
{
        struct net *net = sock_net(sk);
        struct inet_connection_sock *icsk = inet_csk(sk);
        const struct tcp_congestion_ops *ca;

        rcu_read_lock();
        ca = rcu_dereference(net->ipv4.tcp_congestion_control);
        if (unlikely(!try_module_get(ca->owner)))
                ca = &tcp_reno;
        icsk->icsk_ca_ops = ca;   //直接赋值默认拥塞控制算法的结构
        rcu_read_unlock();

        memset(icsk->icsk_ca_priv, 0, sizeof(icsk->icsk_ca_priv));
        if (ca->flags & TCP_CONG_NEEDS_ECN)
                INET_ECN_xmit(sk);
        else
                INET_ECN_dontxmit(sk);
}

到这里,TCP socket 的创建就完事,接下来,有两种用法,作为客户端进行connect。 作为服务器端 bind 本地端口后进行listen。 connect很有意思的地方在于,刚发出去syn包,本地就认为这个链接是establish得了。这个会下回分解。

分类
协议栈分析

IP/TCP 协议栈解析 —- 初始化

tcp 是4层协议,向上通过socket与用户态程序交互,向下通过接口函数与3层协议通讯(ip,ipv6)。可能由于历史原因,tcp协议栈和ip协议栈在目录上做的不是很模块化,其实在ipv4目录下的tcp文件很大一部分代码都是和ipv6的tcp共享的。而且很奇怪的是,4层协议栈和3层协议栈之间完全可以做成模块结构,但是tcp和ip在初始化的时候代码完全杂糅在一起。在这一方面,ipv6的初始化反而更清晰。
IP初始函数
由于ip协议对应的tcp协议栈和ip协议初始化代码没有分离,因此,tcp协议的初始化函数在ip 初始化函数里面。函数是 net/ipv4/af_inet.c中的inet_init函数。

static int __init inet_init(void)
{
        struct inet_protosw *q;
        struct list_head *r;
        int rc = -EINVAL;

        sock_skb_cb_check_size(sizeof(struct inet_skb_parm));

        rc = proto_register(&tcp_prot, 1);//这个函数做一些初始化,创建memcache,并注册到socket的一个全局表中,后续看在主要的发包流程中不会用到这个全局表。看上去这个表主要是在proc文件中显示当前系统中有多少个socket协议
        if (rc)
                goto out;

        rc = proto_register(&udp_prot, 1);
        if (rc)
                goto out_unregister_tcp_proto;

        rc = proto_register(&raw_prot, 1);
        if (rc)
                goto out_unregister_udp_proto;

        rc = proto_register(&ping_prot, 1);
        if (rc)
                goto out_unregister_raw_proto;

        /*
         *      Tell SOCKET that we are alive...
         */

        (void)sock_register(&inet_family_ops);//注册ip协议族,socket协同调用的时候指定的domain。就是在选协议族。

#ifdef CONFIG_SYSCTL
        ip_static_sysctl_init();
#endif

        /*
         *      Add all the base protocols.
         */

        if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
                pr_crit("%s: Cannot add ICMP protocol\n", __func__);
        if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
                pr_crit("%s: Cannot add UDP protocol\n", __func__);
        if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0) //注册tcp收包结构,结构在下文贴出
                pr_crit("%s: Cannot add TCP protocol\n", __func__);
#ifdef CONFIG_IP_MULTICAST
        if (inet_add_protocol(&igmp_protocol, IPPROTO_IGMP) < 0)
                pr_crit("%s: Cannot add IGMP protocol\n", __func__);
#endif

        /* Register the socket-side information for inet_create. */
        for (r = &inetsw[0]; r < &inetsw[SOCK_MAX]; ++r)
                INIT_LIST_HEAD(r);

        for (q = inetsw_array; q < &inetsw_array[INETSW_ARRAY_LEN]; ++q)
                inet_register_protosw(q); //注册四层协议结构。用户态程序创建socket第二个和第三个参数 type 和protol,就是用来筛选这个结构的。 

tcp 接受ip层数据接口

static struct net_protocol tcp_protocol = {
        .early_demux    =       tcp_v4_early_demux,
        .early_demux_handler =  tcp_v4_early_demux, 
        .handler        =       tcp_v4_rcv,  //这个是正常流程的入口函数
        .err_handler    =       tcp_v4_err,  //这个是出错是入口函数
        .no_policy      =       1, 
        .netns_ok       =       1,
        .icmp_strict_tag_validation = 1,
};

tcp inetsw_array结构

        {
                .type =       SOCK_STREAM,    //流类型
                .protocol =   IPPROTO_TCP,    //协议类型
                .prot =       &tcp_prot,      // 协议的结构
                .ops =        &inet_stream_ops,// 流操作函数
                .flags =      INET_PROTOSW_PERMANENT |
                              INET_PROTOSW_ICSK,
        },

在linux中,对四层协议栈又做了一次分类,同类型的协议抽象了一层操作函数,然后在操作函数中在调用各个具体协议的操作函数。当前分类一共有以下几种

enum sock_type {
        SOCK_STREAM     = 1, //流类型,有链接
        SOCK_DGRAM      = 2, //数据包包类型无连接
        SOCK_RAW        = 3, //raw socket类型直接发包
        SOCK_RDM        = 4, //可靠连接类型,这个类型貌似就是RDM
        SOCK_SEQPACKET  = 5, //顺序包类型
        SOCK_DCCP       = 6, //数据包拥塞控制类型
        SOCK_PACKET     = 10, //直接从dev获取包的类型,linux特有。
};

tcp prot 结构,这个结构 太过复杂,现在只是搞懂了几个用的函数在哪里调用。剩下的需要慢慢搞懂。

struct proto tcp_prot = {
        .name                   = "TCP",
        .owner                  = THIS_MODULE,
        .close                  = tcp_close,
        .pre_connect            = tcp_v4_pre_connect,   //创建连接前调用
        .connect                = tcp_v4_connect,       //创建连接时调用,syn包
        .disconnect             = tcp_disconnect,       //断开连接时调用
        .accept                 = inet_csk_accept,    //接受连接时调用
        .ioctl                  = tcp_ioctl,         //ioctl 低啊用
        .init                   = tcp_v4_init_sock,  //创建socket时调用
        .destroy                = tcp_v4_destroy_sock,
        .shutdown               = tcp_shutdown,
        .setsockopt             = tcp_setsockopt,   //设置opt调用
        .getsockopt             = tcp_getsockopt,   //获取opt调用
        .keepalive              = tcp_set_keepalive,
        .recvmsg                = tcp_recvmsg,
        .sendmsg                = tcp_sendmsg,
        .sendpage               = tcp_sendpage,
        .backlog_rcv            = tcp_v4_do_rcv,
        .release_cb             = tcp_release_cb,
        .hash                   = inet_hash,
        .unhash                 = inet_unhash,
        .get_port               = inet_csk_get_port,
        .enter_memory_pressure  = tcp_enter_memory_pressure,
        .leave_memory_pressure  = tcp_leave_memory_pressure,
        .stream_memory_free     = tcp_stream_memory_free,
        .sockets_allocated      = &tcp_sockets_allocated,
        .orphan_count           = &tcp_orphan_count,
        .memory_allocated       = &tcp_memory_allocated,
        .memory_pressure        = &tcp_memory_pressure,
        .sysctl_mem             = sysctl_tcp_mem,
        .sysctl_wmem_offset     = offsetof(struct net, ipv4.sysctl_tcp_wmem),
        .sysctl_rmem_offset     = offsetof(struct net, ipv4.sysctl_tcp_rmem),
        .max_header             = MAX_TCP_HEADER,
        .obj_size               = sizeof(struct tcp_sock), //tcp sock,这个sock里面套了一堆结构,第一个结构是就是sock。注意内核在这做了三次抽象 sock , connection_sock最后是tcp_sock.通过这三次抽象减少重复代码。面向对象啊
        .slab_flags             = SLAB_TYPESAFE_BY_RCU,
        .twsk_prot              = &tcp_timewait_sock_ops,
        .rsk_prot               = &tcp_request_sock_ops,
        .h.hashinfo             = &tcp_hashinfo,
        .no_autobind            = true,
#ifdef CONFIG_COMPAT
        .compat_setsockopt      = compat_tcp_setsockopt,
        .compat_getsockopt      = compat_tcp_getsockopt,
#endif
        .diag_destroy           = tcp_abort,
};
EXPORT_SYMBOL(tcp_prot);

inet_add_protocol函数,很简单就是讲四层协议的收包结构注册到三层协议中,方便调用。使用cmpxchg
inet_register_protosw 函数,注册AF_INET协议栈的协议类型,这个函数也很简单,首先通过根据类型分类,然后在同类型的链表中查找是否有重名显现如果么有则注册。

proto_register 这个函数,跟进去看了一下,主要是做了一些memcache的创建。然后注册到一个全局链表中。它注册的那个链表后续貌似没有用到的地方看代码只有在proc文件显示的时候中用到。
到这里初始化就完成了。下一章将分析tcp socket创建

分类
ebpf

xdp ebpf程序执行流程

xdp 使用ebpf 做 包过滤,相对于dpdk将数据包直接送到用户态,用用户态当做快速数据处理平面,xdp是在驱动层创建了一个数据快速平面。在数据被网卡硬件dma到内存,分配skb之前,对数据包进行处理。由于完全不存在锁操作。且bypass了协议栈,非常适合用修改数据包并转发,数据探针,执行丢包。
xdp 程序执行的位置实在ixgbe_clean_rx_irq函数中。网卡刚把数据dma到内存时。

        while (likely(total_rx_packets < budget)) {
                union ixgbe_adv_rx_desc *rx_desc;
                struct ixgbe_rx_buffer *rx_buffer;
                struct sk_buff *skb;
                unsigned int size;

                /* return some buffers to hardware, one at a time is too slow */
                if (cleaned_count >= IXGBE_RX_BUFFER_WRITE) {
                        ixgbe_alloc_rx_buffers(rx_ring, cleaned_count);
                        cleaned_count = 0;
                }

                rx_desc = IXGBE_RX_DESC(rx_ring, rx_ring->next_to_clean);
                size = le16_to_cpu(rx_desc->wb.upper.length);
                if (!size)
                        break;

                /* This memory barrier is needed to keep us from reading
                 * any other fields out of the rx_desc until we know the
                 * descriptor has been written back
                 */
                dma_rmb();

                rx_buffer = ixgbe_get_rx_buffer(rx_ring, rx_desc, &skb, size);

                /* retrieve a buffer from the ring */
                if (!skb) {
                        xdp.data = page_address(rx_buffer->page) +
                                   rx_buffer->page_offset;
                        xdp.data_meta = xdp.data;
                        xdp.data_hard_start = xdp.data -
                                              ixgbe_rx_offset(rx_ring);
                        xdp.data_end = xdp.data + size;

                        skb = ixgbe_run_xdp(adapter, rx_ring, &xdp);            //执行run_xdp函数
                }

ixgbe_run_xdp 函数。

static struct sk_buff *ixgbe_run_xdp(struct ixgbe_adapter *adapter,
                                     struct ixgbe_ring *rx_ring,
                                     struct xdp_buff *xdp)
{
        int err, result = IXGBE_XDP_PASS;
        struct bpf_prog *xdp_prog;
        u32 act;

        rcu_read_lock();
        xdp_prog = READ_ONCE(rx_ring->xdp_prog);

        if (!xdp_prog)              //如果没有注册xdp程序,则直接跳出处理流程
                goto xdp_out;

        act = bpf_prog_run_xdp(xdp_prog, xdp);                  //执行xdp虚拟机。 xdp_prog 是 ebpf程序体, xdp是出入给ebpf程序的参数
        switch (act) {
        case XDP_PASS:              //xdp处理结果是将数据上送协议栈
                break;
        case XDP_TX:                //xdp处理结果是将数据通过同一个口转发出去
                result = ixgbe_xmit_xdp_ring(adapter, xdp);
                break;
        case XDP_REDIRECT:          //xdp处理结果是从另一个网口转发出去
                err = xdp_do_redirect(adapter->netdev, xdp, xdp_prog);
                if (!err)
                        result = IXGBE_XDP_TX;
                else    
                        result = IXGBE_XDP_CONSUMED;
                break;
        default:
                bpf_warn_invalid_xdp_action(act);
                /* fallthrough */
        case XDP_ABORTED:               //xdp程序异常丢掉数据包
                trace_xdp_exception(rx_ring->netdev, xdp_prog, act);
                /* fallthrough -- handle aborts by dropping packet */
        case XDP_DROP:                  //数据包被丢弃。
                result = IXGBE_XDP_CONSUMED;
                break;
        }
xdp_out:
        rcu_read_unlock();
        return ERR_PTR(-result);
}

设置xdp程序函数。在intel 82599网卡中使用的是 ixgbe_xdp函数。 ixgbe_xdp函数最后赋值给 net_device_ops结构体的ndo_bpf函数变量中。

static int ixgbe_xdp(struct net_device *dev, struct netdev_bpf *xdp)
{       
        struct ixgbe_adapter *adapter = netdev_priv(dev);

        switch (xdp->command) {
        case XDP_SETUP_PROG:
                return ixgbe_xdp_setup(dev, xdp->prog);         //设置xdp程序,把xdp程序设置到rx_ring的 xdp_prog变量中。
        case XDP_QUERY_PROG:
                xdp->prog_attached = !!(adapter->xdp_prog);
                xdp->prog_id = adapter->xdp_prog ?
                        adapter->xdp_prog->aux->id : 0;
                return 0;
        default:
                return -EINVAL;
        }
}

在 net/core/rtnetlink.c文件中的rtnl_setlink函数中会调用最后会调用 do_setlink函数。 rtnl_setlink 关联到 PF_UNSPEC netlink 的 RTM_SETLINK 中,用户态发送这类消息时,直接调用rtnl_setlink函数处理。
do_setlink函数 最后会调用dev_change_xdp_fd函数

        if (tb[IFLA_XDP]) {
                struct nlattr *xdp[IFLA_XDP_MAX + 1];
                u32 xdp_flags = 0;

                err = nla_parse_nested(xdp, IFLA_XDP_MAX, tb[IFLA_XDP],
                                       ifla_xdp_policy, NULL);
                if (err < 0)
                        goto errout;

                if (xdp[IFLA_XDP_ATTACHED] || xdp[IFLA_XDP_PROG_ID]) {
                        err = -EINVAL;
                        goto errout;
                }

                if (xdp[IFLA_XDP_FLAGS]) {
                        xdp_flags = nla_get_u32(xdp[IFLA_XDP_FLAGS]);
                        if (xdp_flags & ~XDP_FLAGS_MASK) {
                                err = -EINVAL;
                                goto errout;
                        }
                        if (hweight32(xdp_flags & XDP_FLAGS_MODES) > 1) {
                                err = -EINVAL;
                                goto errout;
                        }
                }

                if (xdp[IFLA_XDP_FD]) {
                        err = dev_change_xdp_fd(dev, extack,
                                                nla_get_s32(xdp[IFLA_XDP_FD]),
                                                xdp_flags);         //修改指定dev的 prog
                        if (err)
                                goto errout;
                        status |= DO_SETLINK_NOTIFY;
                }
        }

dev_change_xdp_fd函数做了一系列检查后最后调用 dev_xdp_install函数

int dev_change_xdp_fd(struct net_device *dev, struct netlink_ext_ack *extack,
                      int fd, u32 flags)
{       
        const struct net_device_ops *ops = dev->netdev_ops;
        struct bpf_prog *prog = NULL;
        bpf_op_t bpf_op, bpf_chk;
        int err;

        ASSERT_RTNL();

        bpf_op = bpf_chk = ops->ndo_bpf;                //获取网卡驱动ndo_bpf程序 ixgbe网卡的函数是 ixgbe_bpf
        if (!bpf_op && (flags & (XDP_FLAGS_DRV_MODE | XDP_FLAGS_HW_MODE)))
                return -EOPNOTSUPP;
        if (!bpf_op || (flags & XDP_FLAGS_SKB_MODE))
                bpf_op = generic_xdp_install;
        if (bpf_op == bpf_chk)
                bpf_chk = generic_xdp_install;

        if (fd >= 0) {
                if (bpf_chk && __dev_xdp_attached(dev, bpf_chk))
                        return -EEXIST;
                if ((flags & XDP_FLAGS_UPDATE_IF_NOEXIST) &&
                    __dev_xdp_attached(dev, bpf_op))
                        return -EBUSY;

                prog = bpf_prog_get_type_dev(fd, BPF_PROG_TYPE_XDP,
                                             bpf_op == ops->ndo_bpf);
                if (IS_ERR(prog))
                        return PTR_ERR(prog);

                if (!(flags & XDP_FLAGS_HW_MODE) &&
                    bpf_prog_is_dev_bound(prog->aux)) {
                        NL_SET_ERR_MSG(extack, "using device-bound program without HW_MODE flag is not supported");
                        bpf_prog_put(prog);
                        return -EINVAL;
                }
        }

        err = dev_xdp_install(dev, bpf_op, extack, flags, prog);        //将xdp ebpf程序安装到网卡的 rx_ring中
        if (err < 0 && prog)
                bpf_prog_put(prog);

        return err;
}

dev_xdp_install 函数代码如下

static int dev_xdp_install(struct net_device *dev, bpf_op_t bpf_op,
                           struct netlink_ext_ack *extack, u32 flags,
                           struct bpf_prog *prog)
{               
        struct netdev_bpf xdp;

        memset(&xdp, 0, sizeof(xdp));
        if (flags & XDP_FLAGS_HW_MODE)
                xdp.command = XDP_SETUP_PROG_HW;
        else    
                xdp.command = XDP_SETUP_PROG;
        xdp.extack = extack;
        xdp.flags = flags;
        xdp.prog = prog;

        return bpf_op(dev, &xdp);           //执行ndo_xdp函数, ixgbe驱动的函数为ixgbe_bpf
} 

至此,xdp 运行位置及用户程序将ebpf代码attach到网卡的流程都分析完毕。

分类
ebpf

ebpf maps 内核态代码分析

ebpf maps 内核态代码分析
创建maps BPF_MAP_CREATE

          map = find_and_alloc_map(attr);                   //根据map类型创建map对象
          if (IS_ERR(map))
                  return PTR_ERR(map);

          err = bpf_obj_name_cpy(map->name, attr->map_name);
          if (err)
                  goto free_map_nouncharge;

          atomic_set(&map->refcnt, 1);
          atomic_set(&map->usercnt, 1);

          err = security_bpf_map_alloc(map);
          if (err)
                  goto free_map_nouncharge;

          err = bpf_map_charge_memlock(map);
          if (err)
                  goto free_map_sec;

          err = bpf_map_alloc_id(map);              //将map存放在idr数据结构中
          if (err)
                  goto free_map;

          err = bpf_map_new_fd(map, f_flags);       //将map映射成fd文件返回给用户态的fd文件描述符
          if (err < 0) {
                  /* failed to allocate fd.
                   * bpf_map_put() is needed because the above
                   * bpf_map_alloc_id() has published the map
                   * to the userspace and the userspace may
                   * have refcnt-ed it through BPF_MAP_GET_FD_BY_ID.
                   */
                  bpf_map_put(map);
                  return err;
          }

          trace_bpf_map_create(map, err);
          return err;               //返回文件描述符

查找更新和删除,没什么好说的,代码写的很清楚。代码在kernel/bpf/syscall.c文件中,入口函数分别是map_lookup_elem map_update_elem map_delete_elem。
比较关键是,内核创建的map 对象怎么跟ebpf程序关联起来。这个关联在ebpf程序加载的时候进行的。如果代码时bpf_prog_load函数

        err = find_prog_type(type, prog);
        if (err < 0)
                goto free_prog;

        prog->aux->load_time = ktime_get_boot_ns();
        err = bpf_obj_name_cpy(prog->aux->name, attr->prog_name);
        if (err)
                goto free_prog;

        /* run eBPF verifier */
        err = bpf_check(&prog, attr);                       //这个函数对ebpf程序进行检查并将与fd关联的map对象替换到ebpf程序中
        if (err < 0)
                goto free_used_maps;

        /* eBPF program is ready to be JITed */
        if (!prog->bpf_func)
                prog = bpf_prog_select_runtime(prog, &err);
        if (err < 0)
                goto free_used_maps;

        err = bpf_prog_alloc_id(prog);
        if (err)
                goto free_used_maps;

        err = bpf_prog_new_fd(prog);

关键函数是replace_map_fd_with_map_ptr.

        env->strict_alignment = !!(attr->prog_flags & BPF_F_STRICT_ALIGNMENT);
        if (!IS_ENABLED(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS))
                env->strict_alignment = true;

        if (bpf_prog_is_dev_bound(env->prog->aux)) {
                ret = bpf_prog_offload_verifier_prep(env);
                if (ret)
                        goto err_unlock;
        }

        ret = replace_map_fd_with_map_ptr(env);                 //将文件描述符替换成map指针地址
        if (ret < 0)
                goto skip_full_check;

        env->explored_states = kcalloc(env->prog->len,
                                       sizeof(struct bpf_verifier_state_list *),
                                       GFP_USER);
        ret = -ENOMEM;
        if (!env->explored_states)
                goto skip_full_check;

        env->allow_ptr_leaks = capable(CAP_SYS_ADMIN);

        ret = check_cfg(env);

replace_map_fd_with_map_ptr函数中可以看到会获取文件描述符,并根据文件描述符获取文件,最后获取文件的private_data即 map的地址。

                        f = fdget(insn->imm);
                        map = __bpf_map_get(f);             //获取map对象地址
                        if (IS_ERR(map)) {
                                verbose(env, "fd %d is not pointing to valid bpf_map\n",
                                        insn->imm);
                                return PTR_ERR(map);
                        }

                        err = check_map_prog_compatibility(env, map, env->prog);
                        if (err) {
                                fdput(f);
                                return err;
                        }

                        /* store map pointer inside BPF_LD_IMM64 instruction */
                        insn[0].imm = (u32) (unsigned long) map;
                        insn[1].imm = ((u64) (unsigned long) map) >> 32;            //将地址赋值到insn中,这样ebpf虚拟机可以直接访问map。

                        /* check whether we recorded this map already */
                        for (j = 0; j < env->used_map_cnt; j++)
                                if (env->used_maps[j] == map) {
                                        fdput(f);
                                        goto next_insn;
                                }

                        if (env->used_map_cnt >= MAX_USED_MAPS) {
                                fdput(f);
                                return -E2BIG;
                        }

至此, ebpf map 在用户态和内核传递数据的方式基本上已经搞情况了。

分类
ebpf

ebpf map 用户态源码分析

最近 facebook 开源了一份 L4LB katran 基于 xdp做的。 里面大量使用了 ebpf maps 进行用户态和内核态进行数据交互。今天dig了一下代码大概了解一下原理。 使用的代码时 kernel/sample/bpf/中的代码。
ebpf 代码部分:
在ebpf代码中需要申明map

struct bpf_map_def SEC("maps") port_a = {           //SEC("maps") 这个是llvm生成代码的section标记,表示为这段代码单独生成一个maps code setion。 在load_bpf代码时,会根据这个标记找到这个section 然后在内核创建相应的内存。
        .type = BPF_MAP_TYPE_ARRAY,                 //map的类型  有很多当前为array类型
        .key_size = sizeof(u32),                    //key 的长度
        .value_size = sizeof(int),                  //value 的长度
        .max_entries = MAX_NR_PORTS,                //这个array的最大长度。
};

编译完成后 会生成一个 elf的.o文件,然后在进程上下文中解析elf文件获取maps信息,代码如下

        fd = open(path, O_RDONLY, 0);               //打开llvm 编译后的test_map_in_kernel.o文件
        if (fd < 0)
                return 1;

        elf = elf_begin(fd, ELF_C_READ, NULL);

        if (!elf)
                return 1;

        if (gelf_getehdr(elf, &ehdr) != &ehdr)      //获取所有section head信息
                return 1;

        /* clear all kprobes */
        i = system("echo \"\" > /sys/kernel/debug/tracing/kprobe_events");

        /* scan over all elf sections to get license and map info */
        for (i = 1; i < ehdr.e_shnum; i++) {

                if (get_sec(elf, i, &ehdr, &shname, &shdr, &data))
                        continue;

                if (0) /* helpful for llvm debugging */
                        printf("section %d:%s data %p size %zd link %d flags %d\n",
                               i, shname, data->d_buf, data->d_size,
                               shdr.sh_link, (int) shdr.sh_flags);

                if (strcmp(shname, "license") == 0) {
                        processed_sec[i] = true;
                        memcpy(license, data->d_buf, data->d_size);
                } else if (strcmp(shname, "version") == 0) {
                        processed_sec[i] = true;
                        if (data->d_size != sizeof(int)) {
                                printf("invalid size of version section %zd\n",
                                       data->d_size);
                                return 1;
                        }
                        memcpy(&kern_version, data->d_buf, sizeof(int));
                } else if (strcmp(shname, "maps") == 0) {           //如果section 头部名字是maps,则说明 test_map_in_kernel中有map
                        int j;

                        maps_shndx = i;
                        data_maps = data;
                        for (j = 0; j < MAX_MAPS; j++)
                                map_data[j].fd = -1;
                } else if (shdr.sh_type == SHT_SYMTAB) {
                        strtabidx = shdr.sh_link;
                        symbols = data;
                }
        }

分析 maps section 并在内核创建map 对象,对外表现为文件类型

        if (data_maps) {
                nr_maps = load_elf_maps_section(map_data, maps_shndx,
                                                elf, symbols, strtabidx);           //解析maps section 获取每个定义map的对象
                if (nr_maps < 0) {
                        printf("Error: Failed loading ELF maps (errno:%d):%s\n",
                               nr_maps, strerror(-nr_maps));
                        ret = 1;
                        goto done;
                }
                if (load_maps(map_data, nr_maps, fixup_map))                //加载map并在内核创建相应的对象。返回文件描述符
                        goto done;
                map_data_count = nr_maps;

                processed_sec[maps_shndx] = true;
        }

load_elf_maps_section 主要是讲maps 对象的定义读取出来,到load_maps中使用,关键代码如下

        for (i = 0; i < nr_maps; i++) {
                unsigned char *addr, *end;
                struct bpf_map_def *def;
                const char *map_name;
                size_t offset;

                map_name = elf_strptr(elf, strtabidx, sym[i].st_name);
                maps[i].name = strdup(map_name);
                if (!maps[i].name) {
                        printf("strdup(%s): %s(%d)\n", map_name,
                               strerror(errno), errno);
                        free(sym);
                        return -errno;
                }

                /* Symbol value is offset into ELF maps section data area */
                offset = sym[i].st_value;                               //获取变量的偏移位置
                def = (struct bpf_map_def *)(data_maps->d_buf + offset);
                maps[i].elf_offset = offset;
                memset(&maps[i].def, 0, sizeof(struct bpf_map_def));
                memcpy(&maps[i].def, def, map_sz_copy);             //拷贝变量的定义到maps[i].def中后面会使用

                /* Verify no newer features were requested */
                if (validate_zero) {
                        addr = (unsigned char*) def + map_sz_copy;
                        end  = (unsigned char*) def + map_sz_elf;
                        for (; addr < end; addr++) {
                                if (*addr != 0) {
                                        free(sym);
                                        return -EFBIG;
                                }
                        }
                }
        }

load_maps函数,根据上一个函数获取的信息将信息下发到内核并创建文件描述符,关键代码如下

                if (maps[i].def.type == BPF_MAP_TYPE_ARRAY_OF_MAPS ||
                    maps[i].def.type == BPF_MAP_TYPE_HASH_OF_MAPS) {
                        int inner_map_fd = map_fd[maps[i].def.inner_map_idx];

                        map_fd[i] = bpf_create_map_in_map_node(maps[i].def.type,        //元素的值是map的
                                                        maps[i].name,
                                                        maps[i].def.key_size,
                                                        inner_map_fd,
                                                        maps[i].def.max_entries,
                                                        maps[i].def.map_flags,
                                                        numa_node);
                } else {
                        map_fd[i] = bpf_create_map_node(maps[i].def.type,           //元素的值是普通类型的
                                                        maps[i].name,
                                                        maps[i].def.key_size,
                                                        maps[i].def.value_size,
                                                        maps[i].def.max_entries,
                                                        maps[i].def.map_flags,
                                                        numa_node);
                }

bpf_create_map_node函数就是将这些信息组织成attr 结构体 并调用 bpf系统调用,将信息下发到内核,并返回map的fd信息给用户态,在加载程序时需要用到。
接下来是将代码段中的 maps 与相应的fd 关联起来,内核中会根据fd,给代码中使用符号赋地址

        for (i = 1; i < ehdr.e_shnum; i++) {
                if (processed_sec[i])
                        continue;

                if (get_sec(elf, i, &ehdr, &shname, &shdr, &data))
                        continue;

                if (shdr.sh_type == SHT_REL) {
                        struct bpf_insn *insns;

                        /* locate prog sec that need map fixup (relocations) */
                        if (get_sec(elf, shdr.sh_info, &ehdr, &shname_prog,
                                    &shdr_prog, &data_prog))
                                continue;

                        if (shdr_prog.sh_type != SHT_PROGBITS ||
                            !(shdr_prog.sh_flags & SHF_EXECINSTR))
                                continue;

                        insns = (struct bpf_insn *) data_prog->d_buf;
                        processed_sec[i] = true; /* relo section */

                        if (parse_relo_and_apply(data, symbols, &shdr, insns,           //这个函数是将fd与insns关联起来。内核中根据fd将map地址赋值到程序中
                                                 map_data, nr_maps))
                                continue;
                }
        }

最后是通过加载EBPF程序到内核中

        for (i = 1; i < ehdr.e_shnum; i++) {

                if (processed_sec[i])
                        continue;

                if (get_sec(elf, i, &ehdr, &shname, &shdr, &data))
                        continue;

                if (memcmp(shname, "kprobe/", 7) == 0 ||
                    memcmp(shname, "kretprobe/", 10) == 0 ||
                    memcmp(shname, "tracepoint/", 11) == 0 ||
                    memcmp(shname, "raw_tracepoint/", 15) == 0 ||
                    memcmp(shname, "xdp", 3) == 0 ||
                    memcmp(shname, "perf_event", 10) == 0 ||
                    memcmp(shname, "socket", 6) == 0 ||
                    memcmp(shname, "cgroup/", 7) == 0 ||
                    memcmp(shname, "sockops", 7) == 0 ||
                    memcmp(shname, "sk_skb", 6) == 0 ||
                    memcmp(shname, "sk_msg", 6) == 0) {
                        ret = load_and_attach(shname, data->d_buf,
                                              data->d_size);                    //这个函数根据不同的ebpf类型讲代码加载到内核中。
                        if (ret != 0)
                                goto done;
                }
        }
分类
lua

C 内嵌lua入门

上周,撸了撸 openresty最佳实践。发现在nginx.conf文件里面可以直接写lua代码。然后在nginx的work线程里面执行lua 代码。
使用这种方式,极大的提升了程序的灵活性,且不需要使用者对C框架非常了解也可以做一些功能开发。
初步了解了一下方法例子如下:
C代码部分

#include <stdio.h>

#include <luajit-2.0/lua.h>        //包含lua开发的相关头文件
#include <luajit-2.0/lualib.h>
#include <luajit-2.0/lauxlib.h>

int luaadd(lua_State* L ,int x,int y)
{
    int sum;

    lua_getglobal(L,"add");         //获取lua脚本中的函数。在lua中函数也是全局变量

    lua_pushnumber(L,x);            //传入函数的第一个参数
    lua_pushnumber(L,y);            //传入函数的第二个参数
    lua_call(L,2,1);                //调用lua函数,2表示两个参数,1表示返回值
    sum =(int)lua_tonumber(L,-1);   //获取执行结果
    lua_pop(L,1);                   //清空函数调用栈
    return sum;
}
int main(int argc,char *argv[])
{
    lua_State* L;
    int sum;

    L = lua_open();     //打开lua虚拟机
    luaL_openlibs(L);   //加载lua库
    luaL_dofile(L,"add.lua");   //加载lua脚本文件
    sum=luaadd(L,10,15);        //调用lua中的函数。并获取返回值
    printf("The sum is %d\n",sum);  //打印函数执行结果
    lua_close(L);               //关闭lua
    return 0;
}

lua脚本

--add two numbers
function add(x,y)
    --将2个数相加并加上25
    return x+y+25
end

编译
gcc -c main.c
gcc -o main -lluajit-5.1
执行 ./main
The sum is 50

我们可以发现lua虚拟机完全嵌在主进程中执行,并没有创建新的进程或线程执行lua虚拟机。

分类
linux network

linux socket REUSEPORT UDP socket内核代码分析

REUSEPORT 和REUSEADDR是差不多的功能,都是一个端口可以多个socket进行监听。差别在于 REUSEPORT可以做负载,且不会引起惊群效应。
REUSEPORT设置方式和REUSEADDR设置方式一样都是通过setsockopt进行设置,最终在sk上设置sk_reuseport的值。
在绑定时,会调用一系列函数,这些函数在REUSEADDR中已经讲过。最终会调用到udp_lib_lport_inuse2函数

static int udp_lib_lport_inuse2(struct net *net, __u16 num,
                                struct udp_hslot *hslot2,
                                struct sock *sk)
{
        struct sock *sk2;
        kuid_t uid = sock_i_uid(sk);
        int res = 0;

        spin_lock(&hslot2->lock);
        udp_portaddr_for_each_entry(sk2, &hslot2->head) {
                if (net_eq(sock_net(sk2), net) &&
                    sk2 != sk &&
                    (udp_sk(sk2)->udp_port_hash == num) &&
                    (!sk2->sk_reuse || !sk->sk_reuse) &&
                    (!sk2->sk_bound_dev_if || !sk->sk_bound_dev_if ||
                     sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
                    inet_rcv_saddr_equal(sk, sk2, true)) {          //首先检查是否有冲突,如果有冲突走if中的逻辑
                        if (sk2->sk_reuseport && sk->sk_reuseport &&
                            !rcu_access_pointer(sk->sk_reuseport_cb) &&     //判断两个socket 是否都开启了sk_reuseport功能,并且最新的socket没有初始化sk_reuseport_cb。则表示端口可用返回
                            uid_eq(uid, sock_i_uid(sk2))) {
                                res = 0;
                        } else {
                                res = 1;
                        }
                        break;
                }
        }
        spin_unlock(&hslot2->lock);
        return res;
}

返回到 udp_lib_get_port函数中。会执行以下代码

found:
        inet_sk(sk)->inet_num = snum;
        udp_sk(sk)->udp_port_hash = snum;
        udp_sk(sk)->udp_portaddr_hash ^= snum;
        if (sk_unhashed(sk)) {
                if (sk->sk_reuseport &&
                    udp_reuseport_add_sock(sk, hslot)) {            //将sk按照resuseport处理以下
                        inet_sk(sk)->inet_num = 0;
                        udp_sk(sk)->udp_port_hash = 0;
                        udp_sk(sk)->udp_portaddr_hash ^= snum;
                        goto fail_unlock;
                }

                sk_add_node_rcu(sk, &hslot->head);
                hslot->count++;
                sock_prot_inuse_add(sock_net(sk), sk->sk_prot, 1);

                hslot2 = udp_hashslot2(udptable, udp_sk(sk)->udp_portaddr_hash);
                spin_lock(&hslot2->lock);
                if (IS_ENABLED(CONFIG_IPV6) && sk->sk_reuseport &&
                    sk->sk_family == AF_INET6)
                        hlist_add_tail_rcu(&udp_sk(sk)->udp_portaddr_node,
                                           &hslot2->head);
                else
                        hlist_add_head_rcu(&udp_sk(sk)->udp_portaddr_node,
                                           &hslot2->head);          //添加sk到hash表中
                hslot2->count++;
                spin_unlock(&hslot2->lock);
        }
        sock_set_flag(sk, SOCK_RCU_FREE);
        error = 0;

udp_reuseport_add_sock 函数是是新加函数,这个函数是REUSEPORT的关键。代码分析。

static int udp_reuseport_add_sock(struct sock *sk, struct udp_hslot *hslot)
{
        struct net *net = sock_net(sk);
        kuid_t uid = sock_i_uid(sk);
        struct sock *sk2;

        sk_for_each(sk2, &hslot->head) {
                if (net_eq(sock_net(sk2), net) &&
                    sk2 != sk &&
                    sk2->sk_family == sk->sk_family &&
                    ipv6_only_sock(sk2) == ipv6_only_sock(sk) &&
                    (udp_sk(sk2)->udp_port_hash == udp_sk(sk)->udp_port_hash) &&
                    (sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
                    sk2->sk_reuseport && uid_eq(uid, sock_i_uid(sk2)) &&        //同一用户才能共享。
                    inet_rcv_saddr_equal(sk, sk2, false)) {
                        return reuseport_add_sock(sk, sk2); //端口已经被使用多次
                }
        }

        return reuseport_alloc(sk);         //这个端口第一次被使用
}

reuseport_alloc函数如下

int reuseport_alloc(struct sock *sk)
{
        struct sock_reuseport *reuse;

        /* bh lock used since this function call may precede hlist lock in
         * soft irq of receive path or setsockopt from process context
         */
        spin_lock_bh(&reuseport_lock);

        /* Allocation attempts can occur concurrently via the setsockopt path
         * and the bind/hash path.  Nothing to do when we lose the race.
         */
        if (rcu_dereference_protected(sk->sk_reuseport_cb,
                                      lockdep_is_held(&reuseport_lock)))
                goto out;

        reuse = __reuseport_alloc(INIT_SOCKS);
        if (!reuse) {
                spin_unlock_bh(&reuseport_lock);
                return -ENOMEM;
        }

        reuse->socks[0] = sk;
        reuse->num_socks = 1;
        rcu_assign_pointer(sk->sk_reuseport_cb, reuse);     //创建并初始化reuse结构体

out:
        spin_unlock_bh(&reuseport_lock);

        return 0;
}

第一个开启reuseport的socket 初始化流程使用的是上面的流程,后续使用reuseport_add_sock,代码如下

int reuseport_add_sock(struct sock *sk, struct sock *sk2)
{       
        struct sock_reuseport *old_reuse, *reuse;

        if (!rcu_access_pointer(sk2->sk_reuseport_cb)) {
                int err = reuseport_alloc(sk2);

                if (err)
                        return err;
        }       //如果sk2没有reuse结构体,分配一个新的

        spin_lock_bh(&reuseport_lock);
        reuse = rcu_dereference_protected(sk2->sk_reuseport_cb,
                                          lockdep_is_held(&reuseport_lock));
        old_reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
                                             lockdep_is_held(&reuseport_lock));
        if (old_reuse && old_reuse->num_socks != 1) {       //如果sk已经有resue结构体,并且已经和其他sk组合,则失败。
                spin_unlock_bh(&reuseport_lock);
                return -EBUSY;
        }

        if (reuse->num_socks == reuse->max_socks) {     //如果超过最多组合数,则扩整最多组合数。
                reuse = reuseport_grow(reuse);
                if (!reuse) {
                        spin_unlock_bh(&reuseport_lock);
                        return -ENOMEM;
                }
        }

        reuse->socks[reuse->num_socks] = sk;        //将新的sk加入reuse组合里面
        /* paired with smp_rmb() in reuseport_select_sock() */
        smp_wmb();
        reuse->num_socks++;
        rcu_assign_pointer(sk->sk_reuseport_cb, reuse);

        spin_unlock_bh(&reuseport_lock);

        if (old_reuse)
                call_rcu(&old_reuse->rcu, reuseport_free_rcu);      //释放就得reuse。如果需要释放的话。
        return 0;
}

这个函数很简单,就是将所有reuseport的sk放在一个数组里面。等接收到数据包时,可以根据数组来里面的sk数量通过一定的算法来负载均衡。(没有最大限制,初始化最大存放128个,如果超过则以2的指数增长,直到不能分配更大的内存为止。)
通过目的地址和目的端口找到sk以后,如果发现sk有reuse_port标志,则会调用reuseport_select_sock,使用原地址和源端口,目的地址 目的端口做hash。选择组合里面的一个sk。代码如下

                                hash = udp_ehashfn(net, daddr, hnum,
                                                   saddr, sport);
                                result = reuseport_select_sock(sk, hash, skb,
                                                        sizeof(struct udphdr));
struct sock *reuseport_select_sock(struct sock *sk,
                                   u32 hash,
                                   struct sk_buff *skb,
                                   int hdr_len)
{
        struct sock_reuseport *reuse;
        struct bpf_prog *prog; 
        struct sock *sk2 = NULL;
        u16 socks;

        rcu_read_lock();
        reuse = rcu_dereference(sk->sk_reuseport_cb);

        /* if memory allocation failed or add call is not yet complete */
        if (!reuse)
                goto out;

        prog = rcu_dereference(reuse->prog);
        socks = READ_ONCE(reuse->num_socks);
        if (likely(socks)) {
                /* paired with smp_wmb() in reuseport_add_sock() */
                smp_rmb();

                if (prog && skb)
                        sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);        //bpf简直无处不在啊,现在内核里面各处都可以执行bpf,以后可以在点位或者kprobe给 内核下各种各样的规则啊。

                /* no bpf or invalid bpf result: fall back to hash usage */
                if (!sk2)
                        sk2 = reuse->socks[reciprocal_scale(hash, socks)];      //根据hash选择合适的sk返回
        }

out:
        rcu_read_unlock();
        return sk2;
}

然后,返回之后,会唤醒选定的sk对应的进程,接受数据包处理。

reuseport优势
如果在没有reuseport之前,我们使用一个进程等待网络事件,然后通过IPC通知其他进程处理业务,这中间涉及到IPC,并且可能引起进程切换,降低性能。 又有REUSEPORT后,可以使用多个进程同时在一个端口等待,根据4元组,将数据hash到不同的进程处理,不需要IPC。可以提升性能。

分类
linux file system linux network

linux socket REUSEADDR 内核代码分析

第一次接触REUSEADDR是在一年前,那个时候写了一个网络服务程序。写了一个监控脚本监控网络服务,当服务挂了,立马拉起来。但是经常会遇到拉不起来的情况,说端口已经被占用。但是那个端口自有我自己在用。怎么会被占用呢。后来查到说进程挂了,但是内核中连接还在。所以不能立马用。加上REUSEADDR就可以解决问题。今天挖掘一下内核代码。分析一下REUSEADDR实现。
创建socket

int __sys_socket(int family, int type, int protocol)
{
        int retval;
        struct socket *sock;
        int flags;

        /* Check the SOCK_* constants for consistency.  */
        BUILD_BUG_ON(SOCK_CLOEXEC != O_CLOEXEC);
        BUILD_BUG_ON((SOCK_MAX | SOCK_TYPE_MASK) != SOCK_TYPE_MASK);
        BUILD_BUG_ON(SOCK_CLOEXEC & SOCK_TYPE_MASK);
        BUILD_BUG_ON(SOCK_NONBLOCK & SOCK_TYPE_MASK);

        flags = type & ~SOCK_TYPE_MASK;
        if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK))
                return -EINVAL;
        type &= SOCK_TYPE_MASK;

        if (SOCK_NONBLOCK != O_NONBLOCK && (flags & SOCK_NONBLOCK))
                flags = (flags & ~SOCK_NONBLOCK) | O_NONBLOCK;

        retval = sock_create(family, type, protocol, &sock);            //根据用户传入的参数选择对应的协议创建对应的socket
        if (retval < 0)
                return retval;

        return sock_map_fd(sock, flags & (O_CLOEXEC | O_NONBLOCK));     //将socket和文件描述符绑定。
}

SYSCALL_DEFINE3(socket, int, family, int, type, int, protocol)
{
        return __sys_socket(family, type, protocol);
}

在linux中一切皆文件,sock_map_fd函数将sock和文件关联起来,返回给用户一个文件描述符。

static int sock_map_fd(struct socket *sock, int flags)
{
        struct file *newfile;
        int fd = get_unused_fd_flags(flags);
        if (unlikely(fd < 0)) {
                sock_release(sock); 
                return fd;
        }

        newfile = sock_alloc_file(sock, flags, NULL);           //创建文件,
        if (likely(!IS_ERR(newfile))) {
                fd_install(fd, newfile);            //将文件和文件描述符关联
                return fd;                          //返回文件描述符
        }

        put_unused_fd(fd);
        return PTR_ERR(newfile);
}

sock_alloc_file是创建sock匿名文件系统的文件。代码如下

struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
{
        struct qstr name = { .name = "" };
        struct path path;
        struct file *file;

        if (dname) {
                name.name = dname; 
                name.len = strlen(name.name);
        } else if (sock->sk) {
                name.name = sock->sk->sk_prot_creator->name;
                name.len = strlen(name.name);
        }       
        path.dentry = d_alloc_pseudo(sock_mnt->mnt_sb, &name);
        if (unlikely(!path.dentry)) {
                sock_release(sock);
                return ERR_PTR(-ENOMEM);
        }       
        path.mnt = mntget(sock_mnt);

        d_instantiate(path.dentry, SOCK_INODE(sock));

        file = alloc_file(&path, FMODE_READ | FMODE_WRITE,
                  &socket_file_ops);            //创建文件,文件的读写函数是socket_file_ops指定。
        if (IS_ERR(file)) {
                /* drop dentry, keep inode for a bit */
                ihold(d_inode(path.dentry));
                path_put(&path);
                /* ... and now kill it properly */
                sock_release(sock);
                return file;
        }

        sock->file = file;
        file->f_flags = O_RDWR | (flags & O_NONBLOCK);
        file->private_data = sock;
        return file;
}       

创建问socket 后使用setsockopt设置REUSEADDR。代码如下

static int __sys_setsockopt(int fd, int level, int optname,
                            char __user *optval, int optlen)
{       
        int err, fput_needed;
        struct socket *sock;

        if (optlen < 0)
                return -EINVAL;

        sock = sockfd_lookup_light(fd, &err, &fput_needed);
        if (sock != NULL) {
                err = security_socket_setsockopt(sock, level, optname);
                if (err)
                        goto out_put;

                if (level == SOL_SOCKET)            //第二个参数时这个 调用sock_setsockopt函数
                        err =
                            sock_setsockopt(sock, level, optname, optval,
                                            optlen);            //调用该函数设置 REUSEADDR。
                else
                        err =
                            sock->ops->setsockopt(sock, level, optname, optval,
                                                  optlen);
out_put:
                fput_light(sock->file, fput_needed);
        }
        return err;
}               

SYSCALL_DEFINE5(setsockopt, int, fd, int, level, int, optname,
                char __user *, optval, int, optlen)
{               
        return __sys_setsockopt(fd, level, optname, optval, optlen);
}  

sock_setsockopt函数非常长,因为包含很多参数,所以函数很长,但是逻辑简单,关键代码如下

        case SO_REUSEADDR:
                sk->sk_reuse = (valbool ? SK_CAN_REUSE : SK_NO_REUSE);      //如果使用REUSEADDR 则sk->sk_reuse置位
                break;

最后看bind 系统调用

int __sys_bind(int fd, struct sockaddr __user *umyaddr, int addrlen)
{
        struct socket *sock;    
        struct sockaddr_storage address;
        int err, fput_needed;

        sock = sockfd_lookup_light(fd, &err, &fput_needed);
        if (sock) {
                err = move_addr_to_kernel(umyaddr, addrlen, &address);
                if (err >= 0) {
                        err = security_socket_bind(sock,
                                                   (struct sockaddr *)&address,
                                                   addrlen);
                        if (!err)
                                err = sock->ops->bind(sock,
                                                      (struct sockaddr *)
                                                      &address, addrlen);       //最后调用4层协议的bind函数,接下来以udp为例分析
                }
                fput_light(sock->file, fput_needed);
        }
        return err;
}

SYSCALL_DEFINE3(bind, int, fd, struct sockaddr __user *, umyaddr, int, addrlen)
{
        return __sys_bind(fd, umyaddr, addrlen);
}

以 IPV4 udp 为例,可以得知 sock->ops->bind最终调用的函数是inet_bind函数,函数在net/ipv4/af_inet.c文件中。inet_bind 经过一些安全检查调用 __inet_bind函数, __inet_bind函数中会调用协议对应的get_port函数,判断绑定的端口是否能够被使用。

        if (snum || !(inet->bind_address_no_port ||
                      force_bind_address_no_port)) {
                if (sk->sk_prot->get_port(sk, snum)) {          //如果返回值非零,表示地址已经被使用
                        inet->inet_saddr = inet->inet_rcv_saddr = 0;
                        err = -EADDRINUSE;
                        goto out_release_sock;
                }
                err = BPF_CGROUP_RUN_PROG_INET4_POST_BIND(sk);
                if (err) {
                        inet->inet_saddr = inet->inet_rcv_saddr = 0;
                        goto out_release_sock;
                }
        }

以ipv4 udp为例子,get_port调用的是udp_v4_get_port 文件位置是net/ipv4/udp.c
udp_v4_get_port函数最终调用udp_lib_get_port函数判断地址是否被使用。udp_lib_get_port调用udp_lib_lport_inuse2判断地址是否被使用。
udp_lib_lport_inuse2函数

static int udp_lib_lport_inuse2(struct net *net, __u16 num, 
                                struct udp_hslot *hslot2,
                                struct sock *sk) 
{
        struct sock *sk2;
        kuid_t uid = sock_i_uid(sk);
        int res = 0; 

        spin_lock(&hslot2->lock);
        udp_portaddr_for_each_entry(sk2, &hslot2->head) {
                if (net_eq(sock_net(sk2), net) &&
                    sk2 != sk &&
                    (udp_sk(sk2)->udp_port_hash == num) &&
                    (!sk2->sk_reuse || !sk->sk_reuse) &&                //如果都设置了reuse,则跳过sk2 继续查找下一个。
                    (!sk2->sk_bound_dev_if || !sk->sk_bound_dev_if ||
                     sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
                    inet_rcv_saddr_equal(sk, sk2, true)) {
                        if (sk2->sk_reuseport && sk->sk_reuseport &&
                            !rcu_access_pointer(sk->sk_reuseport_cb) &&
                            uid_eq(uid, sock_i_uid(sk2))) {
                                res = 0; 
                        } else {
                                res = 1; 
                        }    
                        break;
                }    
        }    
        spin_unlock(&hslot2->lock);
        return res; 
}

由上面的代码可以看出,在udp模式下,允许两个socket共享一个地址。而不报错。

进一步分析代码,我们可以发现,只是使用REUSEADDR,虽然在同一个地址上创建多个socket,但是只有最后一个socket是收包的,其他socket不收包,做不到多个socket 负载均衡的功能,要想使用负载均衡的功能需要使用REUSEPORT参数。在上面代码里面已经隐隐约约能看到REUSEPORT参数的代码。我们下街分析。

分类
linux file system

linux epoll 机制分析–如何就绪

在上一节中,我们讨论了如何使用epoll。这一节,我们将讨论,epoll是如何由不就绪变成就绪状态。
在上一节中,我们已经知道如果使用epoll监控某个文件状态改变,需要调用epoll_ctl系统调用将文件加入监控集合中。分析代码发现,再加入时是使用ep_insert函数完成一些初始操作。因此,状态改变的方式一定跟ep_insert的操作有关系。
首先查看重要的数据结构 struct epitem.结构如下

struct epitem {
        union {
                /* RB tree node links this structure to the eventpoll RB tree */
                struct rb_node rbn;
                /* Used to free the struct epitem */
                struct rcu_head rcu;
        };          //还没有发生事件时,红黑树。

        /* List header used to link this structure to the eventpoll ready list */
        struct list_head rdllink;       //如果就绪,放入epoll就序列表

        /*
         * Works together "struct eventpoll"->ovflist in keeping the
         * single linked chain of items.
         */
        struct epitem *next;

        /* The file descriptor information this item refers to */
        struct epoll_filefd ffd;        //存放等待事件发生的文件

        /* Number of active wait queue attached to poll operations */
        int nwait;      

        /* List containing poll wait queues */
        struct list_head pwqlist;       //等待列表

        /* The "container" of this item */
        struct eventpoll *ep;           //表明属于哪个epoll句柄

        /* List header used to link this item to the "struct file" items list */
        struct list_head fllink;

        /* wakeup_source used when EPOLLWAKEUP is set */
        struct wakeup_source __rcu *ws;

        /* The structure that describe the interested events and the source fd */
        struct epoll_event event;           //响应的事件
};

这个结构体,就是被监控文件在epoll中的一个实例。每次增加监控,多增加一个这样的结构。
分析ep_insert中的三处关关键代码。其他代码都是错误检查初始化等就不分析了。
第一处关键代码

        INIT_LIST_HEAD(&epi->rdllink);
        INIT_LIST_HEAD(&epi->fllink);
        INIT_LIST_HEAD(&epi->pwqlist);
        epi->ep = ep;
        ep_set_ffd(&epi->ffd, tfile, fd);
        epi->event = *event;
        epi->nwait = 0;
        epi->next = EP_UNACTIVE_PTR;
        if (epi->event.events & EPOLLWAKEUP) {
                error = ep_create_wakeup_source(epi);
                if (error)
                        goto error_create_wakeup_source;
        } else {
                RCU_INIT_POINTER(epi->ws, NULL);
        }           //以上都是初始化epi结构体

        /* Initialize the poll table using the queue callback */
        epq.epi = epi;
        init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);       //初始化回调函数,这个回调函数会在目标文件的poll执行时回调。目的是把epi注册到目标文件的事件通知链中。

init_poll_funcptr 函数代码如下

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
        pt->_qproc = qproc;
        pt->_key   = ~(__poll_t)0; /* all events enabled */
}

ep_table_queue_proc 函数代码如下

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                                 poll_table *pt)
{
        struct epitem *epi = ep_item_from_epqueue(pt);
        struct eppoll_entry *pwq;       //等待队列实体

        if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
                init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);        //注册事件通知时需要执行的函数
                pwq->whead = whead;
                pwq->base = epi;
                if (epi->event.events & EPOLLEXCLUSIVE)
                        add_wait_queue_exclusive(whead, &pwq->wait);
                else
                        add_wait_queue(whead, &pwq->wait);  //添加到事件列表中。
                list_add_tail(&pwq->llink, &epi->pwqlist);
                epi->nwait++;
        } else {
                /* We have to signal that an error occurred */
                epi->nwait = -1;
        }
}

ep_poll_callback 函数是epoll的核心函数,当监控句柄有事件发生时,这个函数被执行,判断是否为关心事件,如果是关心事件,则将epi结构体放入ep就绪列表中。最后我们回分析ep_poll_callback函数。
epoll_insert第二处关键代码

        revents = ep_item_poll(epi, &epq.pt, 1);        //这处代码是注册事件

        /*
         * We have to check if something went wrong during the poll wait queue
         * install process. Namely an allocation for a wait queue failed due
         * high memory pressure.
         */
        error = -ENOMEM;
        if (epi->nwait < 0)
                goto error_unregister;

        /* Add the current item to the list of active epoll hook for this file */
        spin_lock(&tfile->f_lock);
        list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
        spin_unlock(&tfile->f_lock);

ep_item_poll函数 代码如下:

static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
                                 int depth)
{       
        struct eventpoll *ep;
        bool locked;

        pt->_key = epi->event.events;
        if (!is_file_epoll(epi->ffd.file))          //如果被监控文件也是epoll则走下面逻辑。否则执行被监控文件的poll函数。
                return epi->ffd.file->f_op->poll(epi->ffd.file, pt) &
                       epi->event.events;

        ep = epi->ffd.file->private_data;
        poll_wait(epi->ffd.file, &ep->poll_wait, pt);
        locked = pt && (pt->_qproc == ep_ptable_queue_proc);

        return ep_scan_ready_list(epi->ffd.file->private_data,
                                  ep_read_events_proc, &depth, depth,
                                  locked) & epi->event.events;
} 

到这里我们需要看被监控文件的poll函数。这里使用eventfd模块的poll函数作为例子分析。

static __poll_t eventfd_poll(struct file *file, poll_table *wait)
{
        struct eventfd_ctx *ctx = file->private_data;
        __poll_t events = 0;
        u64 count;

        poll_wait(file, &ctx->wqh, wait);       //select 和poll这个函数都返回空,epoll会执行这个函数。
        count = READ_ONCE(ctx->count);

        if (count > 0)
                events |= EPOLLIN;
        if (count == ULLONG_MAX)
                events |= EPOLLERR;
        if (ULLONG_MAX - 1 > count)
                events |= EPOLLOUT;

        return events;
}

poll_wait函数代码。

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) 
{
        if (p && p->_qproc && wait_address)
                p->_qproc(filp, wait_address, p); 
}

_qproc这里就是ep_ptable_queue_proc 函数。这样就将 epoll的epi挂在到目标文件的事件通知链上了。如果事件发生,则会执行ep_poll_callback函数代码。接下来查看ep_poll_callback代码。如果对事件发生时会执行ep_poll_callback代码有疑问的可以看一下前面写的eventfd的分析。
ep_poll_callback 执行了一系列检查以后,将文件放入就绪列表中。至此,目标文件状态改变,epoll也能监控到。 ep_poll_callback中还加入唤醒ep_wait的操作,让ep_wait执行检查,ep_wait在上一节中介绍。
到这里,我们已经了解epoll的主要执行流程,其他细节不同的内核版本可能不一样,可以通过看代码详细了解。

分类
linux file system

linux epoll 机制分析–使用流程

这篇本来应该在poll机制分析之后写的,但是由于中间跟朋友一起在学习trace代码,耽搁了。
闲话少说。我们先看看poll机制有哪些弊端。
1.每次调用poll都要将全部文件描述符传到内核中,这是性能弊端。
2.在内核中poll是一个轮询方式。对传入的所有文件描述符进行轮询,查看是否有事件发生。如果都没有事件发生,睡眠一会再来次。这存在两个问题。第一cpu并没有空闲,一直在忙查。第二个,事件处理不及时,必须等全部轮询一遍以后才会返回。
基于以上弊端,epoll做了以下改进。
1.句柄集合下发与等待事件分开。先下发,然后在等待。等待可以多次。
2.利用系统的等待队列唤醒机制。当有事件发生时。执行特定函数将相关文件句柄放入就绪列表中。

总结:在我看来 poll和epoll就像没有中断和有中断的外设与系统通讯一样。之前需要cpu忙查现在只需要等待通知即可。
直接代码分析。首先上关键数据结构 eventpoll,这个结构贯穿始终。主要是用来保存,等待的fd集合以及就绪fd集合。

struct eventpoll {
        /* Protect the access to this structure */
        spinlock_t lock;

        /*
         * This mutex is used to ensure that files are not removed
         * while epoll is using them. This is held during the event
         * collection loop, the file cleanup path, the epoll file exit
         * code and the ctl operations.
         */
        struct mutex mtx;

        /* Wait queue used by sys_epoll_wait() */
        wait_queue_head_t wq;

        /* Wait queue used by file->poll() */
        wait_queue_head_t poll_wait;

        /* List of ready file descriptors */
        struct list_head rdllist;           //已有事件发生的fd列表

        /* RB tree root used to store monitored fd structs */
        struct rb_root_cached rbr;              //所有需要等待的fd红黑树。

        /*                       
         * This is a single linked list that chains all the "struct epitem" that
         * happened while transferring ready events to userspace w/out
         * holding ->lock.
         */
        struct epitem *ovflist;

        /* wakeup_source used when ep_scan_ready_list is running */
        struct wakeup_source *ws;

        /* The user that created the eventpoll descriptor */
        struct user_struct *user;

        struct file *file;

        /* used to optimize loop detection check */
        int visited;
        struct list_head visited_list_link;

#ifdef CONFIG_NET_RX_BUSY_POLL
        /* used to track busy poll napi_id */
        unsigned int napi_id;
#endif
};

使用epoll 第一步调用的系统调用时 epoll_create,代码如下

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
        int error, fd;
        struct eventpoll *ep = NULL;
        struct file *file;

        /* Check the EPOLL_* constant for consistency.  */
        BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

        if (flags & ~EPOLL_CLOEXEC)
                return -EINVAL;
        /*
         * Create the internal data structure ("struct eventpoll").
         */
        error = ep_alloc(&ep);  //分配eventpoll数据结构。
        if (error < 0)
                return error;
        /*
         * Creates all the items needed to setup an eventpoll file. That is,
         * a file structure and a free file descriptor.
         */
        fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
        if (fd < 0) {
                error = fd;
                goto out_free_ep;
        }
        file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                                 O_RDWR | (flags & O_CLOEXEC)); //创建eventpoll匿名文件,私有数据时eventpoll结构
        if (IS_ERR(file)) {
                error = PTR_ERR(file);
                goto out_free_fd;
        }
        ep->file = file;
        fd_install(fd, file);
        return fd;      //返回文件句柄。

out_free_fd:
        put_unused_fd(fd);
out_free_ep:
        ep_free(ep);
        return error;
}

注意 eventpoll_fops 数据。这个数据的值如下

static const struct file_operations eventpoll_fops = {
#ifdef CONFIG_PROC_FS
        .show_fdinfo    = ep_show_fdinfo,
#endif
        .release        = ep_eventpoll_release,
        .poll           = ep_eventpoll_poll,
        .llseek         = noop_llseek,
};

由于该文件不是用来读写的 所以没有读写函数,有一个release函数在关闭句柄时释放资源。有一个poll函数。由此可猜测epoll可以进行嵌套。查了一下ep_eventpoll_poll函数,的确是可以嵌套。而且嵌套时,只要下层epoll句柄中的文件句柄集合有就绪文件句柄。上层epoll会把下层epoll文件句柄放入就绪列表中。
epoll文件句柄创建好了。接下来会使用epoll_ctl系统调用向句柄中添加,等待事件的文件句柄。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
                struct epoll_event __user *, event)
{
        int error;
        int full_check = 0;
        struct fd f, tf;
        struct eventpoll *ep;
        struct epitem *epi;
        struct epoll_event epds;
        struct eventpoll *tep = NULL;

        error = -EFAULT;
        if (ep_op_has_event(op) &&          //检查操作是否为删除
            copy_from_user(&epds, event, sizeof(struct epoll_event)))
                goto error_return;

        error = -EBADF;
        f = fdget(epfd);
        if (!f.file)            //获取epoll文件句柄
                goto error_return;

        /* Get the "struct file *" for the target file */
        tf = fdget(fd);
        if (!tf.file)           //获取目标文件句柄
                goto error_fput;

        /* The target file descriptor must support poll */
        error = -EPERM;
        if (!tf.file->f_op->poll)       //判断目标文件是否有poll函数
                goto error_tgt_fput;

        /* Check if EPOLLWAKEUP is allowed */
        if (ep_op_has_event(op))
                ep_take_care_of_epollwakeup(&epds);

        /*
         * We have to check that the file structure underneath the file descriptor
         * the user passed to us _is_ an eventpoll file. And also we do not permit
         * adding an epoll file descriptor inside itself.
         */
        error = -EINVAL;
        if (f.file == tf.file || !is_file_epoll(f.file))    //判断epoll文件句柄是否是epoll类型文件
                goto error_tgt_fput;

        /*
         * epoll adds to the wakeup queue at EPOLL_CTL_ADD time only,
         * so EPOLLEXCLUSIVE is not allowed for a EPOLL_CTL_MOD operation.
         * Also, we do not currently supported nested exclusive wakeups.
         */
        if (ep_op_has_event(op) && (epds.events & EPOLLEXCLUSIVE)) {
                if (op == EPOLL_CTL_MOD)
                        goto error_tgt_fput;
                if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
                                (epds.events & ~EPOLLEXCLUSIVE_OK_BITS)))
                        goto error_tgt_fput;
        }

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;
        /*
         * When we insert an epoll file descriptor, inside another epoll file
         * descriptor, there is the change of creating closed loops, which are
         * better be handled here, than in more critical paths. While we are
         * checking for loops we also determine the list of files reachable
         * and hang them on the tfile_check_list, so we can check that we
         * haven't created too many possible wakeup paths.
         *
         * We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
         * the epoll file descriptor is attaching directly to a wakeup source,
         * unless the epoll file descriptor is nested. The purpose of taking the
         * 'epmutex' on add is to prevent complex toplogies such as loops and
         * deep wakeup paths from forming in parallel through multiple
         * EPOLL_CTL_ADD operations.
         */
        mutex_lock_nested(&ep->mtx, 0);
        if (op == EPOLL_CTL_ADD) {
                if (!list_empty(&f.file->f_ep_links) ||
                                                is_file_epoll(tf.file)) {
                        full_check = 1;
                        mutex_unlock(&ep->mtx);
                        mutex_lock(&epmutex);
                        if (is_file_epoll(tf.file)) {
                                error = -ELOOP;
                                if (ep_loop_check(ep, tf.file) != 0) {
                                        clear_tfile_check_list();
                                        goto error_tgt_fput;
                                }
                        } else
                                list_add(&tf.file->f_tfile_llink,
                                                        &tfile_check_list);
                        mutex_lock_nested(&ep->mtx, 0);
                        if (is_file_epoll(tf.file)) {
                                tep = tf.file->private_data;
                                mutex_lock_nested(&tep->mtx, 1);
                        }
                }
        }               //如果是添加类型操作,判断是否有环。

        /*
         * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
         * above, we can be sure to be able to use the item looked up by
         * ep_find() till we release the mutex.
         */
        epi = ep_find(ep, tf.file, fd);

        error = -EINVAL;
        switch (op) {
        case EPOLL_CTL_ADD: //添加操作
                if (!epi) {
                        epds.events |= EPOLLERR | EPOLLHUP;
                        error = ep_insert(ep, &epds, tf.file, fd, full_check);
                } else
                        error = -EEXIST;
                if (full_check)
                        clear_tfile_check_list();
                break;
        case EPOLL_CTL_DEL: //删除操作
                if (epi)
                        error = ep_remove(ep, epi);
                else
                        error = -ENOENT;
                break;
        case EPOLL_CTL_MOD: //修改等待的类型。
                if (epi) {
                        if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                                epds.events |= EPOLLERR | EPOLLHUP;
                                error = ep_modify(ep, epi, &epds);
                        }
                } else
                        error = -ENOENT;
                break;
        }
        if (tep != NULL)
                mutex_unlock(&tep->mtx);
        mutex_unlock(&ep->mtx);

error_tgt_fput:
        if (full_check)
                mutex_unlock(&epmutex);

        fdput(tf);
error_fput:
        fdput(f);
error_return:

        return error;

最后使用epoll_wait系统调用获取,有事件发生的句柄。代码如下

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)       //maxevents 最多等待事件数,timeout超时时间
{
        int error;
        struct fd f;
        struct eventpoll *ep;

        /* The maximum number of event must be greater than zero */
        if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
                return -EINVAL;

        /* Verify that the area passed by the user is writeable */
        if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
                return -EFAULT;

        /* Get the "struct file *" for the eventpoll file */
        f = fdget(epfd);
        if (!f.file)
                return -EBADF;

        /*
         * We have to check that the file structure underneath the fd
         * the user passed to us _is_ an eventpoll file.
         */
        error = -EINVAL;
        if (!is_file_epoll(f.file))
                goto error_fput;

        /*
         * At this point it is safe to assume that the "private_data" contains
         * our own data structure.
         */
        ep = f.file->private_data;

        /* Time to fish for events ... */
        error = ep_poll(ep, events, maxevents, timeout);    //前面都是做安全检查,真正做事的函数是ep_poll

error_fput:
        fdput(f);
        return error;
}

ep_poll代码

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
                   int maxevents, long timeout)
{
        int res = 0, eavail, timed_out = 0;
        unsigned long flags;
        u64 slack = 0;
        wait_queue_entry_t wait;
        ktime_t expires, *to = NULL;

        if (timeout > 0) {
                struct timespec64 end_time = ep_set_mstimeout(timeout);

                slack = select_estimate_accuracy(&end_time);
                to = &expires;
                *to = timespec64_to_ktime(end_time);
        } else if (timeout == 0) {
                /*
                 * Avoid the unnecessary trip to the wait queue loop, if the
                 * caller specified a non blocking operation.
                 */
                timed_out = 1;
                spin_lock_irqsave(&ep->lock, flags);
                goto check_events;
        }       

fetch_events:   

        if (!ep_events_available(ep))
                ep_busy_loop(ep, timed_out);

        spin_lock_irqsave(&ep->lock, flags);

        if (!ep_events_available(ep)) {
                /*
                 * Busy poll timed out.  Drop NAPI ID for now, we can add
                 * it back in when we have moved a socket with a valid NAPI
                 * ID onto the ready list.
                 */
                ep_reset_busy_poll_napi_id(ep);

                /* 
                 * We don't have any available event to return to the caller.
                 * We need to sleep here, and we will be wake up by
                 * ep_poll_callback() when events will become available.
                 */
                init_waitqueue_entry(&wait, current);
                __add_wait_queue_exclusive(&ep->wq, &wait);

                for (;;) {
                        /*
                         * We don't want to sleep if the ep_poll_callback() sends us
                         * a wakeup in between. That's why we set the task state
                         * to TASK_INTERRUPTIBLE before doing the checks.
                         */
                        set_current_state(TASK_INTERRUPTIBLE);
                        /*
                         * Always short-circuit for fatal signals to allow
                         * threads to make a timely exit without the chance of
                         * finding more events available and fetching
                         * repeatedly.
                         */
                        if (fatal_signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }
                        if (ep_events_available(ep) || timed_out)
                                break;
                        if (signal_pending(current)) {
                                res = -EINTR;
                                break;
                        }

                        spin_unlock_irqrestore(&ep->lock, flags);
                        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                                timed_out = 1;

                        spin_lock_irqsave(&ep->lock, flags);
                }

                __remove_wait_queue(&ep->wq, &wait);
                __set_current_state(TASK_RUNNING);
        }
check_events:
        /* Is it worth to try to dig for events ? */
        eavail = ep_events_available(ep);

        spin_unlock_irqrestore(&ep->lock, flags);

        /*
         * Try to transfer events to user space. In case we get 0 events and
         * there's still timeout left over, we go trying again in search of
         * more luck.
         */
        if (!res && eavail &&
            !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
                goto fetch_events;          //前面所有的代码都是进行检查和等待就绪队列中有句柄。最后调用ep_send_events将就绪队列中的数据拷贝到events中

        return res;
}

ep_send_events代码

static int ep_send_events(struct eventpoll *ep,
                          struct epoll_event __user *events, int maxevents)
{
        struct ep_send_events_data esed;

        esed.maxevents = maxevents;
        esed.events = events;

        ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
        return esed.res;
}

ep_scan_ready_list函数将rdlist中的句柄考入esed结构体中,最后返回给用户态。在拷贝时,这个函数做了很多保护。
结束语: 这篇大概写了epoll系统调用的执行流程。但是没有写,系统是怎么讲等待红黑树中的句柄加入 就绪队列中的。将在下篇中详细说明。