
DPDK NIC driver registration: registering the virtual NIC driver

A commonly used virtual NIC in DPDK is the af_packet type, which can send and receive packets on the vnet-type interfaces created with the ip command.

The af_packet driver lives in the drivers/net/af_packet directory; the main file is rte_eth_af_packet.c.

At the end of that file there is a pmd_af_packet_drv structure:

static struct rte_vdev_driver pmd_af_packet_drv =
{
    .probe = rte_pmd_af_packet_probe,
    .remove = rte_pmd_af_packet_remove,
}; // define the driver structure

RTE_PMD_REGISTER_VDEV(net_af_packet, pmd_af_packet_drv); // register the driver
RTE_PMD_REGISTER_ALIAS(net_af_packet, eth_af_packet);  // register a driver alias
RTE_PMD_REGISTER_PARAM_STRING(net_af_packet,    // declare the driver's parameters
    "iface=<string> "
    "qpairs=<int> "
    "blocksz=<int> "
    "framesz=<int> "
    "framecnt=<int>");

Source of the RTE_PMD_REGISTER_VDEV macro:

#define RTE_PMD_REGISTER_VDEV(nm, vdrv)\
RTE_INIT(vdrvinitfn_ ##vdrv); /* declare vdrvinitfn_##vdrv as a constructor, run before main() */ \
static const char *vdrvinit_ ## nm ## _alias; /* declare the alias pointer */ \
static void vdrvinitfn_ ##vdrv(void)\
{\
    (vdrv).driver.name = RTE_STR(nm); /* set the driver name */ \
    (vdrv).driver.alias = vdrvinit_ ## nm ## _alias; /* point the alias field at the alias string */ \
    rte_eal_vdrv_register(&vdrv); /* register the driver */ \
} \
RTE_PMD_EXPORT_NAME(nm, __COUNTER__)

rte_eal_vdrv_register, the driver registration function:

void rte_eal_vdrv_register(struct rte_vdev_driver *driver)
{
    rte_vdev_bus_register();

    TAILQ_INSERT_TAIL(&vdev_driver_list, driver, next); // add to the virtual driver list
    rte_eal_driver_register(&driver->driver);   // add to the global driver list
}

After registration, while the user's command-line arguments are processed, rte_eal_devargs_add is called to add each virtual device's arguments to devargs_list.

Registering the virtual bus. DPDK has a bus concept similar to the kernel's: devices hang off a bus. DPDK therefore registers one virtual bus for all virtual devices:

static struct rte_bus rte_vdev_bus = { // define the virtual bus structure
    .scan = vdev_scan,
    .probe = vdev_probe,
};

RTE_INIT(rte_vdev_bus_register); // declare rte_vdev_bus_register with the constructor attribute so it runs before main()

static void rte_vdev_bus_register(void)
{
    static int registered;

    if (registered)
            return;

    registered = 1;
    rte_vdev_bus.name = RTE_STR(virtual); // set the bus name
    rte_bus_register(&rte_vdev_bus); // register the bus
}

The rte_bus_scan function scans for devices on every bus. The code:

int rte_bus_scan(void)
{
    int ret;
    struct rte_bus *bus = NULL;

    TAILQ_FOREACH(bus, &rte_bus_list, next) { // walk every registered bus
            ret = bus->scan();   // scan for devices; for the virtual bus this calls vdev_scan
            if (ret) {
                    RTE_LOG(ERR, EAL, "Scan for (%s) bus failed.\n",
                            bus->name);
                    return ret;
            }
    }

    return 0;
}

The vdev_scan function:

static int vdev_scan(void)
{
    struct rte_vdev_device *dev;
    struct rte_devargs *devargs;

    /* for virtual devices we scan the devargs_list populated via cmdline */

    TAILQ_FOREACH(devargs, &devargs_list, next) { // walk the virtual device arguments supplied on the command line

            if (devargs->type != RTE_DEVTYPE_VIRTUAL)
                    continue;

            dev = find_vdev(devargs->virt.drv_name);
            if (dev)
                    continue;

            dev = calloc(1, sizeof(*dev));  // allocate the virtual device
            if (!dev)
                    return -1;

            dev->device.devargs = devargs;
            dev->device.numa_node = SOCKET_ID_ANY;
            dev->device.name = devargs->virt.drv_name;

            rte_eal_device_insert(&dev->device);
            TAILQ_INSERT_TAIL(&vdev_device_list, dev, next); // add it to the virtual device list
    }

    return 0;
}

The rte_bus_probe function finds a driver for each device:

int rte_bus_probe(void)
{
    int ret;
    struct rte_bus *bus, *vbus = NULL;

    TAILQ_FOREACH(bus, &rte_bus_list, next) { // walk every bus
            if (!strcmp(bus->name, "virtual")) {
                    vbus = bus; // probe the virtual bus last
                    continue;
            }

            ret = bus->probe(); // find drivers for the devices on this bus
            if (ret) {
                    RTE_LOG(ERR, EAL, "Bus (%s) probe failed.\n",
                            bus->name);
                    return ret;
            }
    }

    if (vbus) {
            ret = vbus->probe(); // find drivers for the devices on the virtual bus; this calls vdev_probe
            if (ret) {
                    RTE_LOG(ERR, EAL, "Bus (%s) probe failed.\n",
                            vbus->name);
                    return ret;
            }
    }

    return 0;
}

The vdev_probe function:

static int vdev_probe(void)
{
    struct rte_vdev_device *dev;

    /*
     * Note that the dev_driver_list is populated here
     * from calls made to rte_eal_driver_register from constructor functions
     * embedded into PMD modules via the RTE_PMD_REGISTER_VDEV macro
     */

    /* call the init function for each virtual device */
    TAILQ_FOREACH(dev, &vdev_device_list, next) { // walk every device on the bus

            if (dev->device.driver)
                    continue;

            if (vdev_probe_all_drivers(dev)) { // try to find and initialize a driver for the device
                    RTE_LOG(ERR, EAL, "failed to initialize %s device\n",
                            rte_vdev_device_name(dev));
                    return -1;
            }
    }

    return 0;
}

The vdev_probe_all_drivers code:

static int vdev_probe_all_drivers(struct rte_vdev_device *dev)
{
    const char *name;
    char *drv_name;
    struct rte_vdev_driver *driver;
    int ret = 1;

    drv_name = parse_driver_arg(rte_vdev_device_args(dev));
    name = drv_name ? drv_name : rte_vdev_device_name(dev); // the driver name to search for

    RTE_LOG(DEBUG, EAL, "Search driver %s to probe device %s\n", name,
            rte_vdev_device_name(dev));

    TAILQ_FOREACH(driver, &vdev_driver_list, next) { // walk the driver list
            /*
             * search a driver prefix in virtual device name.
             * For example, if the driver is pcap PMD, driver->name
             * will be "net_pcap", but "name" will be "net_pcapN".
             * So use strncmp to compare.
             */
            if (!strncmp(driver->driver.name, name,
                        strlen(driver->driver.name))) {
                    dev->device.driver = &driver->driver;
                    ret = driver->probe(dev); // name matched, let the driver try to initialize the device
                    if (ret)
                            dev->device.driver = NULL;
                    goto out;
            }
    }

    /* Give new names precedence over aliases. */
    TAILQ_FOREACH(driver, &vdev_driver_list, next) { // walk the driver list again, this time matching aliases
            if (driver->driver.alias &&
                !strncmp(driver->driver.alias, name,
                        strlen(driver->driver.alias))) {
                    dev->device.driver = &driver->driver;
                    ret = driver->probe(dev); // alias matched, let the driver try to initialize the device
                    if (ret)
                            dev->device.driver = NULL;
                    break;
            }
    }

out:
    free(drv_name);
    return ret;
}

At this point the virtual NIC driver is registered and the virtual NIC device has been loaded.


DPDK lock-free ring queue analysis

DPDK has a lock-free ring queue used to synchronize multiple producers and multiple consumers; dropping the lock is said to improve performance considerably. The core idea is compare-and-swap (CAS).

Let's first walk through the idea.

The lock-free ring has four key variables: cons_head, cons_tail, prod_head and prod_tail.

When no readers or writers are active, cons_head equals cons_tail and prod_head equals prod_tail.

1. The producer first takes a local copy of cons_tail, prod_head and prod_tail.

2. Add the length of the data to be produced to the copied prod_head to get prod_next, and check whether prod_next would overrun cons_tail. If it would, the enqueue fails; otherwise continue.

3. Use a compare-and-swap instruction to compare the ring's prod_head with the copied prod_head. If they are equal, set the ring's prod_head to prod_next; if not, go back to step 1 and retry.

4. Write the data.

5. Check whether the ring's prod_tail equals the copied prod_head; if so, set prod_tail to prod_next. Otherwise wait until the ring's prod_tail equals the copied prod_head, then set prod_tail to prod_next.

Looking closely at these five steps: with several producers running concurrently, steps 1-3 are serialized, so producers pass through them one at a time; step 4, the data copy, can run in parallel; and step 5 must complete in the same order established by steps 1-3.

The time a given producer spends is therefore only slightly longer than the longest time spent by any producer ahead of it, which is why this performs much better than a lock.
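
A minimal sketch of the multi-producer enqueue path described above (this is not the real rte_ring code; the structure, names and the single-object enqueue are assumptions made for illustration, and the real implementation also adds memory barriers):

#include <stdint.h>
#include <sched.h>

struct ring {
    volatile uint32_t prod_head, prod_tail;
    volatile uint32_t cons_head, cons_tail;
    uint32_t size;               /* capacity, assumed to be a power of two */
    void *slots[];
};

static int ring_mp_enqueue(struct ring *r, void *obj)
{
    uint32_t head, next;

    do {
        head = r->prod_head;                      /* step 1: local copy */
        next = head + 1;
        if (head - r->cons_tail >= r->size)       /* step 2: would overrun cons_tail */
            return -1;                            /* ring full, enqueue fails */
        /* step 3: CAS the ring's prod_head from our copy to prod_next, else retry */
    } while (!__sync_bool_compare_and_swap(&r->prod_head, head, next));

    r->slots[head & (r->size - 1)] = obj;         /* step 4: write the data */

    /* step 5: wait for earlier producers to publish, then publish ours */
    while (r->prod_tail != head)
        sched_yield();
    r->prod_tail = next;
    return 0;
}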


DPDK thread management: thread creation

After argument parsing, eal_thread_init_master is called first to bind the master thread to its designated CPU. The code:

RTE_PER_LCORE(_lcore_id) = lcore_id; // record the CPU this thread is bound to
/* set CPU affinity */
if (eal_thread_set_affinity() < 0)  // bind the thread to the designated CPU
     rte_panic("cannot set affinity\n");

The eal_thread_set_affinity code is as follows:

unsigned lcore_id = rte_lcore_id(); // get the CPU this thread is bound to

    /* acquire system unique id  */
    rte_gettid();

    /* update EAL thread core affinity */
    return rte_thread_set_affinity(&lcore_config[lcore_id].cpuset); // bind according to the cpuset

The eal_thread_dump_affinity function reports which CPUs the master thread is bound to.

Creating the worker threads:

 RTE_LCORE_FOREACH_SLAVE(i) {
            if (pipe(lcore_config[i].pipe_master2slave) < 0)    // pipe from the master thread to the worker
                    rte_panic("Cannot create pipe\n");
            if (pipe(lcore_config[i].pipe_slave2master) < 0)    // pipe from the worker back to the master
                    rte_panic("Cannot create pipe\n");

            lcore_config[i].state = WAIT;

            /* create a thread for each lcore */
            ret = pthread_create(&lcore_config[i].thread_id, NULL,
                                 eal_thread_loop, NULL);       // create the worker thread
            if (ret != 0)
                    rte_panic("Cannot create thread\n");

            /* Set thread_name for aid in debugging. */
            snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
                    "lcore-slave-%d", i);
            ret = rte_thread_setname(lcore_config[i].thread_id,
                                            thread_name); // set the worker thread's name
            if (ret != 0)
                    RTE_LOG(DEBUG, EAL,
                            "Cannot set name for lcore thread\n");
}

Once a worker thread starts it runs eal_thread_loop, whose code is as follows:

thread_id = pthread_self();    // get our own thread id

    /* retrieve our lcore_id from the configuration structure */
    RTE_LCORE_FOREACH_SLAVE(lcore_id) {
            if (thread_id == lcore_config[lcore_id].thread_id) // find our lcore_id by matching the thread id
                    break;
    }
    if (lcore_id == RTE_MAX_LCORE)
            rte_panic("cannot retrieve lcore id\n");

    m2s = lcore_config[lcore_id].pipe_master2slave[0];  // get the communication pipes
    s2m = lcore_config[lcore_id].pipe_slave2master[1];

    /* set the lcore ID in per-lcore memory area */
    RTE_PER_LCORE(_lcore_id) = lcore_id;

    /* set CPU affinity */
    if (eal_thread_set_affinity() < 0)  // bind this thread to its CPU
            rte_panic("cannot set affinity\n");

    ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);

    RTE_LOG(DEBUG, EAL, "lcore %u is ready (tid=%x;cpuset=[%s%s])\n",
            lcore_id, (int)thread_id, cpuset, ret == 0 ? "" : "...");

while (1) {
            void *fct_arg;

            /* wait command */
            do {
                    n = read(m2s, &c, 1);
            } while (n < 0 && errno == EINTR); // wait for a command from the master

            if (n <= 0)
                    rte_panic("cannot read on configuration pipe\n");

            lcore_config[lcore_id].state = RUNNING; // mark the state as RUNNING

            /* send ack */
            n = 0;
            while (n == 0 || (n < 0 && errno == EINTR)) // ack the master that the command is being executed
                    n = write(s2m, &c, 1);
            if (n < 0)
                    rte_panic("cannot write on configuration pipe\n");

            if (lcore_config[lcore_id].f == NULL)
                    rte_panic("NULL function pointer\n");

            /* call the function and store the return value */
            fct_arg = lcore_config[lcore_id].arg;
            ret = lcore_config[lcore_id].f(fct_arg); // run the function specified by the master thread
            lcore_config[lcore_id].ret = ret;
            rte_wmb();
            lcore_config[lcore_id].state = FINISHED; // mark the state as FINISHED
    }

The rte_eal_remote_launch function asks a worker thread to execute a given function:

int m2s = lcore_config[slave_id].pipe_master2slave[1];
    int s2m = lcore_config[slave_id].pipe_slave2master[0];

    if (lcore_config[slave_id].state != WAIT)
            return -EBUSY;

    lcore_config[slave_id].f = f;    // the function to execute
    lcore_config[slave_id].arg = arg; // its argument

    /* send message */
    n = 0;
    while (n == 0 || (n < 0 && errno == EINTR))
            n = write(m2s, &c, 1);   // send the command
    if (n < 0)
            rte_panic("cannot write on configuration pipe\n");

    /* wait ack */
    do {
            n = read(s2m, &c, 1);  // wait for the ack
    } while (n < 0 && errno == EINTR);

    if (n <= 0)
            rte_panic("cannot read on configuration pipe\n");

    return 0;

From the above we can see that during initialization DPDK creates threads according to the user's command-line arguments and binds each thread to its designated CPU. The user then calls rte_eal_remote_launch to have a particular thread run a particular function.
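
A minimal usage sketch of this interface (an assumed typical pattern, not code from the post): after rte_eal_init, launch a worker function on every slave lcore and wait for all of them to finish.

#include <stdio.h>
#include <rte_eal.h>
#include <rte_lcore.h>
#include <rte_launch.h>

static int
lcore_worker(void *arg)
{
    (void)arg;
    printf("hello from lcore %u\n", rte_lcore_id());
    return 0;
}

int
main(int argc, char **argv)
{
    unsigned lcore_id;

    if (rte_eal_init(argc, argv) < 0)
        return -1;

    RTE_LCORE_FOREACH_SLAVE(lcore_id)
        rte_eal_remote_launch(lcore_worker, NULL, lcore_id); /* send the command over the pipe */

    rte_eal_mp_wait_lcore(); /* wait until every worker is back in the WAIT state */
    return 0;
}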


DPDK thread management: parsing user command-line arguments

Every DPDK program starts by calling rte_eal_init to initialize the DPDK environment. Inside rte_eal_init, the function that initializes threading is rte_eal_cpu_init, whose main job is to probe which CPUs on the current machine are usable: it walks every possible CPU, checks whether it is available, and records the result in lcore_config for later use.

for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
            lcore_config[lcore_id].core_index = count;

            /* init cpuset for per lcore config */
            CPU_ZERO(&lcore_config[lcore_id].cpuset);

            /* in 1:1 mapping, record related cpu detected state */
            lcore_config[lcore_id].detected = eal_cpu_detected(lcore_id); // a CPU counts as usable if /sys/devices/system/cpu/cpu%u/topology/core_id exists: returns 1 if usable, 0 otherwise
            if (lcore_config[lcore_id].detected == 0) {
                    config->lcore_role[lcore_id] = ROLE_OFF;
                    lcore_config[lcore_id].core_index = -1;
                    continue;
            }

            /* By default, lcore 1:1 map to cpu id */
            CPU_SET(lcore_id, &lcore_config[lcore_id].cpuset); // set the cpuset 1:1, so the thread can only run on this one core

            /* By default, each detected core is enabled */
            config->lcore_role[lcore_id] = ROLE_RTE;
            lcore_config[lcore_id].core_id = eal_cpu_core_id(lcore_id); // read /sys/devices/system/cpu/cpu%u/topology/core_id to get this CPU's core_id
            lcore_config[lcore_id].socket_id = eal_cpu_socket_id(lcore_id); // look under /sys/devices/system/node to find which NUMA node the CPU belongs to
            if (lcore_config[lcore_id].socket_id >= RTE_MAX_NUMA_NODES) // a socket id above the allowed maximum means either a system error or that NUMA/socket support is not enabled
#ifdef RTE_EAL_ALLOW_INV_SOCKET_ID
                    lcore_config[lcore_id].socket_id = 0;
#else
                    rte_panic("Socket ID (%u) is greater than "
                            "RTE_MAX_NUMA_NODES (%d)\n",
                            lcore_config[lcore_id].socket_id,
                            RTE_MAX_NUMA_NODES);
#endif

            RTE_LOG(DEBUG, EAL, "Detected lcore %u as "
                            "core %u on socket %u\n",
                            lcore_id, lcore_config[lcore_id].core_id,
                            lcore_config[lcore_id].socket_id);
            count++;
    }
config->lcore_count = count; // record the number of usable cores in the config structure

Next, eal_parse_args reads the user's input arguments to set how many threads to start and which cores to use. eal_parse_args calls eal_parse_common_option to parse them; the main options are the following.

-c: a hexadecimal bitmask of the cores that may be used, parsed by eal_parse_coremask. Key code:

for (i = i - 1; i >= 0 && idx < RTE_MAX_LCORE; i--) { // walk every core
            c = coremask[i];
            if (isxdigit(c) == 0) { // reject anything that is not a hex digit
                    /* invalid characters */
                    return -1;
            }
            val = xdigit2val(c);
            for (j = 0; j < BITS_PER_HEX && idx < RTE_MAX_LCORE; j++, idx++) // each hex digit describes 4 CPUs; BITS_PER_HEX is 4
            {
                    if ((1 << j) & val) { // a set bit means this CPU should be used
                            if (!lcore_config[idx].detected) { // error out if the CPU does not exist
                                    RTE_LOG(ERR, EAL, "lcore %u "
                                            "unavailable\n", idx);
                                    return -1;
                            }
                            cfg->lcore_role[idx] = ROLE_RTE;  // this core is used
                            lcore_config[idx].core_index = count; // record its index
                            count++;
                    } else {
                            cfg->lcore_role[idx] = ROLE_OFF; // this core is not used
                            lcore_config[idx].core_index = -1;
                    }
            }
    }

-l: a list of core numbers to run on, handled by eal_parse_corelist. Key code:

 min = RTE_MAX_LCORE;
    do {
            while (isblank(*corelist))
                    corelist++;
            if (*corelist == '\0')
                    return -1;
            errno = 0;
            idx = strtoul(corelist, &end, 10); // convert the string to an unsigned integer
            if (errno || end == NULL)
                    return -1;
            while (isblank(*end))
                    end++;
            if (*end == '-') { // if the first unconvertible character is '-', a range was given
                    min = idx;
            } else if ((*end == ',') || (*end == '\0')) {
                    max = idx;
                    if (min == RTE_MAX_LCORE) // min still at its initial value means a single number, so min = max
                            min = idx;
                    for (idx = min; idx <= max; idx++) {
                            if (cfg->lcore_role[idx] != ROLE_RTE) {
                                    cfg->lcore_role[idx] = ROLE_RTE; // mark the core as usable
                                    lcore_config[idx].core_index = count; // record the core's index
                                    count++;
                            }
                    }
                    min = RTE_MAX_LCORE;
            } else
                    return -1;
            corelist = end + 1;
    } while (*end != '\0');

--lcores COREMAP: lets an lcore (worker thread) run on multiple CPUs. It is handled by eal_parse_lcores; the format is explained below:

/*
* The format pattern: --lcores='<lcores[@cpus]>[<,lcores[@cpus]>...]'
* lcores, cpus could be a single digit/range or a group.
* '(' and ')' are necessary if it's a group.
* If not supply '@cpus', the value of cpus uses the same as lcores.
* e.g. '1,2@(5-7),(3-5)@(0,2),(0,6),7-8' means start 9 EAL thread as below
*   lcore 0 runs on cpuset 0x41 (cpu 0,6)
*   lcore 1 runs on cpuset 0x2 (cpu 1)
*   lcore 2 runs on cpuset 0xe0 (cpu 5,6,7)
*   lcore 3,4,5 runs on cpuset 0x5 (cpu 0,2)
*   lcore 6 runs on cpuset 0x41 (cpu 0,6)
*   lcore 7 runs on cpuset 0x80 (cpu 7)
*   lcore 8 runs on cpuset 0x100 (cpu 8)
*/

do {
            while (isblank(*lcores))
                    lcores++;
            if (*lcores == '\0')
                    goto err;

            lflags = 0;

            /* record lcore_set start point */
            lcore_start = lcores;

            /* go across a complete bracket */
            if (*lcore_start == '(') {
                    lcores += strcspn(lcores, ")");
                    if (*lcores++ == '\0')
                            goto err;
            }

            /* scan the separator '@', ','(next) or '\0'(finish) */
            lcores += strcspn(lcores, "@,");

            if (*lcores == '@') {
                    /* explicit assign cpu_set */
                    offset = eal_parse_set(lcores + 1, set, RTE_DIM(set)); // parse the CPU set this lcore group is bound to
                    if (offset < 0)
                            goto err;

                    /* prepare cpu_set and update the end cursor */
                    if (0 > convert_to_cpuset(&cpuset,
                                              set, RTE_DIM(set)))
                            goto err;
                    end = lcores + 1 + offset;
            } else { /* ',' or '\0' */
                    /* haven't given cpu_set, current loop done */
                    end = lcores;

                    /* go back to check <number>-<number> */
                    offset = strcspn(lcore_start, "(-");
                    if (offset < (end - lcore_start) &&
                        *(lcore_start + offset) != '(')
                            lflags = 1;
            }

            if (*end != ',' && *end != '\0')
                    goto err;

            /* parse lcore_set from start point */
            if (0 > eal_parse_set(lcore_start, set, RTE_DIM(set))) // parse the lcore set from the start point
                    goto err;

            /* without '@', by default using lcore_set as cpu_set */
            if (*lcores != '@' &&
                0 > convert_to_cpuset(&cpuset, set, RTE_DIM(set)))
                    goto err;

            /* start to update lcore_set */
            for (idx = 0; idx < RTE_MAX_LCORE; idx++) {
                    if (!set[idx])
                            continue;

                    if (cfg->lcore_role[idx] != ROLE_RTE) {
                            lcore_config[idx].core_index = count; // record the lcore number
                            cfg->lcore_role[idx] = ROLE_RTE; // mark the lcore as usable
                            count++;
                    }

                    if (lflags) {
                            CPU_ZERO(&cpuset);
                            CPU_SET(idx, &cpuset);
                    }
                    rte_memcpy(&lcore_config[idx].cpuset, &cpuset,
                               sizeof(rte_cpuset_t)); // set the lcore's cpuset
            }

            lcores = end + 1;
    } while (*end != '\0');

--master-lcore ID: sets the core the master thread runs on. Handled by eal_parse_master_lcore; key code:

cfg->master_lcore = (uint32_t) strtol(arg, &parsing_end, 0); // record the master thread's core
    if (errno || parsing_end[0] != 0)
            return -1;
    if (cfg->master_lcore >= RTE_MAX_LCORE)
            return -1;
    master_lcore_parsed = 1;
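
As a hedged example of how these options are usually combined on an application command line (the binary name and core numbers are made up for illustration):

# run on cores 1-4, master thread on core 1
./app -l 1-4 --master-lcore 1

# the same selection expressed as a hex coremask (bits 1..4 set)
./app -c 0x1e --master-lcore 1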

A brief look at DPDK memory management: allocation and free

1. DPDK memory allocation. DPDK allocates memory through the rte_malloc_socket interface, which is declared as follows:

void *rte_malloc_socket(const char *type, size_t size, unsigned align, int socket_arg)

Parameters:

type: a name/tag to associate with the allocation

size: the number of bytes to allocate

align: the required alignment

socket_arg: the NUMA socket to allocate from
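
An assumed usage sketch (the size, alignment and socket number are made up for illustration): allocate a cache-line-aligned 2 KB buffer on socket 0 and free it again.

#include <rte_malloc.h>

void example(void)
{
    /* 2048 bytes, 64-byte aligned, on NUMA socket 0 */
    char *buf = rte_malloc_socket("example_buf", 2048, 64, 0);
    if (buf != NULL)
        rte_free(buf);
}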

rte_malloc_socket does three main things.

a. Determine which socket the allocation should come from. Code:

  if (!rte_eal_has_hugepages())
          socket_arg = SOCKET_ID_ANY; // without hugepages, allocate based on the socket of the CPU the calling thread runs on

  if (socket_arg == SOCKET_ID_ANY)
          socket = malloc_get_numa_socket(); // get the socket of the CPU the calling thread runs on
  else
          socket = socket_arg; // the user named a socket, so use it

b. Allocate from that socket's heap by calling malloc_heap_alloc:

  ret = malloc_heap_alloc(&mcfg->malloc_heaps[socket], type,
                            size, 0, align == 0 ? 1 : align, 0);

c. If the user did not name a socket and the allocation failed, try the other sockets' heaps:

  if (ret != NULL || socket_arg != SOCKET_ID_ANY) // if the allocation succeeded, or the user pinned a socket (no fallback), return now
            return ret;

    /* try other heaps */
    for (i = 0; i < RTE_MAX_NUMA_NODES; i++) {
            /* we already tried this one */
            if (i == socket)
                    continue;

            ret = malloc_heap_alloc(&mcfg->malloc_heaps[i], type,
                                    size, 0, align == 0 ? 1 : align, 0); // allocate from another socket's heap
            if (ret != NULL)
                    return ret;
    }

The heap allocation function:

void* malloc_heap_alloc(struct malloc_heap *heap,
            const char *type __attribute__((unused)), size_t size, unsigned flags,
            size_t align, size_t bound)

The parameters essentially mirror those of rte_malloc_socket. (Note that the type argument is effectively unused.) Key code:

elem = find_suitable_element(heap, size, flags, align, bound); // find a free block in the heap that is at least the requested size
elem = malloc_elem_alloc(elem, size, align, bound); // split the block to the requested size and keep the remainder

find_suitable_element: DPDK indexes free blocks by the logarithm of their size and keeps them on per-size free lists; when an allocation is requested, it searches the lists starting from the index derived from the requested size. Code:

for (idx = malloc_elem_free_list_index(size);           // compute the log-based index for the requested size and search the free lists from there
                    idx < RTE_HEAP_NUM_FREELISTS; idx++) {
            for (elem = LIST_FIRST(&heap->free_head[idx]);
                            !!elem; elem = LIST_NEXT(elem, free_list)) {
                    if (malloc_elem_can_hold(elem, size, align, bound)) { // check that the block can hold the request (block size >= requested size)
                            if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
                                    return elem;
                            if (alt_elem == NULL)
                                    alt_elem = elem;
                    }
            }
    }

    if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY))
            return alt_elem;   // return the block if a suitable one was found; otherwise NULL is returned and the allocation fails

malloc_elem_alloc splits the block according to the requested size and alignment. Code:

struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound); // compute where the new block's header goes
const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem; // size left at the front after the split
const size_t trailer_size = elem->size - old_elem_size - size -    // size of the trailing gap caused by alignment
        MALLOC_ELEM_OVERHEAD;

elem_free_list_remove(elem);   // take the block off its free list

if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) { // if the trailing alignment gap is big enough, turn it into a new free block
            /* split it, too much free space after elem */
            struct malloc_elem *new_free_elem =
                            RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);

            split_elem(elem, new_free_elem);
            malloc_elem_free_list_insert(new_free_elem);
}

if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) { // if what is left at the front is too small to stand alone, keep it as padding of the allocated block and return the new block
            /* don't split it, pad the element instead */
            elem->state = ELEM_BUSY;
            elem->pad = old_elem_size;

            /* put a dummy header in padding, to point to real element header */
            if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
                                 * is cache-line aligned */
                    new_elem->pad = elem->pad;
                    new_elem->state = ELEM_PAD;
                    new_elem->size = elem->size - elem->pad;
                    set_header(new_elem);
            }

            return new_elem;
}

    /* we are going to split the element in two. The original element
     * remains free, and the new element is the one allocated.
     * Re-insert original element, in case its new size makes it
     * belong on a different list.
     */
    /*
     * Split the block in two, cutting exactly the requested size. This differs
     * from a buddy allocator, which must split into equal halves: with 100 units
     * free and 10 requested, this splits directly into 90 and 10 instead of
     * 50/50, then 25/25, then 12/13 before allocating.
     */
    split_elem(elem, new_elem);
    new_elem->state = ELEM_BUSY;  // mark the allocated block busy
    malloc_elem_free_list_insert(elem);  // put the remaining block back on a free list

    return new_elem; // return the allocated block

The malloc_elem_free_list_insert code:

idx = malloc_elem_free_list_index(elem->size - MALLOC_ELEM_HEADER_LEN); // compute the log-based free-list index
elem->state = ELEM_FREE;  // mark the block free
LIST_INSERT_HEAD(&elem->heap->free_head[idx], elem, free_list); // insert it into the matching free list

2. DPDK memory free. Freeing goes through rte_free, which simply calls malloc_elem_free. Code:

if (!malloc_elem_cookies_ok(elem) || elem->state != ELEM_BUSY)  // detect out-of-bounds writes and double frees
            return -1;

    rte_spinlock_lock(&(elem->heap->lock));
    size_t sz = elem->size - sizeof(*elem);
    uint8_t *ptr = (uint8_t *)&elem[1];
    struct malloc_elem *next = RTE_PTR_ADD(elem, elem->size); // header of the physically adjacent next block
    if (next->state == ELEM_FREE){   // if the next block is also free, merge it into this one
            /* remove from free list, join to this one */
            elem_free_list_remove(next);
            join_elem(elem, next);
            sz += sizeof(*elem);
    }

    /* check if previous element is free, if so join with it and return,
     * need to re-insert in free list, as that element's size is changing
     */
    if (elem->prev != NULL && elem->prev->state == ELEM_FREE) { // if the previous block is also free, merge this block into it
            elem_free_list_remove(elem->prev);
            join_elem(elem->prev, elem);
            sz += sizeof(*elem);
            ptr -= sizeof(*elem);
            elem = elem->prev;
    }
    malloc_elem_free_list_insert(elem); // put the merged block back on a free list for later use

3. Summary. DPDK's memory management is fairly simple. The improvements show up in three areas.

First, huge pages back the memory, which reduces TLB pressure.

Second, the buddy system is relaxed so that memory can be split at arbitrary sizes rather than strictly in halves. This does speed up allocation (when only a large block is available and a small block is requested, a buddy allocator must split recursively while this scheme does not), but it raises the likelihood of external fragmentation. DPDK is designed for fast packet processing and does not hold memory for long, so external fragmentation has little impact in practice.

Third, allocations are socket-aware: memory comes from the heap of the requested NUMA socket, which reduces cross-socket memory accesses.


A brief look at DPDK memory management: memory management initialization

Parsing the memory-related command-line arguments. Every DPDK program starts with rte_eal_init to initialize the DPDK environment; the memory-management initialization inside rte_eal_init looks like this:

eal_log_level_parse(argc, argv); // initializes internal_config
fctret = eal_parse_args(argc, argv); // parse the command-line arguments
if (internal_config.no_hugetlbfs == 0 &&
                internal_config.process_type != RTE_PROC_SECONDARY &&
                internal_config.xen_dom0_support == 0 &&
                eal_hugepage_info_init() < 0) // gather this machine's hugepage information
        rte_panic("Cannot get hugepage information\n");
if (internal_config.memory == 0 && internal_config.force_sockets == 0) {
        if (internal_config.no_hugetlbfs)
                internal_config.memory = MEMSIZE_IF_NO_HUGE_PAGE;
}
eal_hugedirs_unlock();
if (rte_eal_memory_init() < 0) // initialize the hugepage memory
     rte_panic("Cannot init memory\n");
eal_hugedirs_unlock();
if (rte_eal_memzone_init() < 0) // initialize memory management
    rte_panic("Cannot init memzone\n");

1. Initialize the internal_config structure. eal_log_level_parse calls eal_reset_internal_config, whose code is:

void eal_reset_internal_config(struct internal_config *internal_cfg)
{
    int i;
    internal_cfg->memory=0;                                   // amount of memory
    internal_cfg->force_nrank = 0;                            // number of memory ranks
    internal_cfg->force_nchannel = 0;                         // number of memory channels
    internal_cfg->hugefile_prefix = HUGEFILE_PREFIX_DEFAULT;
    internal_cfg->hugepage_dir = NULL;                       // hugepage mount directory
    internal_cfg->force_sockets = 0;                         // number of NUMA nodes
    /* zero out the NUMA config */
    for (i = 0; i < RTE_MAX_NUMA_NODES; i++)
            internal_cfg->socket_mem[i] = 0;
    /* zero out hugedir descriptors */
    for (i = 0; i < MAX_HUGEPAGE_SIZES; i++)
            internal_cfg->hugepage_info[i].lock_descriptor = -1;
    internal_cfg->base_virtaddr = 0;
    internal_cfg->syslog_facility = LOG_DAEMON;
    /* default value from build option */
#if RTE_LOG_LEVEL >= RTE_LOG_DEBUG
    internal_cfg->log_level = RTE_LOG_INFO;
#else
    internal_cfg->log_level = RTE_LOG_LEVEL;
#endif

    internal_cfg->xen_dom0_support = 0;
    /* if set to NONE, interrupt mode is determined automatically */
    internal_cfg->vfio_intr_mode = RTE_INTR_MODE_NONE;
#ifdef RTE_LIBEAL_USE_HPET
    internal_cfg->no_hpet = 0;
#else
    internal_cfg->no_hpet = 1;
#endif
    internal_cfg->vmware_tsc_map = 0;
    internal_cfg->create_uio_dev = 0;
}
2. Parse the command-line arguments to fill in internal_config.

Parsing happens in two parts. The first part goes through eal_parse_common_option; key code:

case 'm':                                   // total amount of memory to use, in MB
    conf->memory = atoi(optarg);
    conf->memory *= 1024ULL;
    conf->memory *= 1024ULL;
    mem_parsed = 1;
    break;
/* force number of channels */
case 'n':                                   // number of memory channels
    conf->force_nchannel = atoi(optarg);
    if (conf->force_nchannel == 0) {
           RTE_LOG(ERR, EAL, "invalid channel number\n");
           return -1;
    }
    break;
/* force number of ranks */
case 'r':                                // number of memory ranks
    conf->force_nrank = atoi(optarg);
    if (conf->force_nrank == 0 ||
        conf->force_nrank > 16) {
           RTE_LOG(ERR, EAL, "invalid rank number\n");
           return -1;
    }
    break;
case OPT_NO_HUGE_NUM:                   // disable hugepages (no_hugetlbfs)
    conf->no_hugetlbfs = 1;
    break;

The second part is parsed directly into internal_config. Key code:

case OPT_HUGE_DIR_NUM:                         // the hugepage mount directory
        internal_config.hugepage_dir = optarg;
        break;

case OPT_FILE_PREFIX_NUM:                     // prefix for hugepage files
        internal_config.hugefile_prefix = optarg;
        break;

case OPT_SOCKET_MEM_NUM:                      // amount of memory to reserve on each NUMA node
        if (eal_parse_socket_mem(optarg) < 0) {
              RTE_LOG(ERR, EAL, "invalid parameters for --"
                            OPT_SOCKET_MEM "\n");
                      eal_usage(prgname);
                      ret = -1;
                      goto out;
        }
        break;
3. Gather the machine's current hugepage information: eal_hugepage_info_init.

a. Open the /sys/kernel/mm/hugepages directory; the number of subdirectories gives the number of hugepage sizes on this system.

b. Parse each subdirectory name to get the page size of that hugepage type.

c. Use the page size together with /proc/mounts to find where each hugepage type is mounted. (Unmounted hugepage types cannot be used; if the user specified a hugepage directory, only that directory is used.)

d. Compute the number of usable hugepages from the free and reserved page counts in each subdirectory of /sys/kernel/mm/hugepages/, and record the total on NUMA node 0 for now; it is split per node later.

e. Store everything in the internal_config.hugepage_info array, sorted by page size from largest to smallest. Key code:

dir = opendir(sys_dir_path);
****
hpi = &internal_config.hugepage_info[num_sizes];
hpi->hugepage_sz =
       rte_str_to_size(&dirent->d_name[dirent_start_len]);
hpi->hugedir = get_hugepage_dir(hpi->hugepage_sz);
****
hpi->num_pages[0] = get_num_hugepages(dirent->d_name);
****
qsort(&internal_config.hugepage_info[0], num_sizes,
          sizeof(internal_config.hugepage_info[0]), compare_hpi);

4. Initialize the hugepage memory: rte_eal_memory_init. It checks whether the current process is the primary process; if it is, rte_eal_hugepage_init performs the hugepage initialization. a. Without hugepages, memory is simply allocated with mmap:

if (internal_config.no_hugetlbfs) {
         addr = mmap(NULL, internal_config.memory, PROT_READ | PROT_WRITE,
                         MAP_PRIVATE | MAP_ANONYMOUS, 0, 0);
         if (addr == MAP_FAILED) {
                RTE_LOG(ERR, EAL, "%s: mmap() failed: %s\n", __func__,
                        strerror(errno));
                return -1;
         }
         mcfg->memseg[0].phys_addr = (phys_addr_t)(uintptr_t)addr;
         mcfg->memseg[0].addr = addr;
         mcfg->memseg[0].hugepage_sz = RTE_PGSIZE_4K;
         mcfg->memseg[0].len = internal_config.memory;
         mcfg->memseg[0].socket_id = 0;
         return 0;
}

b. With hugepages, initialization proceeds from the hugepage information gathered earlier.

1) Create the mmap backing files in the hugepage mount directory and record the mapped virtual addresses. If all mappings succeed there is enough memory; if some fail, the requested amount is scaled down.

 pages_new = map_all_hugepages(&tmp_hp[hp_offset], hpi, 1);

2) Translate each virtual address into a physical address by reading /proc/self/pagemap.

 find_physaddrs(&tmp_hp[hp_offset], hpi)

3) Find which NUMA node each hugepage belongs to by reading /proc/self/numa_maps.

find_numasocket(&tmp_hp[hp_offset], hpi)

4) Sort the pages by physical address in ascending order.

qsort(&tmp_hp[hp_offset], hpi->num_pages[0],
                  sizeof(struct hugepage_file), cmp_physaddr);

5) Remap physically contiguous hugepages so that their virtual addresses are contiguous as well.

map_all_hugepages(&tmp_hp[hp_offset], hpi, 0)

6) Unmap the virtual addresses from the first mapping pass.

unmap_all_hugepages_orig(&tmp_hp[hp_offset], hpi)

7) Count the hugepages of each size on each socket.

for (i = 0; i < nr_hugefiles; i++) {
     int socket = tmp_hp[i].socket_id;

     /* find a hugepage info with right size and increment num_pages */
     const int nb_hpsizes = RTE_MIN(MAX_HUGEPAGE_SIZES,
           (int)internal_config.num_hugepage_sizes);
     for (j = 0; j < nb_hpsizes; j++) {
           if (tmp_hp[i].size ==internal_config.hugepage_info[j].hugepage_sz) {
                internal_config.hugepage_info[j].num_pages[socket]++;
            }
    }
}

8) Store the collected data in shared memory so other processes can use it, then treat each physically contiguous run of pages as one memory block and initialize the memseg array of the mem_config management structure:

for (i = 0; i < nr_hugefiles; i++) {
      new_memseg = 0;

      /* if this is a new section, create a new memseg */
      if (i == 0)
             new_memseg = 1;
      else if (hugepage[i].socket_id != hugepage[i-1].socket_id)
             new_memseg = 1;
      else if (hugepage[i].size != hugepage[i-1].size)
             new_memseg = 1;

#ifdef RTE_ARCH_PPC_64
      /* On PPC64 architecture, the mmap always start from higher
      * virtual address to lower address. Here, both the physical
      * address and virtual address are in descending order */
      else if ((hugepage[i-1].physaddr - hugepage[i].physaddr) !=
          hugepage[i].size)
             new_memseg = 1;
      else if (((unsigned long)hugepage[i-1].final_va -
          (unsigned long)hugepage[i].final_va) != hugepage[i].size)
              new_memseg = 1;
#else
      else if ((hugepage[i].physaddr - hugepage[i-1].physaddr) !=
          hugepage[i].size)
              new_memseg = 1;
      else if (((unsigned long)hugepage[i].final_va -
          (unsigned long)hugepage[i-1].final_va) != hugepage[i].size)
              new_memseg = 1;
#endif

      if (new_memseg) {
             j += 1;
             if (j == RTE_MAX_MEMSEG)
                     break;

             mcfg->memseg[j].phys_addr = hugepage[i].physaddr;
             mcfg->memseg[j].addr = hugepage[i].final_va;
             mcfg->memseg[j].len = hugepage[i].size;
             mcfg->memseg[j].socket_id = hugepage[i].socket_id;
             mcfg->memseg[j].hugepage_sz = hugepage[i].size;
      }
      /* continuation of previous memseg */
      else {
#ifdef RTE_ARCH_PPC_64
      /* Use the phy and virt address of the last page as segment
       * address for IBM Power architecture */
              mcfg->memseg[j].phys_addr = hugepage[i].physaddr;
              mcfg->memseg[j].addr = hugepage[i].final_va;
#endif
              mcfg->memseg[j].len += mcfg->memseg[j].hugepage_sz;
      }
      hugepage[i].memseg_id = j;
}

5. Memory management initialization: rte_eal_memzone_init, which in the end calls rte_eal_malloc_heap_init to initialize the heaps.

rte_eal_malloc_heap_init calls malloc_heap_add_memseg for each memseg according to the NUMA node it belongs to, initializing the memseg as a malloc element and adding it to that node's heap.

The malloc_heap_add_memseg code:

static void
malloc_heap_add_memseg(struct malloc_heap *heap, struct rte_memseg *ms)
{
    /* allocate the memory block headers, one at end, one at start */
    struct malloc_elem *start_elem = (struct malloc_elem *)ms->addr; // first element of the memseg
    struct malloc_elem *end_elem = RTE_PTR_ADD(ms->addr,   // last element of the memseg
                    ms->len - MALLOC_ELEM_OVERHEAD);
    end_elem = RTE_PTR_ALIGN_FLOOR(end_elem, RTE_CACHE_LINE_SIZE);
    const size_t elem_size = (uintptr_t)end_elem - (uintptr_t)start_elem; // size of the memseg's first element

    malloc_elem_init(start_elem, heap, ms, elem_size); // initialize the first element and attach it to the heap; depending on build options this also sets up debug cookie checks
    malloc_elem_mkend(end_elem, start_elem);
    malloc_elem_free_list_insert(start_elem);          // insert the element into the heap's free-list array by size; the list at index i holds free elements larger than 2 to the power i
    heap->total_size += elem_size;          // grow the heap's total size
}

6. Summary. That is roughly how DPDK's memory initialization works. The next post covers how DPDK allocates and frees memory.


A detailed look at dlsym's RTLD_NEXT parameter

While reading the DPDK code recently, I came across this macro in examples/performance-thread/pthread_shim/pthread_shim.c:

static void *__libc_dl_handle = RTLD_NEXT;
#define get_addr_of_loaded_symbol(name) do {                            \
        char *error_str;                                                \
        _sys_pthread_funcs.f_##name = dlsym(__libc_dl_handle, (#name)); \
        error_str = dlerror();                                          \
        if (error_str != NULL) {                                        \
                fprintf(stderr, "%s\n", error_str);                     \
        }                                                               \
} while (0)

The way dlsym is used here struck me as odd; I had not seen it before, so I checked the man page:

RTLD_NEXT
    Find the next occurrence of the desired symbol in the search order after the current object. This allows one to provide a wrapper around a function in another shared object, so that, for example, the definition of a function in a preloaded shared object (see LD_PRELOAD in ld.so(8)) can find and invoke the "real" function provided in another shared object (or for that matter, the "next" definition of the function in cases where there are multiple layers of preloading).

In other words, with this handle the function pointer returned is the next definition of that symbol found in the search order after the current object. That was still a bit fuzzy to me, so I wrote a few lines of code to make it concrete. File first_one.c:

#include <stdio.h>
void print_message()
{
    printf("the first lib~~\n");
}
void first()
{
    printf("init first\n");
}

Compile it into a shared library:

gcc -fpic -c first_one.c
gcc --share first_one.o -o libfirst_one.so

File second_one.c:

#include <stdio.h>
void print_message()
{
    printf("the second lib~~\n");
}

void second()
{
    printf("init second \n");
}

Compile it into a shared library:

gcc -fpic -c second_one.c
gcc --share second_one.o -o libsecond_one.so

File wrap.c:

#define _GNU_SOURCE    /* needed for RTLD_NEXT, which glibc defines as ((void *) -1l) */
#include <stdio.h>
#include <dlfcn.h>
#include <errno.h>
void(*f)();
void load_func() __attribute__((constructor));
void load_func()
{
    f = (void(*)())dlsym(RTLD_NEXT,"print_message");
    char *error_str;
    error_str = dlerror();
    if (error_str != NULL) {
        printf("%s\n", error_str);
    }
    printf("load func first f=%p\n",f);

}
void print_message()
{
    printf("the wrap lib~~\n");
    f();
}

Compile it into a shared library:

gcc -fpic -c wrap.c
gcc --share wrap.o -o libwrap.so

File main.c:

void print_message();
void first();
void second();
int main()
{
    first();
    second();
    print_message();
    return 0;
}

Compile it into an object file:

gcc -c main.c

Link the first executable:

gcc -o first main.o  -lwrap -lfirst_one  -lsecond_one -ldl -L.

Link the second executable:

gcc -o second main.o  -lwrap -lsecond_one -lfirst_one  -ldl -L.

Set the runtime library search path:

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:.

Check the library load order:

duanxc@DESKTOP-LVGREDM:~/dlysm$ ldd first
    linux-vdso.so.1 =>  (0x00007fffee2e9000)
    libwrap.so (0x00007f586c1a0000)
    libfirst_one.so (0x00007f586bf90000)
    libsecond_one.so (0x00007f586bd80000)
    libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007f586bb70000)
    libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007f586b7a0000)
    /lib64/ld-linux-x86-64.so.2 (0x00007f586c400000)

duanxc@DESKTOP-LVGREDM:~/dlysm$ ldd second
    linux-vdso.so.1 =>  (0x00007fffc2321000)
    libwrap.so (0x00007fddd1d40000)
    libsecond_one.so (0x00007fddd1b30000)
    libfirst_one.so (0x00007fddd1910000)
    libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fddd1700000)
    libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fddd1330000)
    /lib64/ld-linux-x86-64.so.2 (0x00007fddd2000000)

Run both and compare the output:

duanxc@DESKTOP-LVGREDM:~/dlysm$ ./first
load func first f=0x7f600f3e06c0
init first
init second
the wrap lib~~
the first lib~~

duanxc@DESKTOP-LVGREDM:~/dlysm$ ./second
load func first f=0x7f92457e06c0
init first
init second
the wrap lib~~
the second lib~~

Analysis: both libfirst_one.so and libsecond_one.so define print_message. With RTLD_NEXT as the handle, dlsym returns the pointer that matches the library load order: when libfirst_one.so is loaded first, the address from that library is returned; otherwise the one from libsecond_one.so is. As the example shows, this property lets us wrap libc functions for testing: first obtain the pointer to the libc function we want to wrap, then define a function with the same name in wrap.c. Inside it, record or count whatever we need and then call the real libc function through the saved pointer. Once the program is finished and the statistics are no longer needed, simply stop linking against the wrap library.
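
A hedged sketch of that libc-wrapping idea (not code from the post; the function counter and file name are made up for illustration): count calls to puts and forward each call to the real libc puts obtained via RTLD_NEXT.

/* puts_wrap.c */
#define _GNU_SOURCE           /* needed for RTLD_NEXT */
#include <dlfcn.h>
#include <stdio.h>

static int (*real_puts)(const char *) = NULL;
static unsigned long puts_calls = 0;

/* Same name as the libc function: callers resolve to this wrapper first,
 * which records the call and then forwards to the real puts. */
int puts(const char *s)
{
    if (real_puts == NULL)
        real_puts = (int (*)(const char *))dlsym(RTLD_NEXT, "puts");

    puts_calls++;                         /* the record/statistics step */
    return real_puts(s);
}

unsigned long get_puts_calls(void)
{
    return puts_calls;
}

Compiled into a shared library the same way as wrap.c above, linking or not linking it toggles the instrumentation without touching the application code.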