普通排序的问题

我们知道除了一些特殊排序算法(如基数排序),平均性能最好的是快排。即使是使用快排,当排序规模达到千万级时,不加编译优化,用i7-6700 CPU @ 3.40GHz跑的时间也是秒级或十秒级。

多线程排序设计

普通排序的问题在于只能利用单个CPU能力。那我的初步想法就是利用多线程来提高CPU利用率,看能否提高排序效率。
alt

设计:

  1. 将排序数组按线程拆分成N个block。
  2. 每个线程负责各个block的排序,采用快排。
  3. 所有线程结束时,采用二路归并将N个block排序。
    这样最后就将整个数组都排好了。

多线程排序测试结果及分析

给测试环境分配了4个cpu核:

  1. 当线程数达到4或以上时,4个核跑满。这一点如果是多用户环境很可能是跑不满的,那排序效率肯定就下降了。
  2. 随着线程数的增加,排序效率先增加后下降,大概在线程数为10-16时达到最大。

线程太多后有切换上的开销,所以性能会下降。
内排属于计算密集型,cpu 4核8线程同时处理运算时效率还不是最高。对cpu来说,排序过程中与内存有很多次交互,我认为这期间cpu仍然有计算能力。所以稍微增加程序线程数还能提高运算效率。

alt

下面附上测试程序。说明:该程序只为测试而用,有好多不规范和可优化的点,这里不一一列举。

#define SORT_NUM 10000000
#define THREAD_NUM 8
#define BLOCK_SIZE (SORT_NUM / THREAD_NUM)
vector<int> g_vec(SORT_NUM);
vector<int> g_mergeresulttmp(SORT_NUM);

void init(vector<int> &vec)
{
    srand((unsigned)time(NULL));
    for (auto &e : vec)
    {
        e = rand() % 10000;
    }
}

void print(const vector<int> &vec)
{
    for (auto e : vec)
    {
        cout << e << endl;
    }
}

int ajust(vector<int> &vec, int beg, int end)
{    
    int i = beg + 1;
    int j = end;
    while (i <= j)
    {
        while (vec[i] <= vec[beg] && i <= j)
            i++;
        
        while (vec[j] >= vec[beg] && i <= j)
            j--;
        
        if (i < j)
        {
            swap(vec[i], vec[j]);
            i++;
            j--;
        }
    }

    swap(vec[beg], vec[j]);

    return j;
}

void qsort(vector<int> &vec, int beg, int end)
{
    if (beg >= end)
    {
        return;
    }
    
    int loc = ajust(vec, beg, end);
    qsort(vec, beg, loc - 1);
    qsort(vec, loc + 1, end);
}
void *sortthreadFunc(void *arg)
{
    int index = (int)(long)arg;
    int beg = BLOCK_SIZE * index;
    qsort(g_vec, beg, beg + BLOCK_SIZE - 1);
    return NULL;
}

void domerge(vector<int> &vec, int beg1, int beg2)
{
    int end1 = beg2 - 1;
    int end2 = beg2 + BLOCK_SIZE - 1;
    int i = beg1;
    int j = beg2;
    int k = 0;
    while (i <= end1 && j <= end2)
    {
        if (vec[i] <= vec[j])
        {
            g_mergeresulttmp[k++] = vec[i++];
        }
        else
        {
            g_mergeresulttmp[k++] = vec[j++];
        }
    }

    while (i <= end1)
    {
        g_mergeresulttmp[k++] = vec[i++];        
    }

    copy_n(g_mergeresulttmp.begin(), k, vec.begin());
}


void merge(vector<int> &vec)
{
    for (int i = 1; i < THREAD_NUM; i++)
    {
        domerge(vec, 0, i * BLOCK_SIZE);
    }
}

void threadssort(vector<int> &vec)
{
    vector<pthread_t> thrdvec(THREAD_NUM);
    for (int i = 0; i < thrdvec.size(); i++)
    {
        int ret = pthread_create(&thrdvec[i], NULL, sortthreadFunc, (void*)(long)i);
        if (ret != 0)
        {
            cout << "err" << endl;
            exit(1);
        }        
    }

    for (int i = 0; i < thrdvec.size(); i++)
    {
        pthread_join(thrdvec[i], NULL);
    }

    merge(vec);
}

int main()
{ 
    init(g_vec);
    clock_t start = clock();
    //qsort(g_vec, 0, g_vec.size() - 1);
    //sort(g_vec.begin(), g_vec.end());
    threadssort(g_vec);
    clock_t finish = clock();
    double duration = (double)(finish - start) / CLOCKS_PER_SEC;
    printf( "%f seconds\n", duration );
    //print(g_vec);
    return 0;
}