分类
linux network

linux socket REUSEPORT UDP socket内核代码分析

REUSEPORT 和REUSEADDR是差不多的功能,都是一个端口可以多个socket进行监听。差别在于 REUSEPORT可以做负载,且不会引起惊群效应。
REUSEPORT设置方式和REUSEADDR设置方式一样都是通过setsockopt进行设置,最终在sk上设置sk_reuseport的值。
在绑定时,会调用一系列函数,这些函数在REUSEADDR中已经讲过。最终会调用到udp_lib_lport_inuse2函数

static int udp_lib_lport_inuse2(struct net *net, __u16 num,
                                struct udp_hslot *hslot2,
                                struct sock *sk)
{
        struct sock *sk2;
        kuid_t uid = sock_i_uid(sk);
        int res = 0;

        spin_lock(&hslot2->lock);
        udp_portaddr_for_each_entry(sk2, &hslot2->head) {
                if (net_eq(sock_net(sk2), net) &&
                    sk2 != sk &&
                    (udp_sk(sk2)->udp_port_hash == num) &&
                    (!sk2->sk_reuse || !sk->sk_reuse) &&
                    (!sk2->sk_bound_dev_if || !sk->sk_bound_dev_if ||
                     sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
                    inet_rcv_saddr_equal(sk, sk2, true)) {          //首先检查是否有冲突,如果有冲突走if中的逻辑
                        if (sk2->sk_reuseport && sk->sk_reuseport &&
                            !rcu_access_pointer(sk->sk_reuseport_cb) &&     //判断两个socket 是否都开启了sk_reuseport功能,并且最新的socket没有初始化sk_reuseport_cb。则表示端口可用返回
                            uid_eq(uid, sock_i_uid(sk2))) {
                                res = 0;
                        } else {
                                res = 1;
                        }
                        break;
                }
        }
        spin_unlock(&hslot2->lock);
        return res;
}

返回到 udp_lib_get_port函数中。会执行以下代码

found:
        inet_sk(sk)->inet_num = snum;
        udp_sk(sk)->udp_port_hash = snum;
        udp_sk(sk)->udp_portaddr_hash ^= snum;
        if (sk_unhashed(sk)) {
                if (sk->sk_reuseport &&
                    udp_reuseport_add_sock(sk, hslot)) {            //将sk按照resuseport处理以下
                        inet_sk(sk)->inet_num = 0;
                        udp_sk(sk)->udp_port_hash = 0;
                        udp_sk(sk)->udp_portaddr_hash ^= snum;
                        goto fail_unlock;
                }

                sk_add_node_rcu(sk, &hslot->head);
                hslot->count++;
                sock_prot_inuse_add(sock_net(sk), sk->sk_prot, 1);

                hslot2 = udp_hashslot2(udptable, udp_sk(sk)->udp_portaddr_hash);
                spin_lock(&hslot2->lock);
                if (IS_ENABLED(CONFIG_IPV6) && sk->sk_reuseport &&
                    sk->sk_family == AF_INET6)
                        hlist_add_tail_rcu(&udp_sk(sk)->udp_portaddr_node,
                                           &hslot2->head);
                else
                        hlist_add_head_rcu(&udp_sk(sk)->udp_portaddr_node,
                                           &hslot2->head);          //添加sk到hash表中
                hslot2->count++;
                spin_unlock(&hslot2->lock);
        }
        sock_set_flag(sk, SOCK_RCU_FREE);
        error = 0;

udp_reuseport_add_sock 函数是是新加函数,这个函数是REUSEPORT的关键。代码分析。

static int udp_reuseport_add_sock(struct sock *sk, struct udp_hslot *hslot)
{
        struct net *net = sock_net(sk);
        kuid_t uid = sock_i_uid(sk);
        struct sock *sk2;

        sk_for_each(sk2, &hslot->head) {
                if (net_eq(sock_net(sk2), net) &&
                    sk2 != sk &&
                    sk2->sk_family == sk->sk_family &&
                    ipv6_only_sock(sk2) == ipv6_only_sock(sk) &&
                    (udp_sk(sk2)->udp_port_hash == udp_sk(sk)->udp_port_hash) &&
                    (sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
                    sk2->sk_reuseport && uid_eq(uid, sock_i_uid(sk2)) &&        //同一用户才能共享。
                    inet_rcv_saddr_equal(sk, sk2, false)) {
                        return reuseport_add_sock(sk, sk2); //端口已经被使用多次
                }
        }

        return reuseport_alloc(sk);         //这个端口第一次被使用
}

reuseport_alloc函数如下

int reuseport_alloc(struct sock *sk)
{
        struct sock_reuseport *reuse;

        /* bh lock used since this function call may precede hlist lock in
         * soft irq of receive path or setsockopt from process context
         */
        spin_lock_bh(&reuseport_lock);

        /* Allocation attempts can occur concurrently via the setsockopt path
         * and the bind/hash path.  Nothing to do when we lose the race.
         */
        if (rcu_dereference_protected(sk->sk_reuseport_cb,
                                      lockdep_is_held(&reuseport_lock)))
                goto out;

        reuse = __reuseport_alloc(INIT_SOCKS);
        if (!reuse) {
                spin_unlock_bh(&reuseport_lock);
                return -ENOMEM;
        }

        reuse->socks[0] = sk;
        reuse->num_socks = 1;
        rcu_assign_pointer(sk->sk_reuseport_cb, reuse);     //创建并初始化reuse结构体

out:
        spin_unlock_bh(&reuseport_lock);

        return 0;
}

第一个开启reuseport的socket 初始化流程使用的是上面的流程,后续使用reuseport_add_sock,代码如下

int reuseport_add_sock(struct sock *sk, struct sock *sk2)
{       
        struct sock_reuseport *old_reuse, *reuse;

        if (!rcu_access_pointer(sk2->sk_reuseport_cb)) {
                int err = reuseport_alloc(sk2);

                if (err)
                        return err;
        }       //如果sk2没有reuse结构体,分配一个新的

        spin_lock_bh(&reuseport_lock);
        reuse = rcu_dereference_protected(sk2->sk_reuseport_cb,
                                          lockdep_is_held(&reuseport_lock));
        old_reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
                                             lockdep_is_held(&reuseport_lock));
        if (old_reuse && old_reuse->num_socks != 1) {       //如果sk已经有resue结构体,并且已经和其他sk组合,则失败。
                spin_unlock_bh(&reuseport_lock);
                return -EBUSY;
        }

        if (reuse->num_socks == reuse->max_socks) {     //如果超过最多组合数,则扩整最多组合数。
                reuse = reuseport_grow(reuse);
                if (!reuse) {
                        spin_unlock_bh(&reuseport_lock);
                        return -ENOMEM;
                }
        }

        reuse->socks[reuse->num_socks] = sk;        //将新的sk加入reuse组合里面
        /* paired with smp_rmb() in reuseport_select_sock() */
        smp_wmb();
        reuse->num_socks++;
        rcu_assign_pointer(sk->sk_reuseport_cb, reuse);

        spin_unlock_bh(&reuseport_lock);

        if (old_reuse)
                call_rcu(&old_reuse->rcu, reuseport_free_rcu);      //释放就得reuse。如果需要释放的话。
        return 0;
}

这个函数很简单,就是将所有reuseport的sk放在一个数组里面。等接收到数据包时,可以根据数组来里面的sk数量通过一定的算法来负载均衡。(没有最大限制,初始化最大存放128个,如果超过则以2的指数增长,直到不能分配更大的内存为止。)
通过目的地址和目的端口找到sk以后,如果发现sk有reuse_port标志,则会调用reuseport_select_sock,使用原地址和源端口,目的地址 目的端口做hash。选择组合里面的一个sk。代码如下

