普通排序的问题
我们知道除了一些特殊排序算法(如基数排序),平均性能最好的是快排。即使是使用快排,当排序规模达到千万级时,不加编译优化,用i7-6700 CPU @ 3.40GHz跑的时间也是秒级或十秒级。
多线程排序设计
普通排序的问题在于只能利用单个CPU能力。那我的初步想法就是利用多线程来提高CPU利用率,看能否提高排序效率。
设计:
- 将排序数组按线程拆分成N个block。
- 每个线程负责各个block的排序,采用快排。
- 所有线程结束时,采用二路归并将N个block排序。
这样最后就将整个数组都排好了。
多线程排序测试结果及分析
给测试环境分配了4个cpu核:
- 当线程数达到4或以上时,4个核跑满。这一点如果是多用户环境很可能是跑不满的,那排序效率肯定就下降了。
- 随着线程数的增加,排序效率先增加后下降,大概在线程数为10-16时达到最大。
线程太多后有切换上的开销,所以性能会下降。
内排属于计算密集型,cpu 4核8线程同时处理运算时效率还不是最高。对cpu来说,排序过程中与内存有很多次交互,我认为这期间cpu仍然有计算能力。所以稍微增加程序线程数还能提高运算效率。
下面附上测试程序。说明:该程序只为测试而用,有好多不规范和可优化的点,这里不一一列举。
#define SORT_NUM 10000000
#define THREAD_NUM 8
#define BLOCK_SIZE (SORT_NUM / THREAD_NUM)
vector<int> g_vec(SORT_NUM);
vector<int> g_mergeresulttmp(SORT_NUM);
void init(vector<int> &vec)
{
srand((unsigned)time(NULL));
for (auto &e : vec)
{
e = rand() % 10000;
}
}
void print(const vector<int> &vec)
{
for (auto e : vec)
{
cout << e << endl;
}
}
int ajust(vector<int> &vec, int beg, int end)
{
int i = beg + 1;
int j = end;
while (i <= j)
{
while (vec[i] <= vec[beg] && i <= j)
i++;
while (vec[j] >= vec[beg] && i <= j)
j--;
if (i < j)
{
swap(vec[i], vec[j]);
i++;
j--;
}
}
swap(vec[beg], vec[j]);
return j;
}
void qsort(vector<int> &vec, int beg, int end)
{
if (beg >= end)
{
return;
}
int loc = ajust(vec, beg, end);
qsort(vec, beg, loc - 1);
qsort(vec, loc + 1, end);
}
void *sortthreadFunc(void *arg)
{
int index = (int)(long)arg;
int beg = BLOCK_SIZE * index;
qsort(g_vec, beg, beg + BLOCK_SIZE - 1);
return NULL;
}
void domerge(vector<int> &vec, int beg1, int beg2)
{
int end1 = beg2 - 1;
int end2 = beg2 + BLOCK_SIZE - 1;
int i = beg1;
int j = beg2;
int k = 0;
while (i <= end1 && j <= end2)
{
if (vec[i] <= vec[j])
{
g_mergeresulttmp[k++] = vec[i++];
}
else
{
g_mergeresulttmp[k++] = vec[j++];
}
}
while (i <= end1)
{
g_mergeresulttmp[k++] = vec[i++];
}
copy_n(g_mergeresulttmp.begin(), k, vec.begin());
}
void merge(vector<int> &vec)
{
for (int i = 1; i < THREAD_NUM; i++)
{
domerge(vec, 0, i * BLOCK_SIZE);
}
}
void threadssort(vector<int> &vec)
{
vector<pthread_t> thrdvec(THREAD_NUM);
for (int i = 0; i < thrdvec.size(); i++)
{
int ret = pthread_create(&thrdvec[i], NULL, sortthreadFunc, (void*)(long)i);
if (ret != 0)
{
cout << "err" << endl;
exit(1);
}
}
for (int i = 0; i < thrdvec.size(); i++)
{
pthread_join(thrdvec[i], NULL);
}
merge(vec);
}
int main()
{
init(g_vec);
clock_t start = clock();
//qsort(g_vec, 0, g_vec.size() - 1);
//sort(g_vec.begin(), g_vec.end());
threadssort(g_vec);
clock_t finish = clock();
double duration = (double)(finish - start) / CLOCKS_PER_SEC;
printf( "%f seconds\n", duration );
//print(g_vec);
return 0;
}