Jul
17
前几天讨论遇到一个涉及区间覆盖的数据统计,发现很适合使用线段树来解决,于是重新回顾了一下这个好几年前学过的东西,凭着残存的理解,好了好久才勉强写了出来,感觉自己确实是没有搞算法的天赋,在边界处理的时候磕磕碰碰的,需要改好几次才能写对,不够干净利落。
不过能从繁杂的业务中抽出来写写纯粹的数据结构和算法,有点回到学校的状态,感觉也蛮不错。
以前在学校折腾算法的时候,从yyt同学的分享的ppt学到了这个数据结构,印象比较深的是,ppt上说,对于一个长度是 x 的线段,使用数组(元素 i 的左右节点分别是 2*i 和 2*i+1 )来记录的话,需要大小约为 3*x 的数组,但是在实际做题的时候却发现越界了,后来仔细去挖这个地方,才发现,其实应该是找到一个 y = 2^n 满足 2^(n-1) < x <= 2^n,所需的数组长度为 2y 。
这次是先写了一个C++的class(偷懒用的struct),配合一些c-style的函数,让python用ctypes载入使用。然后兴起写了个Python的版本对比,跑了个简单的case,发现性能居然相差200+倍,做了一些改进,才领悟到,对于python来说,其实用数组建树比起直接用对象指针关联建树并没有太大优势,而用对象建树的好处是,如果不是极端情况,可以lazy load左右子树(当然,数组也可以lazy initialization子节点,但不能减少内存占用)。改起来也不难,于是就验证了一下,效果相当好,甚至比C++还快(因为测试case太简单,几乎没有展开子树),此外这种方式节点的数量可以减少到2 * x - 1(但是相应地每个节点需要增加指针)。
另外遇到一个问题是迭代,Python的迭代器如果使用Generator语法(yield),写起来和用起来都特别自然,可是到C++就完全不一样了,形式上想要达到类似的效果比较累,保存和还原现场比较辛苦(不过这个case还好),试着实现了一个版本,但是需要遍历所有的节点感觉不太好,后来还是改成了偷懒的写法(直接生成整个结果集,在结果集上迭代)。
最后,不成熟的小代码放在了这里:https://github.com/felix021/mycodes/tree/master/segtree
[update] 到数据集上实际跑了一下,C++版还是跑赢了几倍的速度,这还是没有做lazy init的情况,回头抽空再写个版本验证一下吧~
不过能从繁杂的业务中抽出来写写纯粹的数据结构和算法,有点回到学校的状态,感觉也蛮不错。
以前在学校折腾算法的时候,从yyt同学的分享的ppt学到了这个数据结构,印象比较深的是,ppt上说,对于一个长度是 x 的线段,使用数组(元素 i 的左右节点分别是 2*i 和 2*i+1 )来记录的话,需要大小约为 3*x 的数组,但是在实际做题的时候却发现越界了,后来仔细去挖这个地方,才发现,其实应该是找到一个 y = 2^n 满足 2^(n-1) < x <= 2^n,所需的数组长度为 2y 。
这次是先写了一个C++的class(偷懒用的struct),配合一些c-style的函数,让python用ctypes载入使用。然后兴起写了个Python的版本对比,跑了个简单的case,发现性能居然相差200+倍,做了一些改进,才领悟到,对于python来说,其实用数组建树比起直接用对象指针关联建树并没有太大优势,而用对象建树的好处是,如果不是极端情况,可以lazy load左右子树(当然,数组也可以lazy initialization子节点,但不能减少内存占用)。改起来也不难,于是就验证了一下,效果相当好,甚至比C++还快(因为测试case太简单,几乎没有展开子树),此外这种方式节点的数量可以减少到2 * x - 1(但是相应地每个节点需要增加指针)。
另外遇到一个问题是迭代,Python的迭代器如果使用Generator语法(yield),写起来和用起来都特别自然,可是到C++就完全不一样了,形式上想要达到类似的效果比较累,保存和还原现场比较辛苦(不过这个case还好),试着实现了一个版本,但是需要遍历所有的节点感觉不太好,后来还是改成了偷懒的写法(直接生成整个结果集,在结果集上迭代)。
最后,不成熟的小代码放在了这里:https://github.com/felix021/mycodes/tree/master/segtree
[update] 到数据集上实际跑了一下,C++版还是跑赢了几倍的速度,这还是没有做lazy init的情况,回头抽空再写个版本验证一下吧~
Oct
29
转置二维数组:
utf-8字符串转为utf-8字符数组:
按显示宽度截取utf-8字符串
让进程在后台运行(detached process),出乎意料地简单
function transpose($array) {
array_unshift($array, null);
return call_user_func_array('array_map', $array);
}
array_unshift($array, null);
return call_user_func_array('array_map', $array);
}
utf-8字符串转为utf-8字符数组:
function utf8_str2arr($str)
{
preg_match_all("/./u", $str, $arr);
return $arr[0];
}
{
preg_match_all("/./u", $str, $arr);
return $arr[0];
}
按显示宽度截取utf-8字符串
function substr_width($str, $start, $width)
{
$arr = utf8_str2arr($str);
$arr_ret = [];
$i = 0;
while ($width > 0 and $i < count($arr))
{
$arr_ret[] = $arr[$start + $i];
if (strlen($arr_ret[$i]) == 1) //ascii,width=1
$width -= 1;
else
$width -= 2;
$i++;
}
if ($width < 0)
array_pop($arr_ret);
return join('', $arr_ret);
}
{
$arr = utf8_str2arr($str);
$arr_ret = [];
$i = 0;
while ($width > 0 and $i < count($arr))
{
$arr_ret[] = $arr[$start + $i];
if (strlen($arr_ret[$i]) == 1) //ascii,width=1
$width -= 1;
else
$width -= 2;
$i++;
}
if ($width < 0)
array_pop($arr_ret);
return join('', $arr_ret);
}
让进程在后台运行(detached process),出乎意料地简单
pclose(popen("nohup $cmd &", 'r'));
Jul
30
简单地说,MPI 就是个并行计算框架,模型也很直接——就是多进程。和hadoop不同,它不提供计算任务的map和reduce,只提供了一套通信接口,需要程序员来完成这些任务;它也不提供冗余容错等机制,完全依赖于其下层的可靠性。但是因为把控制权几乎完全交给了程序员,所以有很大的灵活性,可以最大限度地榨取硬件性能。超级计算机上的运算任务,基本上都是使用MPI来开发的。
~ 下载编译安装:
现在貌似一般都用MPICH,开源的MPI库,可以从这里获取: http://www.mpich.org/ ,现在的最新版本是3.0.4,编译安装过程可以参考安装包里的README的说明,基本步骤如下(万恶的configure):
$ wget http://www.mpich.org/static/downloads/3.0.4/mpich-3.0.4.tar.gz
$ tar zxf mpich-3.0.4.tar.gz
$ cd mpich-3.0.4
$ mkdir ~/mpich
$ ./configure --prefix=$HOME/mpich --disable-f77 --disable-fc 2>&1 | tee c.txt #我禁用了fortran的支持
$ make -j4 2>&1 | tee m.txt
$ make install 2>&1 | tee i.txt
$ echo 'export PATH=$PATH:~/mpich/bin' >> ~/.bashrc
下面给出三个例子,参考教程:http://wenku.baidu.com/view/ee8bf3390912a216147929f3.html (注:22页有BUG,它把 MPI_Comm_XXX 错写成了 MPI_Common_xxx //包括全大写版本,共四处),给出了MPI框架中最常用、最基础的6个API的例子。更复杂的API可以参考mpich的手册。这些例子只是简单地演示了MPI框架的使用;实际上在使用MPI开发并行计算的软件时,还需要考虑到很多方面的问题,这里就不展开说了(其实真相是我也不会-.-,有兴趣的话可以请教 @momodi 和 @dumbear 两位)。
1. 最简单的:Hello world
代码如下: hello.c
编译:
$ mpicc -o hello hello.c
运行:
$ mpiexec -n 4 ./hello
Hello world!
Hello world!
Hello world!
Hello world!
可以看到这里启动了4个进程。注意 -n 和 4 之间一定要有空格,否则会报错。
2. 进程间通信
MPI最基本的通信接口是 MPI_Send/MPI_Recv:
编译运行:
$ mpicc comm.c
$ mpiexec -n 4 ./a.out
I'm 0 of 4
from 1: hello from 1
from 2: hello from 2
I'm 1 of 4
I'm 2 of 4
from 3: hello from 3
I'm 3 of 4
3. 来个复杂点的:数数前1亿个自然数里有几个 雷劈数
代码后附,答案是97(真少),不过这不是重点,重点是MPI对硬件的利用率是怎样 :D
测试机器是 16核 AMD Opteron 6128HE @2GHz,32G内存
单进程(无MPI版本):56.9s
4进程:15.3s
8进程:7.85s
12进程:5.35s
考虑到16核跑满可能会受到其他进程的影响(性能不稳定,4.2~4.9s),这个数据就不列进来比较了。
可以看出来,在这个例子里,因为通信、同步只有在计算完之后才有那么一点点,所以在SMP架构下,耗费的时间基本上是跟进程数成反比的,说明MPI框架对硬件性能的利用率还是相当高的。
具体代码如下:
~ 下载编译安装:
现在貌似一般都用MPICH,开源的MPI库,可以从这里获取: http://www.mpich.org/ ,现在的最新版本是3.0.4,编译安装过程可以参考安装包里的README的说明,基本步骤如下(万恶的configure):
引用
$ wget http://www.mpich.org/static/downloads/3.0.4/mpich-3.0.4.tar.gz
$ tar zxf mpich-3.0.4.tar.gz
$ cd mpich-3.0.4
$ mkdir ~/mpich
$ ./configure --prefix=$HOME/mpich --disable-f77 --disable-fc 2>&1 | tee c.txt #我禁用了fortran的支持
$ make -j4 2>&1 | tee m.txt
$ make install 2>&1 | tee i.txt
$ echo 'export PATH=$PATH:~/mpich/bin' >> ~/.bashrc
下面给出三个例子,参考教程:http://wenku.baidu.com/view/ee8bf3390912a216147929f3.html (注:22页有BUG,它把 MPI_Comm_XXX 错写成了 MPI_Common_xxx //包括全大写版本,共四处),给出了MPI框架中最常用、最基础的6个API的例子。更复杂的API可以参考mpich的手册。这些例子只是简单地演示了MPI框架的使用;实际上在使用MPI开发并行计算的软件时,还需要考虑到很多方面的问题,这里就不展开说了(其实真相是我也不会-.-,有兴趣的话可以请教 @momodi 和 @dumbear 两位)。
1. 最简单的:Hello world
代码如下: hello.c
#include <stdio.h>
#include <mpi.h>
int main(int argc, char *argv[])
{
MPI_Init(&argc, &argv); //初始化MPI环境
printf("Hello world!\n");
MPI_Finalize(); //结束MPI环境
return 0;
}
#include <mpi.h>
int main(int argc, char *argv[])
{
MPI_Init(&argc, &argv); //初始化MPI环境
printf("Hello world!\n");
MPI_Finalize(); //结束MPI环境
return 0;
}
编译:
$ mpicc -o hello hello.c
运行:
$ mpiexec -n 4 ./hello
Hello world!
Hello world!
Hello world!
Hello world!
可以看到这里启动了4个进程。注意 -n 和 4 之间一定要有空格,否则会报错。
2. 进程间通信
MPI最基本的通信接口是 MPI_Send/MPI_Recv:
#include <stdio.h>
#include <mpi.h>
int main(int argc, char *argv[])
{
int myid, numprocs, source, msg_tag = 0;
char msg[100];
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs); //共启动几个进程
MPI_Comm_rank(MPI_COMM_WORLD, &myid); //当前进程的编号(0~n-1)
printf("I'm %d of %d\n", myid, numprocs);
if (myid != 0)
{
int len = sprintf(msg, "hello from %d", myid);
MPI_Send(msg, len, MPI_CHAR, 0, msg_tag, MPI_COMM_WORLD); //向id=0的进程发送信息
}
else
{
for (source = 1; source < numprocs; source++)
{
//从id=source的进程接受消息
MPI_Recv(msg, 100, MPI_CHAR, source, msg_tag, MPI_COMM_WORLD, &status);
printf("from %d: %s\n", source, msg);
}
}
MPI_Finalize();
return 0;
}
#include <mpi.h>
int main(int argc, char *argv[])
{
int myid, numprocs, source, msg_tag = 0;
char msg[100];
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs); //共启动几个进程
MPI_Comm_rank(MPI_COMM_WORLD, &myid); //当前进程的编号(0~n-1)
printf("I'm %d of %d\n", myid, numprocs);
if (myid != 0)
{
int len = sprintf(msg, "hello from %d", myid);
MPI_Send(msg, len, MPI_CHAR, 0, msg_tag, MPI_COMM_WORLD); //向id=0的进程发送信息
}
else
{
for (source = 1; source < numprocs; source++)
{
//从id=source的进程接受消息
MPI_Recv(msg, 100, MPI_CHAR, source, msg_tag, MPI_COMM_WORLD, &status);
printf("from %d: %s\n", source, msg);
}
}
MPI_Finalize();
return 0;
}
编译运行:
$ mpicc comm.c
$ mpiexec -n 4 ./a.out
I'm 0 of 4
from 1: hello from 1
from 2: hello from 2
I'm 1 of 4
I'm 2 of 4
from 3: hello from 3
I'm 3 of 4
3. 来个复杂点的:数数前1亿个自然数里有几个 雷劈数
代码后附,答案是97(真少),不过这不是重点,重点是MPI对硬件的利用率是怎样 :D
测试机器是 16核 AMD Opteron 6128HE @2GHz,32G内存
单进程(无MPI版本):56.9s
4进程:15.3s
8进程:7.85s
12进程:5.35s
考虑到16核跑满可能会受到其他进程的影响(性能不稳定,4.2~4.9s),这个数据就不列进来比较了。
可以看出来,在这个例子里,因为通信、同步只有在计算完之后才有那么一点点,所以在SMP架构下,耗费的时间基本上是跟进程数成反比的,说明MPI框架对硬件性能的利用率还是相当高的。
具体代码如下:
#include <stdio.h>
#include <mpi.h>
int is_lp(long long x)
{
long long t = x * x, i = 10;
while (i < t)
{
long long l = t / i, r = t % i;
if (l + r == x)
return 1;
i *= 10;
}
return 0;
}
int main(int argc, char *argv[])
{
int myid, numprocs, source;
const int N = 100000000;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
printf("I'm %d of %d\n", myid, numprocs);
int start = myid * (N / numprocs), stop = (myid + 1) * (N / numprocs);
if (myid == numprocs - 1)
stop = N;
printf("start from %d to %d\n", start, stop);
int ans = 0, i;
for (i = start; i < stop; i++)
if (is_lp(i))
ans += 1;
printf("%d finished calculation with %d numbers\n", myid, ans);
if (myid != 0)
{
MPI_Send(&ans, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
}
else
{
int tmp;
for (source = 1; source < numprocs; source++)
{
MPI_Recv(&tmp, 1, MPI_INT, source, 0, MPI_COMM_WORLD, &status);
printf("from %d: %d\n", source, tmp);
ans += tmp;
}
printf("final ans: %d\n", ans);
}
MPI_Finalize();
return 0;
}
#include <mpi.h>
int is_lp(long long x)
{
long long t = x * x, i = 10;
while (i < t)
{
long long l = t / i, r = t % i;
if (l + r == x)
return 1;
i *= 10;
}
return 0;
}
int main(int argc, char *argv[])
{
int myid, numprocs, source;
const int N = 100000000;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
printf("I'm %d of %d\n", myid, numprocs);
int start = myid * (N / numprocs), stop = (myid + 1) * (N / numprocs);
if (myid == numprocs - 1)
stop = N;
printf("start from %d to %d\n", start, stop);
int ans = 0, i;
for (i = start; i < stop; i++)
if (is_lp(i))
ans += 1;
printf("%d finished calculation with %d numbers\n", myid, ans);
if (myid != 0)
{
MPI_Send(&ans, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
}
else
{
int tmp;
for (source = 1; source < numprocs; source++)
{
MPI_Recv(&tmp, 1, MPI_INT, source, 0, MPI_COMM_WORLD, &status);
printf("from %d: %d\n", source, tmp);
ans += tmp;
}
printf("final ans: %d\n", ans);
}
MPI_Finalize();
return 0;
}
Jul
29
nth_element是一个用烂了的面试题,09年的时候我也曾经跑过一点数据(回头一看好像有点不太对,那时候CPU有那么慢吗?应该是当时没有把读取数据的时间给去掉吧),昨天看到 从一道笔试题谈算法优化(上) 和 从一道笔试题谈算法优化(下) 这个系列,里面提到了从简单到复杂的6种算法,根据作者的说法,经过各种优化以后solution6达到了相当的效率。受到作者启发,我也想到了一种算法,于是花了些时间,把常用的几种算法实现了进行对比(包括对比stl里的nth_element)。
回到 nth_element 的问题:从 n 个数里头,找出其中的 top k (top可以是找最大 也可以是找最小)。简单起见,后面都以找最小为例。
那篇文章里的后面的3种算法和我的算法都是基于它的solution3进行优化的,所以先介绍一下solution3:
1. 将 n 个数的前 k 个拷贝出来
2. 找出这 k 个数的最大值 m
3. 对于每一个剩下的 n - k 个数 i :如果 i 小于 m ,将 m 替换为 i ,然后再找出新的 k 个数中的最大值 m
4. 返回过滤后得到的 k 个数
虽然第3步的判断条件成立时,这一步是 O(k) 的复杂度,但是在大部分情况下,这个判断条件都不成立,所以它需要执行的次数很少(这种效果越往后越明显,因为 m 的值越来越小,可以滤掉更多的数)。但是对于极端情况(或者接近极端的情况)——如果数组完全是降序排列的话,那么这个条件每次都会成立,会导致算法退化成 O(n * m) ,性能就不可接受了。
solution4~6的优化我觉得读起来有点晦涩,有兴趣的同学可以自己去看,这里不展开说了。它们都存在算法退化的情况。
我的算法可能理解起来要简单些,基本思路是偷懒:把缓冲区设置为 2 * k ,在扫数组的时候,如果这个数比当前保留的最大值还要小,就把它塞进缓冲区,直到缓冲区塞满了,再排序、取最大值、删多余的数。
1. 开辟一个 2 × k 的临时数组 r
2. 将 N[1..k] 拷贝到 r[1..k],并记录 r 的末尾 e = k + 1
3. 找出 r[1..k] 中的最大值 m
4. 对于每一个剩下的 n - k 个数 i:
4.1 如果 i 小于 m,将 m 塞到 r 的末尾 r[e];e = e + 1
4.1.1 如果 e == 2×k(r被塞满了),对 r 进行排序,得出前k个数中的最大值;抛弃后面的k个数,e = k + 1
5. 对 r[1..e] 进行排序
6. 返回 r[1..k]
这个算法的实际效果很好,在 n = 1亿、k = 1万 的情况下,对于随机的n个数,4.1条件成立的次数通常只有十几次,几乎可以忽略,因此大约只需要扫描整个数组时间的2~3倍即可。
但是这个算法同样存在退化的问题:如果全都是倒序排列的话,也变成O(n*m)了。幸而解决方案也很简单:对这个数组进行 n / 100 次的随机化(考虑到随机化耗时也比较大,100这个数字是试了几次以后发现的合适值,不一定最优,但是都差不多了):
p.s. 很不幸的是随机化这个方法对于solution3来说虽然提升也很明显,但是远远不够。solution4~6也不太行。
这里给出随机情况下和完全倒序情况下的性能对比吧(ubuntu 12.04 x86_64,i5 2400@3.1G):
补充说明一下,nth_element算法在面试的时候可能给出的 n 会更大,以至于在内存中存不下,这时候通常会认为用堆来实现最好——因为只要O(k)的空间就可以搞定,而且最差时间复杂度是O(logk * n),但是要注意logk的常数实际上是很大的,所以你可以看到前面的数据里 堆算法 的效率并不是最好的。事实上这里给出的所有算法(不考虑随机化的话)也都只需要O(k)的空间;如果需要随机化的话也很简单,分段处理,只要保证每段能在内存中保存下来就行了。
最后上代码存档(为了方便测试用了些全局变量,看起来可能有点挫):
回到 nth_element 的问题:从 n 个数里头,找出其中的 top k (top可以是找最大 也可以是找最小)。简单起见,后面都以找最小为例。
那篇文章里的后面的3种算法和我的算法都是基于它的solution3进行优化的,所以先介绍一下solution3:
1. 将 n 个数的前 k 个拷贝出来
2. 找出这 k 个数的最大值 m
3. 对于每一个剩下的 n - k 个数 i :如果 i 小于 m ,将 m 替换为 i ,然后再找出新的 k 个数中的最大值 m
4. 返回过滤后得到的 k 个数
虽然第3步的判断条件成立时,这一步是 O(k) 的复杂度,但是在大部分情况下,这个判断条件都不成立,所以它需要执行的次数很少(这种效果越往后越明显,因为 m 的值越来越小,可以滤掉更多的数)。但是对于极端情况(或者接近极端的情况)——如果数组完全是降序排列的话,那么这个条件每次都会成立,会导致算法退化成 O(n * m) ,性能就不可接受了。
solution4~6的优化我觉得读起来有点晦涩,有兴趣的同学可以自己去看,这里不展开说了。它们都存在算法退化的情况。
我的算法可能理解起来要简单些,基本思路是偷懒:把缓冲区设置为 2 * k ,在扫数组的时候,如果这个数比当前保留的最大值还要小,就把它塞进缓冲区,直到缓冲区塞满了,再排序、取最大值、删多余的数。
1. 开辟一个 2 × k 的临时数组 r
2. 将 N[1..k] 拷贝到 r[1..k],并记录 r 的末尾 e = k + 1
3. 找出 r[1..k] 中的最大值 m
4. 对于每一个剩下的 n - k 个数 i:
4.1 如果 i 小于 m,将 m 塞到 r 的末尾 r[e];e = e + 1
4.1.1 如果 e == 2×k(r被塞满了),对 r 进行排序,得出前k个数中的最大值;抛弃后面的k个数,e = k + 1
5. 对 r[1..e] 进行排序
6. 返回 r[1..k]
这个算法的实际效果很好,在 n = 1亿、k = 1万 的情况下,对于随机的n个数,4.1条件成立的次数通常只有十几次,几乎可以忽略,因此大约只需要扫描整个数组时间的2~3倍即可。
但是这个算法同样存在退化的问题:如果全都是倒序排列的话,也变成O(n*m)了。幸而解决方案也很简单:对这个数组进行 n / 100 次的随机化(考虑到随机化耗时也比较大,100这个数字是试了几次以后发现的合适值,不一定最优,但是都差不多了):
void rand_factor()
{
//randomization
const int factor = 100;
int r, t, i;
for (i = n / factor; i > 0; i--)
{
r = rand() % n;
t = a[0], a[0] = a[r], a[r] = t;
}
}
{
//randomization
const int factor = 100;
int r, t, i;
for (i = n / factor; i > 0; i--)
{
r = rand() % n;
t = a[0], a[0] = a[r], a[r] = t;
}
}
p.s. 很不幸的是随机化这个方法对于solution3来说虽然提升也很明显,但是远远不够。solution4~6也不太行。
这里给出随机情况下和完全倒序情况下的性能对比吧(ubuntu 12.04 x86_64,i5 2400@3.1G):
引用
随机数据
========
随机化生成数据: 7.087s
遍历耗时: 0.053s
1/100随机化耗时: 0.064s
STL的nth_element: 0.841s
最大堆+随机化: 0.162s
Solution3+随机化: 3.121s
solution4+随机化: 2.825s
solution6+随机化: 0.249s
我的算法+随机化: 0.163s
倒序数据
========
无随机化生成数据: 0.150s
遍历耗时: 0.052s
1/100随机化耗时: 0.067s
STL的nth_element: 1.420s
最大堆+随机化: 0.301s
Solution3+随机化: 67.552s
solution4+随机化: 66.405s
solution6+随机化: 2.388s
我的算法+随机化: 0.170s
========
随机化生成数据: 7.087s
遍历耗时: 0.053s
1/100随机化耗时: 0.064s
STL的nth_element: 0.841s
最大堆+随机化: 0.162s
Solution3+随机化: 3.121s
solution4+随机化: 2.825s
solution6+随机化: 0.249s
我的算法+随机化: 0.163s
倒序数据
========
无随机化生成数据: 0.150s
遍历耗时: 0.052s
1/100随机化耗时: 0.067s
STL的nth_element: 1.420s
最大堆+随机化: 0.301s
Solution3+随机化: 67.552s
solution4+随机化: 66.405s
solution6+随机化: 2.388s
我的算法+随机化: 0.170s
补充说明一下,nth_element算法在面试的时候可能给出的 n 会更大,以至于在内存中存不下,这时候通常会认为用堆来实现最好——因为只要O(k)的空间就可以搞定,而且最差时间复杂度是O(logk * n),但是要注意logk的常数实际上是很大的,所以你可以看到前面的数据里 堆算法 的效率并不是最好的。事实上这里给出的所有算法(不考虑随机化的话)也都只需要O(k)的空间;如果需要随机化的话也很简单,分段处理,只要保证每段能在内存中保存下来就行了。
最后上代码存档(为了方便测试用了些全局变量,看起来可能有点挫):
Apr
16
今天看到80386+里 BSF/BSR 这对指令貌似挺有意思,Bit Scan Forward/Reverse,它们的作用是正向/逆向扫描一个WORD(16bit)/DWORD(32bit),找出第一个等于1的bit编号。比如对于BX=0x1010,执行 BSF CX, BX 得到的CX=4,而 BSR CX, BX 得到的CX=12。
咋一看是个神器啊。《编程之美》什么的,不是还有一道面试题说的就是怎样快速求出一个整数中bit=1的数量么。这货加上个while循环和移位,秒杀。最新版的Intel指令手册并没有明确说它需要多少个时钟周期,不过这里的数据指出,在80486+,BSF需要6~43个时钟周期才能完成(BSR则需要6~104个clocks),也就是说,它似乎并没有被特别优化,而是使用了CPU内的微码来完成循环,所以不要指望能带来性能上的BONUS,详情可参见这篇:求整数中比特为1的二进制位数。
这篇文章里面给出的用上BSF/BSR的代码是:
想试试这段代码,不过蛋疼的是这是VS的汇编语法,GCC不能识别,于是终于下了决心翻出GCC-Inline-Assembly-HOWTO,看了下,其实也不是很复杂,简单改了下,变成这样:
跑了一下,果然效率不行,跟最差的直接循环在同一个量级。
仔细看了下代码,发现这里用的是BSR,而且还用上了n和count两个变量,感觉不太爽,于是完全改成汇编,写成这样:
结果。。。基本没变。嗯,所以就这样吧。。。
咋一看是个神器啊。《编程之美》什么的,不是还有一道面试题说的就是怎样快速求出一个整数中bit=1的数量么。这货加上个while循环和移位,秒杀。最新版的Intel指令手册并没有明确说它需要多少个时钟周期,不过这里的数据指出,在80486+,BSF需要6~43个时钟周期才能完成(BSR则需要6~104个clocks),也就是说,它似乎并没有被特别优化,而是使用了CPU内的微码来完成循环,所以不要指望能带来性能上的BONUS,详情可参见这篇:求整数中比特为1的二进制位数。
这篇文章里面给出的用上BSF/BSR的代码是:
int f4(unsigned int num) {
int count = 0;
while(num) {
int n;
__asm {
bsr eax, num
mov n, eax
}
++count;
num ^= (1 << n);
}
return count;
}
int count = 0;
while(num) {
int n;
__asm {
bsr eax, num
mov n, eax
}
++count;
num ^= (1 << n);
}
return count;
}
想试试这段代码,不过蛋疼的是这是VS的汇编语法,GCC不能识别,于是终于下了决心翻出GCC-Inline-Assembly-HOWTO,看了下,其实也不是很复杂,简单改了下,变成这样:
int f4(unsigned int num) {
int count = 0;
while(num) {
int n;
__asm__ (
"bsr %1, %%eax\n\t"
"movl %%eax, %0\n\t"
: "=r" (n)
: "r" (num)
: "%eax"
);
++count;
num ^= (1 << n);
}
return count;
}
int count = 0;
while(num) {
int n;
__asm__ (
"bsr %1, %%eax\n\t"
"movl %%eax, %0\n\t"
: "=r" (n)
: "r" (num)
: "%eax"
);
++count;
num ^= (1 << n);
}
return count;
}
跑了一下,果然效率不行,跟最差的直接循环在同一个量级。
仔细看了下代码,发现这里用的是BSR,而且还用上了n和count两个变量,感觉不太爽,于是完全改成汇编,写成这样:
int f4(unsigned int num) { //注:x86下一般都是用EAX来存函数的返回值
asm (
"movl $0, %%eax\n\t" //这里是eax存的就是count
"jmp f4_cmp\n"
"f4_loop:\n\t"
"bsf %0, %%ecx\n\t" //ecx=bsf(num)
"incb %%cl\n\t" //cl += 1
"shrl %%cl, %0\n\t" //num >>= cl; shr貌似只接受 cl 和立即数?
"incl %%eax\n" //count++
"f4_cmp:\n\t"
"cmp $0, %0\n\t"
"jnz f4_loop\n\t"
:
: "r"(num)
: "%eax", "%ecx"
);
}
asm (
"movl $0, %%eax\n\t" //这里是eax存的就是count
"jmp f4_cmp\n"
"f4_loop:\n\t"
"bsf %0, %%ecx\n\t" //ecx=bsf(num)
"incb %%cl\n\t" //cl += 1
"shrl %%cl, %0\n\t" //num >>= cl; shr貌似只接受 cl 和立即数?
"incl %%eax\n" //count++
"f4_cmp:\n\t"
"cmp $0, %0\n\t"
"jnz f4_loop\n\t"
:
: "r"(num)
: "%eax", "%ecx"
);
}
结果。。。基本没变。嗯,所以就这样吧。。。
Mar
18
在RSA算法里头经常要用到“求x的n次方模m”这样的过程,通常使用O(log(n))的模重复平方算法来实现,提高效率。
其实这是大二学的《信息安全数学基础》里面的内容,那时为了考试需要(手算+写出很罗嗦的过程),还专门写了代码放在Blog空间里考试的时候用—……
同样O(log(n))的递归算法其实很容易理解:
但是SICP要求写一个迭代版的。印象中我是记得可以把 n 写成二进制(比如n=13, 1101),然后一位一位地推。
试推了一下从高位到低位,倒是挺简单的,记B[i]为第 i 位的值,通过A[i] = A[i+1]^2 * x^B[i] 从高到低计算出A[0],就得到结果了。但是问题是,为了从高到低计算,又得一次递归,似乎不能满足要求。
于是只好反过来,从低位往高位推。这个过程其实也挺简单的,举例来说:
当n=13,即二进制的 1101 = 1 * 2^3 + 1 * 2^2 + 0 * 2^1 + 1 * 2^0时,最终结果
ans = x^n % m
= x^(1 * 2^3 + 1 * 2^2 + 0 * 2^1 + 1 * 2^0) % m
= [x^(1 * 2^3)] * [x^(1 * 2^2)] * [(x ^ (0 * 2^1)] * [x ^ (1 * 2^0)] % m
也就是说,从低到高,在第 i 位的时候,将 x^(Bit[i] * 2^i) % m 乘到结果中即可。这里可以稍微变换一下:仅当Bit[i] == 1的时候,将x^(2^i) % m乘进去即可。所以这里可以用一个辅助的变量 b 来保存 x^(2^i) % m,在每次迭代的过程中 b = b^2 % m 。
于是实现就容易了:
//网上搜了下模重复平方算法,居然没有靠谱的算法解释,看来可能还是这个算法太简单了吧。。。
其实这是大二学的《信息安全数学基础》里面的内容,那时为了考试需要(手算+写出很罗嗦的过程),还专门写了代码放在Blog空间里考试的时候用—……
同样O(log(n))的递归算法其实很容易理解:
/* C */
int square(int x) { return x * x; } /* 这里可不能用宏哟 */
int fast_mod(int x, int n, int m)
{
if (n == 0)
return 1;
else if (n % 2 == 0)
return square(fast_mod(x, n / 2, m)) % m;
else
return x * fast_mod(x, n - 1, m) % m;
}
#Python
fast_mod = lambda x, n, m: 1 if n == 0 else fast_mod(x, n / 2, m) ** 2 % m if n % 2 == 0 else x * fast_mod(x, n - 1, m) % m
;Scheme
(define (even? n) (= (remainder n 2) 0))
(define (mod-fast x n m)
(define (square x) (* x x))
(cond ((= n 0) 1)
((even? n) (remainder (square (mod-fast x (/ n 2) m)) m))
(else (remainder (* x (mod-fast x (- n 1) m)) m))))
(mod-fast 79 24 33)
;16
int square(int x) { return x * x; } /* 这里可不能用宏哟 */
int fast_mod(int x, int n, int m)
{
if (n == 0)
return 1;
else if (n % 2 == 0)
return square(fast_mod(x, n / 2, m)) % m;
else
return x * fast_mod(x, n - 1, m) % m;
}
#Python
fast_mod = lambda x, n, m: 1 if n == 0 else fast_mod(x, n / 2, m) ** 2 % m if n % 2 == 0 else x * fast_mod(x, n - 1, m) % m
;Scheme
(define (even? n) (= (remainder n 2) 0))
(define (mod-fast x n m)
(define (square x) (* x x))
(cond ((= n 0) 1)
((even? n) (remainder (square (mod-fast x (/ n 2) m)) m))
(else (remainder (* x (mod-fast x (- n 1) m)) m))))
(mod-fast 79 24 33)
;16
但是SICP要求写一个迭代版的。印象中我是记得可以把 n 写成二进制(比如n=13, 1101),然后一位一位地推。
试推了一下从高位到低位,倒是挺简单的,记B[i]为第 i 位的值,通过A[i] = A[i+1]^2 * x^B[i] 从高到低计算出A[0],就得到结果了。但是问题是,为了从高到低计算,又得一次递归,似乎不能满足要求。
于是只好反过来,从低位往高位推。这个过程其实也挺简单的,举例来说:
当n=13,即二进制的 1101 = 1 * 2^3 + 1 * 2^2 + 0 * 2^1 + 1 * 2^0时,最终结果
ans = x^n % m
= x^(1 * 2^3 + 1 * 2^2 + 0 * 2^1 + 1 * 2^0) % m
= [x^(1 * 2^3)] * [x^(1 * 2^2)] * [(x ^ (0 * 2^1)] * [x ^ (1 * 2^0)] % m
也就是说,从低到高,在第 i 位的时候,将 x^(Bit[i] * 2^i) % m 乘到结果中即可。这里可以稍微变换一下:仅当Bit[i] == 1的时候,将x^(2^i) % m乘进去即可。所以这里可以用一个辅助的变量 b 来保存 x^(2^i) % m,在每次迭代的过程中 b = b^2 % m 。
于是实现就容易了:
/* C */
int fast_mod_iter(int x, int n, int m)
{
int a = 1, b = x; //i=0的时候b = x^(2^0) = x
while (n)
{
if (n % 2 == 1)
a = a * b % m;
b = b * b % m;
n /= 2;
}
return a;
}
#Python
def fast_mod(x, n, m):
a = 1
b = x
while True:
if n == 0:
return a
if n % 2 == 1:
a = a * b % m
b = b * b % m
n /= 2
;Scheme
(define (even? n) (= (remainder n 2) 0))
(define (mod-fast-iter x n m)
(define (iter a b n)
(cond ((= n 0) a)
((even? n)
(iter a (remainder (* b b) m) (/ n 2)))
(else
(iter (remainder (* a b) m) (* b b) (/ (- n 1) 2)))))
(iter 1 x n))
(mod-fast-iter 79 24 33)
;16
int fast_mod_iter(int x, int n, int m)
{
int a = 1, b = x; //i=0的时候b = x^(2^0) = x
while (n)
{
if (n % 2 == 1)
a = a * b % m;
b = b * b % m;
n /= 2;
}
return a;
}
#Python
def fast_mod(x, n, m):
a = 1
b = x
while True:
if n == 0:
return a
if n % 2 == 1:
a = a * b % m
b = b * b % m
n /= 2
;Scheme
(define (even? n) (= (remainder n 2) 0))
(define (mod-fast-iter x n m)
(define (iter a b n)
(cond ((= n 0) a)
((even? n)
(iter a (remainder (* b b) m) (/ n 2)))
(else
(iter (remainder (* a b) m) (* b b) (/ (- n 1) 2)))))
(iter 1 x n))
(mod-fast-iter 79 24 33)
;16
//网上搜了下模重复平方算法,居然没有靠谱的算法解释,看来可能还是这个算法太简单了吧。。。
Mar
18
最近在看SICP,抛弃旧的世界观和方法论压力很大,不过还是很有收获的,比如学习了个O(log(n))的fibonacci算法,大涨姿势啊。
想起前几天看到的某个笔试题,说是用最快的办法计算fibonacci数列的第n项。虽然我知道它是有个通项公式的,但是不适用于精确计算,因此写了个迭代的算法,自以为已经很好了,现在看了真是too simple sometimes naive了。
众所皆知 fibonacci 的定义是f(n) = f(n - 1) + f(n - 2); f(1) = f(2) = 1(从f(1)=1开始算起)
不幸的是这样树形展开效率太低了,当n=42的时候,C语言需要的时间已经超过1s了。
因此需要改成迭代版,使用 (a, b) <- (a + b, a) 这样的方式。
虽然O(n)的效率已经有显著的提升,但是由于这个数列的增长超过了2^n,所以当对于稍大的n,就需要使用大整数的运算,效率很低。python版的代码,当n=300,000的时候,需要超过1s才能得出结果;Scheme版则需要大约9s。
SICP的练习1-19里面则提到一个O(log(n))的巧妙算法:将计算fibonacci的每次迭代 (a, b) <- (a + b, a) 表示为一个变换T[p=0, q=1],具体表示为(似乎是用矩阵乘法倒推过来的)
通过计算 T[pq](T[pq](a, b)) (即对(a, b)进行两次T[pq]变换),可以得到
也就是说——可以通过类似分治计算 《a的n次方模b》 的算法来计算fibonacci了!
给出了算法以后,代码就容易写了:
经过这样的优化以后,效率显著,对于n=300,000,python版仅需要不到0.04s,而Scheme版也仅需要0.12s即可得出结果。
p.s. 以上代码均经过测试。
想起前几天看到的某个笔试题,说是用最快的办法计算fibonacci数列的第n项。虽然我知道它是有个通项公式的,但是不适用于精确计算,因此写了个迭代的算法,自以为已经很好了,现在看了真是too simple sometimes naive了。
众所皆知 fibonacci 的定义是f(n) = f(n - 1) + f(n - 2); f(1) = f(2) = 1(从f(1)=1开始算起)
#Python版:
fibonacci = lambda n: 1 if n <= 2 else fibonacci(n - 1) + fibonacci(n - 2)
/* C版 */
int fibonacci(int n) {
return n <= 2 ? 1 : fibonacci(n - 1) + fibonacci(n - 2);
}
;Scheme版
(define (fibonacci n) (if (<= n 2) 1 (+ (fibonacci (- n 1)) (fibonacci (- n 2)))))
fibonacci = lambda n: 1 if n <= 2 else fibonacci(n - 1) + fibonacci(n - 2)
/* C版 */
int fibonacci(int n) {
return n <= 2 ? 1 : fibonacci(n - 1) + fibonacci(n - 2);
}
;Scheme版
(define (fibonacci n) (if (<= n 2) 1 (+ (fibonacci (- n 1)) (fibonacci (- n 2)))))
不幸的是这样树形展开效率太低了,当n=42的时候,C语言需要的时间已经超过1s了。
因此需要改成迭代版,使用 (a, b) <- (a + b, a) 这样的方式。
#Python版
def fibonacci(n):
a = 1; b = 0;
for i in range(n):
a, b = a + b, a
return b
/* C版 */
int fibonacci(int n) {
int a = 1, b = 0, c;
while (n--) {
c = a;
a = a + b;
b = c;
}
return b;
}
;Scheme版
(define (fibonacci n)
(define (iter n a b)
(if (= n 0) b (iter (- n 1) (+ a b) a)))
(iter n 1 0))
def fibonacci(n):
a = 1; b = 0;
for i in range(n):
a, b = a + b, a
return b
/* C版 */
int fibonacci(int n) {
int a = 1, b = 0, c;
while (n--) {
c = a;
a = a + b;
b = c;
}
return b;
}
;Scheme版
(define (fibonacci n)
(define (iter n a b)
(if (= n 0) b (iter (- n 1) (+ a b) a)))
(iter n 1 0))
虽然O(n)的效率已经有显著的提升,但是由于这个数列的增长超过了2^n,所以当对于稍大的n,就需要使用大整数的运算,效率很低。python版的代码,当n=300,000的时候,需要超过1s才能得出结果;Scheme版则需要大约9s。
SICP的练习1-19里面则提到一个O(log(n))的巧妙算法:将计算fibonacci的每次迭代 (a, b) <- (a + b, a) 表示为一个变换T[p=0, q=1],具体表示为(似乎是用矩阵乘法倒推过来的)
引用
T[pq](a, b) = (a(p+q) + bq, aq + bp)
通过计算 T[pq](T[pq](a, b)) (即对(a, b)进行两次T[pq]变换),可以得到
引用
(a((pp+qq) + (2pq+qq)) + b(2pq+qq), a(2pq+qq) + b(pp+qq))
记 p' = pp + qq, q' = 2pq+qq 则有
T[p'q'](a, b) = T[pq](T[pq](a, b)) = (a(p' + q') + bq', aq' + bp')
于是
f(n+1) = T[pq]n(f(1))
记 p' = pp + qq, q' = 2pq+qq 则有
T[p'q'](a, b) = T[pq](T[pq](a, b)) = (a(p' + q') + bq', aq' + bp')
于是
f(n+1) = T[pq]n(f(1))
也就是说——可以通过类似分治计算 《a的n次方模b》 的算法来计算fibonacci了!
给出了算法以后,代码就容易写了:
#Python版
def fibonacci(n):
def iter(a, b, p, q, n):
if n == 0:
return b
elif n % 2 == 0:
return iter(a, b, p * p + q * q, 2 * p * q + q * q, n / 2)
else:
return iter(a * (p + q) + b * q, a * q + b * p, p, q, n - 1)
return iter(1, 0, 0, 1, n)
/* C版 */
typedef unsigned long long ull;
ull fibo_iter(ull a, ull b, ull p, ull q, int n) {
if (n == 0)
return b;
else if (n % 2 == 0)
return fibo_iter(a, b, p *p + q * q, 2 * p * q + q * q, n / 2);
else
return fibo_iter(a * (p + q) + b * q, a * q + b * p, p, q, n - 1);
}
ull fibonacci(int n) {
return fibo_iter(1, 0, 0, 1, n);
}
/* Scheme版 */
(define (even? n) (= (remainder n 2) 0))
(define (fibonacci n)
(define (iter a b p q n)
(cond ((= n 0) b)
((even? n)
(iter
a
b
(+ (* p p) (* q q))
(+ (* 2 p q) (* q q))
(/ n 2)))
(else
(iter
(+ (* a (+ p q)) (* b q))
(+ (* a q) (* b p))
p
q
(- n 1)))))
(iter 1 0 0 1 n))
def fibonacci(n):
def iter(a, b, p, q, n):
if n == 0:
return b
elif n % 2 == 0:
return iter(a, b, p * p + q * q, 2 * p * q + q * q, n / 2)
else:
return iter(a * (p + q) + b * q, a * q + b * p, p, q, n - 1)
return iter(1, 0, 0, 1, n)
/* C版 */
typedef unsigned long long ull;
ull fibo_iter(ull a, ull b, ull p, ull q, int n) {
if (n == 0)
return b;
else if (n % 2 == 0)
return fibo_iter(a, b, p *p + q * q, 2 * p * q + q * q, n / 2);
else
return fibo_iter(a * (p + q) + b * q, a * q + b * p, p, q, n - 1);
}
ull fibonacci(int n) {
return fibo_iter(1, 0, 0, 1, n);
}
/* Scheme版 */
(define (even? n) (= (remainder n 2) 0))
(define (fibonacci n)
(define (iter a b p q n)
(cond ((= n 0) b)
((even? n)
(iter
a
b
(+ (* p p) (* q q))
(+ (* 2 p q) (* q q))
(/ n 2)))
(else
(iter
(+ (* a (+ p q)) (* b q))
(+ (* a q) (* b p))
p
q
(- n 1)))))
(iter 1 0 0 1 n))
经过这样的优化以后,效率显著,对于n=300,000,python版仅需要不到0.04s,而Scheme版也仅需要0.12s即可得出结果。
p.s. 以上代码均经过测试。
Nov
27
昨天关注了下 微信公众平台,的确是个好东西,赞一下tx近来的开放。
它的api 乍一看还是挺简单的,于是就写了个小东西玩玩。
但是在开发的过程中遇到了多个坑,通过有故事王国的@ctqmumu同学把问题转了过去。实际开发的过程中遇到了好几个问题,但是其他问题(包括发一条消息请求我两次)都莫名其妙消失了,只有一个问题(具体见后文)通过被喷的方式解决了。
总之最后是可以用了,于是多花了点时间,把代码完善了下,写好了注释和样例,放在了Google Code上面,有需要的同学可以拿去用。
下载地址:http://code.google.com/p/mmsdk/downloads/list
注:目前callback url仅支持80端口(@2012.11.27)
#UPDATE 2012.11.29 添加了对图片消息的支持(样例)、对调试的支持(DUMP请求/回复的xml文件),测试SAE可用(调试功能除外)
== 分割线,下面是纯吐槽 ==
它的api 乍一看还是挺简单的,于是就写了个小东西玩玩。
但是在开发的过程中遇到了多个坑,通过有故事王国的@ctqmumu同学把问题转了过去。实际开发的过程中遇到了好几个问题,但是其他问题(包括发一条消息请求我两次)都莫名其妙消失了,只有一个问题(具体见后文)通过被喷的方式解决了。
总之最后是可以用了,于是多花了点时间,把代码完善了下,写好了注释和样例,放在了Google Code上面,有需要的同学可以拿去用。
下载地址:http://code.google.com/p/mmsdk/downloads/list
注:目前callback url仅支持80端口(@2012.11.27)
#UPDATE 2012.11.29 添加了对图片消息的支持(样例)、对调试的支持(DUMP请求/回复的xml文件),测试SAE可用(调试功能除外)
== 分割线,下面是纯吐槽 ==