Nov 2

glibc的qsort源码阅读笔记 不指定

felix021 @ 2010-11-2 21:42 [IT » 程序设计] 评论(1) , 引用(0) , 阅读(23368) | Via 本站原创 | |
C标准中的qsort函数,可以对任意类型的数组进行快速排序,用起来也还算方便,不过行为有一点诡异。加上之前曾经看过一篇文章(沈大写的,曾经登上过ecom的双周刊,居然在这个youalab.com也等出来了-。-),说是qsort不是线程安全的,所以把glibc的代码翻出来,看看qsort的具体实现。

用Ubuntu的话,找代码就很容易了:
引用
$ apt-get source libc6
$ cd eglibc-xxxx
$ grep -r . -Hne qsort | grep void

可以看到qsort是在stdlib.h里面的,源码在msort.c 302行,调用了qsort_r函数:
void
qsort (void *b, size_t n, size_t s, __compar_fn_t cmp)
{
  return qsort_r (b, n, s, (__compar_d_fn_t) cmp, NULL);                                                     
}
其中__compar_fn_t就是 int (*)(const void *, const void *)类型的函数指针,__compar_d_fn_t则是 int (*)(const void *, const void *, void *)类型的函数指针(为什么还有个void *呢?很神奇~求解)。

qsort_r函数也在msort.c中,约164行起。为了榨干CPU的性能,写了很多代码来优化效率,其流程大致是:

1. 判断额外所需内存大小,如果很小(<1024B),在栈上分配;否则获取系统内存参数——如果比可用内存小,试图在堆上分配;如果所需内存超过物理内存(为防出现不得不使用交换分区的情况致使性能恶化,故不分配),或者分配失败(可用空间不足),使用性能较差的stdlib/qsort.c中的_quicksort函数来排序。
    1.1. _quicksort位于 stdlib/qsort.c,对快排进行了两个改进:1. 将递归转化为递推; 2. 使用插入排序来处理4个元素以内的区间。不过用于交换两个元素的宏SWAP(a, b, size)没有额外的优化,比较意外(本来以为会考虑一次多个字节拷贝,或者使用duff device来减少循环)。

2. 分配成功的情况下,使用额外分配的空间,调用 msort_with_tmp() 函数来进行排序。
    2.1. 如果单个元素的大小超过32个字节,那么使用间接排序,分配空间时就有额外的判断,如果需要间接排序,则额外分配2n个指针的空间,前n个空间用于存放原指针,后n个空间用于按照所指元素大小存放排序好的指针。最后再按照排序好的指针顺序将元素拷贝。(注释中提到了Knuth vol. 3 (2nd ed.) exercise 5.2-10.,应该是指这个算法出自Knuth的《The Art of Computer Programming》卷3吧)
    2.2. 否则使用直接排序。这里也进行了一些优化,主要是为msort_with_tmp()提供p.var参数,完成针对元素的大小进行拷贝的优化:默认是使用gcc内置的__builtin_mempcpy来拷贝。
    2.3. msort_with_tmp对典型的快排也进行了非常赞的优化。
    2.3.1. 从msort_with_tmp的函数名和代码可以看出,这个算法不是就地排序,而是使用了第一步分配的O(N)的额外空间,这样可以让排序时的拷贝效率更高(这个优化应该对CPU的cache命中率也有较大的提高)。
    2.3.2. qsort_r传进来的参数p有个属性var
    (1) 默认情况下p.var = 4,使用mempcpy来进行拷贝。
    (2) 如果单个元素大于32bytes,p.var = 3, 表示是间接排序,拷贝的是指针,不是元素
    (3) 如果数组是按uint32_t对其的,且元素的大小是uint32_t的倍数,则可以进一步优化到每次按照uint32_t/uint64_t/unsigned long来进行拷贝,一次拷贝4或者8个字节,此时p.var = 0,1,2表示按照uint32_t, uint64_t, unsigned long拷贝。

直接在里头做了些注释,有兴趣的话可以更仔细地看看。可以的话自己去下了glibc的源码阅读,更有意思:D

struct msort_param
{
  size_t s;
  size_t var;
  __compar_d_fn_t cmp;
  void *arg;
  char *t;
};