                                hash = udp_ehashfn(net, daddr, hnum,
                                                   saddr, sport);
                                result = reuseport_select_sock(sk, hash, skb,
                                                        sizeof(struct udphdr));
struct sock *reuseport_select_sock(struct sock *sk,
                                   u32 hash,
                                   struct sk_buff *skb,
                                   int hdr_len)
{
        struct sock_reuseport *reuse;
        struct bpf_prog *prog; 
        struct sock *sk2 = NULL;
        u16 socks;

        rcu_read_lock();
        reuse = rcu_dereference(sk->sk_reuseport_cb);

        /* if memory allocation failed or add call is not yet complete */
        if (!reuse)
                goto out;

        prog = rcu_dereference(reuse->prog);
        socks = READ_ONCE(reuse->num_socks);
        if (likely(socks)) {
                /* paired with smp_wmb() in reuseport_add_sock() */
                smp_rmb();

                if (prog && skb)
                        sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);        //bpf简直无处不在啊,现在内核里面各处都可以执行bpf,以后可以在点位或者kprobe给 内核下各种各样的规则啊。

                /* no bpf or invalid bpf result: fall back to hash usage */
                if (!sk2)
                        sk2 = reuse->socks[reciprocal_scale(hash, socks)];      //根据hash选择合适的sk返回
        }

out:
        rcu_read_unlock();
        return sk2;
}

然后,返回之后,会唤醒选定的sk对应的进程,接受数据包处理。

reuseport优势
如果在没有reuseport之前,我们使用一个进程等待网络事件,然后通过IPC通知其他进程处理业务,这中间涉及到IPC,并且可能引起进程切换,降低性能。 又有REUSEPORT后,可以使用多个进程同时在一个端口等待,根据4元组,将数据hash到不同的进程处理,不需要IPC。可以提升性能。

发表评论

电子邮件地址不会被公开。 必填项已用*标注