REUSEPORT 和REUSEADDR是差不多的功能,都是一个端口可以多个socket进行监听。差别在于 REUSEPORT可以做负载,且不会引起惊群效应。
REUSEPORT设置方式和REUSEADDR设置方式一样都是通过setsockopt进行设置,最终在sk上设置sk_reuseport的值。
在绑定时,会调用一系列函数,这些函数在REUSEADDR中已经讲过。最终会调用到udp_lib_lport_inuse2函数
static int udp_lib_lport_inuse2(struct net *net, __u16 num,
struct udp_hslot *hslot2,
struct sock *sk)
{
struct sock *sk2;
kuid_t uid = sock_i_uid(sk);
int res = 0;
spin_lock(&hslot2->lock);
udp_portaddr_for_each_entry(sk2, &hslot2->head) {
if (net_eq(sock_net(sk2), net) &&
sk2 != sk &&
(udp_sk(sk2)->udp_port_hash == num) &&
(!sk2->sk_reuse || !sk->sk_reuse) &&
(!sk2->sk_bound_dev_if || !sk->sk_bound_dev_if ||
sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
inet_rcv_saddr_equal(sk, sk2, true)) { //首先检查是否有冲突,如果有冲突走if中的逻辑
if (sk2->sk_reuseport && sk->sk_reuseport &&
!rcu_access_pointer(sk->sk_reuseport_cb) && //判断两个socket 是否都开启了sk_reuseport功能,并且最新的socket没有初始化sk_reuseport_cb。则表示端口可用返回
uid_eq(uid, sock_i_uid(sk2))) {
res = 0;
} else {
res = 1;
}
break;
}
}
spin_unlock(&hslot2->lock);
return res;
}
返回到 udp_lib_get_port函数中。会执行以下代码
found:
inet_sk(sk)->inet_num = snum;
udp_sk(sk)->udp_port_hash = snum;
udp_sk(sk)->udp_portaddr_hash ^= snum;
if (sk_unhashed(sk)) {
if (sk->sk_reuseport &&
udp_reuseport_add_sock(sk, hslot)) { //将sk按照resuseport处理以下
inet_sk(sk)->inet_num = 0;
udp_sk(sk)->udp_port_hash = 0;
udp_sk(sk)->udp_portaddr_hash ^= snum;
goto fail_unlock;
}
sk_add_node_rcu(sk, &hslot->head);
hslot->count++;
sock_prot_inuse_add(sock_net(sk), sk->sk_prot, 1);
hslot2 = udp_hashslot2(udptable, udp_sk(sk)->udp_portaddr_hash);
spin_lock(&hslot2->lock);
if (IS_ENABLED(CONFIG_IPV6) && sk->sk_reuseport &&
sk->sk_family == AF_INET6)
hlist_add_tail_rcu(&udp_sk(sk)->udp_portaddr_node,
&hslot2->head);
else
hlist_add_head_rcu(&udp_sk(sk)->udp_portaddr_node,
&hslot2->head); //添加sk到hash表中
hslot2->count++;
spin_unlock(&hslot2->lock);
}
sock_set_flag(sk, SOCK_RCU_FREE);
error = 0;
udp_reuseport_add_sock 函数是是新加函数,这个函数是REUSEPORT的关键。代码分析。
static int udp_reuseport_add_sock(struct sock *sk, struct udp_hslot *hslot)
{
struct net *net = sock_net(sk);
kuid_t uid = sock_i_uid(sk);
struct sock *sk2;
sk_for_each(sk2, &hslot->head) {
if (net_eq(sock_net(sk2), net) &&
sk2 != sk &&
sk2->sk_family == sk->sk_family &&
ipv6_only_sock(sk2) == ipv6_only_sock(sk) &&
(udp_sk(sk2)->udp_port_hash == udp_sk(sk)->udp_port_hash) &&
(sk2->sk_bound_dev_if == sk->sk_bound_dev_if) &&
sk2->sk_reuseport && uid_eq(uid, sock_i_uid(sk2)) && //同一用户才能共享。
inet_rcv_saddr_equal(sk, sk2, false)) {
return reuseport_add_sock(sk, sk2); //端口已经被使用多次
}
}
return reuseport_alloc(sk); //这个端口第一次被使用
}
reuseport_alloc函数如下
int reuseport_alloc(struct sock *sk)
{
struct sock_reuseport *reuse;
/* bh lock used since this function call may precede hlist lock in
* soft irq of receive path or setsockopt from process context
*/
spin_lock_bh(&reuseport_lock);
/* Allocation attempts can occur concurrently via the setsockopt path
* and the bind/hash path. Nothing to do when we lose the race.
*/
if (rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock)))
goto out;
reuse = __reuseport_alloc(INIT_SOCKS);
if (!reuse) {
spin_unlock_bh(&reuseport_lock);
return -ENOMEM;
}
reuse->socks[0] = sk;
reuse->num_socks = 1;
rcu_assign_pointer(sk->sk_reuseport_cb, reuse); //创建并初始化reuse结构体
out:
spin_unlock_bh(&reuseport_lock);
return 0;
}
第一个开启reuseport的socket 初始化流程使用的是上面的流程,后续使用reuseport_add_sock,代码如下
int reuseport_add_sock(struct sock *sk, struct sock *sk2)
{
struct sock_reuseport *old_reuse, *reuse;
if (!rcu_access_pointer(sk2->sk_reuseport_cb)) {
int err = reuseport_alloc(sk2);
if (err)
return err;
} //如果sk2没有reuse结构体,分配一个新的
spin_lock_bh(&reuseport_lock);
reuse = rcu_dereference_protected(sk2->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
old_reuse = rcu_dereference_protected(sk->sk_reuseport_cb,
lockdep_is_held(&reuseport_lock));
if (old_reuse && old_reuse->num_socks != 1) { //如果sk已经有resue结构体,并且已经和其他sk组合,则失败。
spin_unlock_bh(&reuseport_lock);
return -EBUSY;
}
if (reuse->num_socks == reuse->max_socks) { //如果超过最多组合数,则扩整最多组合数。
reuse = reuseport_grow(reuse);
if (!reuse) {
spin_unlock_bh(&reuseport_lock);
return -ENOMEM;
}
}
reuse->socks[reuse->num_socks] = sk; //将新的sk加入reuse组合里面
/* paired with smp_rmb() in reuseport_select_sock() */
smp_wmb();
reuse->num_socks++;
rcu_assign_pointer(sk->sk_reuseport_cb, reuse);
spin_unlock_bh(&reuseport_lock);
if (old_reuse)
call_rcu(&old_reuse->rcu, reuseport_free_rcu); //释放就得reuse。如果需要释放的话。
return 0;
}
这个函数很简单,就是将所有reuseport的sk放在一个数组里面。等接收到数据包时,可以根据数组来里面的sk数量通过一定的算法来负载均衡。(没有最大限制,初始化最大存放128个,如果超过则以2的指数增长,直到不能分配更大的内存为止。)
通过目的地址和目的端口找到sk以后,如果发现sk有reuse_port标志,则会调用reuseport_select_sock,使用原地址和源端口,目的地址 目的端口做hash。选择组合里面的一个sk。代码如下
hash = udp_ehashfn(net, daddr, hnum,
saddr, sport);
result = reuseport_select_sock(sk, hash, skb,
sizeof(struct udphdr));
struct sock *reuseport_select_sock(struct sock *sk,
u32 hash,
struct sk_buff *skb,
int hdr_len)
{
struct sock_reuseport *reuse;
struct bpf_prog *prog;
struct sock *sk2 = NULL;
u16 socks;
rcu_read_lock();
reuse = rcu_dereference(sk->sk_reuseport_cb);
/* if memory allocation failed or add call is not yet complete */
if (!reuse)
goto out;
prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->num_socks);
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
smp_rmb();
if (prog && skb)
sk2 = run_bpf(reuse, socks, prog, skb, hdr_len); //bpf简直无处不在啊,现在内核里面各处都可以执行bpf,以后可以在点位或者kprobe给 内核下各种各样的规则啊。
/* no bpf or invalid bpf result: fall back to hash usage */
if (!sk2)
sk2 = reuse->socks[reciprocal_scale(hash, socks)]; //根据hash选择合适的sk返回
}
out:
rcu_read_unlock();
return sk2;
}
然后,返回之后,会唤醒选定的sk对应的进程,接受数据包处理。
reuseport优势
如果在没有reuseport之前,我们使用一个进程等待网络事件,然后通过IPC通知其他进程处理业务,这中间涉及到IPC,并且可能引起进程切换,降低性能。 又有REUSEPORT后,可以使用多个进程同时在一个端口等待,根据4元组,将数据hash到不同的进程处理,不需要IPC。可以提升性能。