void qsort_r (void *b, size_t n, size_t s, __compar_d_fn_t cmp, void *arg)
{
    size_t size = n * s; //计算需要的额外空间的大小
    char *tmp = NULL;
    struct msort_param p;

    /* For large object sizes use indirect sorting.  */
    if (s > 32) size = 2 * n * sizeof (void *) + s;
    //如果单个元素很大(>32bytes),则用间接的排序,需要内存更多
    //间接排序的意思是,交换的时候只交换指针,最后再把元素按照
    //排好的指针顺序拷贝。这样可以减少大量的拷贝时间。

    if (size < 1024) //对于小数组,用alloca,在栈上分配临时内存
        /* The temporary array is small, so put it on the stack.  */
        p.t = __alloca (size);
    else
    {
        //获取一些系统内存的参数。如果消耗太多空间,可能会用
        //到虚拟内存,使得性能急剧下降
        /* We should avoid allocating too much memory since this might
          have to be backed up by swap space.  */
        static long int phys_pages; //物理内存页数
        static int pagesize; //页大小(字节/页)

        if (phys_pages == 0)
        {
            phys_pages = __sysconf (_SC_PHYS_PAGES);
            //__sysconf函数在sysdeps/posix/sysconf.c中
            //_SC_PHYS_PAGES对应到函数__get_phys_pages()
            //位于文件sysdeps/unix/sysv/linux/getsysstats.c中
            //通过phys_pages_info()打开/proc/meminfo来读取内存信息
            //(这就定位到了qsort打开文件的问题)

            if (phys_pages == -1)
                /* Error while determining the memory size.  So let's
                  assume there is enough memory.  Otherwise the
                  implementer should provide a complete implementation of
                  the `sysconf' function.  */
                phys_pages = (long int) (~0ul >> 1);

            /* The following determines that we will never use more than
              a quarter of the physical memory.  */
            phys_pages /= 4;

            pagesize = __sysconf (_SC_PAGESIZE);
        }
        //注意,上面这一段if会产生竞争,出现线程安全安全:
        //如果两个线程都调用qsort,当线程1获取了phys_pages之后,线程2
        //才到达if,线程2就会跳过这一段代码,直接执行下面的if语句——
        //而此时pagesize尚未初始化(=0),于是就会出现除零错误,导致
        //core dump

        /* Just a comment here.  We cannot compute (phys_pages * pagesize)
          and compare the needed amount of memory against this value.
          The problem is that some systems might have more physical
          memory then can be represented with a `size_t' value (when
          measured in bytes.  */

        /* If the memory requirements are too high don't allocate memory.  */
        //如果所需的内存页数大于总的可用内存,则不分配内存(防止swap降低性能)
        if (size / pagesize > (size_t) phys_pages)
        {
            //直接使用stdlib/qsort.c中的 _quicksort 进行排序
            _quicksort (b, n, s, cmp, arg);
            return;
        }

        /* It's somewhat large, so malloc it.  */
        //内存大于阈值(1024),小于可用内存,因此应当用malloc,在堆上分配
        int save = errno;
        tmp = malloc (size);
        __set_errno (save);
        if (tmp == NULL)
        {
            /* Couldn't get space, so use the slower algorithm
              that doesn't need a temporary array.  */
            //分配内存失败,使用 _quicksort 进行排序
            _quicksort (b, n, s, cmp, arg);
            return;
        }
        p.t = tmp;
    }

    p.s = s;
    p.var = 4; //这个属性决定在排序时如何最大化利用CPU来拷贝
              //单个元素。默认是逐字节拷贝。
    p.cmp = cmp;
    p.arg = arg;

    if (s > 32)
    {
        /* Indirect sorting.  */
        //间接排序:先把各个元素的指针进行排序,然后再将元素逐个拷贝。
        char *ip = (char *) b;
        void **tp = (void **) (p.t + n * sizeof (void *));
        void **t = tp;
        void *tmp_storage = (void *) (tp + n);

        while ((void *) t < tmp_storage)
        {
            *t++ = ip;
            ip += s;
        }
        p.s = sizeof (void *);
        p.var = 3;
        msort_with_tmp (&p, p.t + n * sizeof (void *), n);

        /* tp[0] .. tp[n - 1] is now sorted, copy around entries of
          the original array.  Knuth vol. 3 (2nd ed.) exercise 5.2-10.  */
        char *kp;
        size_t i;
        for (i = 0, ip = (char *) b; i < n; i++, ip += s)
            if ((kp = tp[i]) != ip)
            {
                size_t j = i;
                char *jp = ip;
                memcpy (tmp_storage, ip, s);

                do
                {
                    size_t k = (kp - (char *) b) / s;
                    tp[j] = jp;
                    memcpy (jp, kp, s);
                    j = k;
                    jp = kp;
                    kp = tp[k];
                }
                while (kp != ip);

                tp[j] = jp;
                memcpy (jp, tmp_storage, s);
            }
    }
    else
    {
        if ((s & (sizeof (uint32_t) - 1)) == 0
                && ((char *) b - (char *) 0) % __alignof__ (uint32_t) == 0)
        {
            //如果元素是sizeof(uint32_t)的倍数,且数组是uint32_t对其的
            //那么可以充分利用内存总线,一次拷贝4个、8个字节。
            if (s == sizeof (uint32_t))
                p.var = 0; //使用uint32_t来赋值拷贝,一次4个字节
            else if (s == sizeof (uint64_t)
                    && ((char *) b - (char *) 0) % __alignof__ (uint64_t) == 0)
                p.var = 1; //uint64_t,一次64个字节
            else if ((s & (sizeof (unsigned long) - 1)) == 0
                    && ((char *) b - (char *) 0)
                    % __alignof__ (unsigned long) == 0)
                p.var = 2; //一次sizeof(ulong)个字节, x86=4, x86_64=8
        }
        msort_with_tmp (&p, b, n); //递归地排序
    }
    free (tmp);
}




欢迎扫码关注:




转载请注明出自 ,如是转载文则注明原出处,谢谢:)
RSS订阅地址: https://www.felix021.com/blog/feed.php
treblih Email
2010-11-5 10:15
当然有用 感谢分享
分页: 1/1 第一页 1 最后页
发表评论
表情
emotemotemotemotemot
emotemotemotemotemot
emotemotemotemotemot
emotemotemotemotemot
emotemotemotemotemot
打开HTML
打开UBB
打开表情
隐藏
记住我
昵称   密码   *非必须
网址   电邮   [注册]