协程调度器

图片说明

说明

协程调度器包括一个线程池,包括一个任务池。线程之上封装协程,每个线程都有自己的主协程,在主协程的基础上,可以调取任务,转到其他协程,完成之后再返回主协程。

实现

任务池基本类型

任务池需要有协程类型与function类型,所以需要做一个封装。并且可以指定此任务需要在哪个线程上执行。

struct FiberAndThread{
        Fiber::ptr fiber;
        std::function<void()> cb;
        int thread;
        FiberAndThread(Fiber::ptr f, int thr)
        :fiber(f),thread(thr){
        }
        /**
         * @brief 构造函数
         * @param[in] f 协程指针
         * @param[in] thr 线程id
         * @post *f = nullptr
         */
        FiberAndThread(Fiber::ptr* f, int thr)
            :thread(thr) {
            fiber.swap(*f);
        }

        /**
         * @brief 构造函数
         * @param[in] f 协程执行函数
         * @param[in] thr 线程id
         */
        FiberAndThread(std::function<void()> f, int thr)
            :cb(f), thread(thr) {
        }

        /**
         * @brief 构造函数
         * @param[in] f 协程执行函数指针
         * @param[in] thr 线程id
         * @post *f = nullptr
         */
        FiberAndThread(std::function<void()>* f, int thr)
            :thread(thr) {
            cb.swap(*f);
        }
        /**
         * @brief 无参构造函数
         */
        FiberAndThread()
            :thread(-1) {
        }

        /**
         * @brief 重置数据
         */
        void reset() {
            fiber = nullptr;
            cb = nullptr;
            thread = -1;
        }
    };

向任务池中添加任务

添加任务分两层封装,上层需要锁并且可以批量添加任务,下层是单位操作。

/**
     * @brief 调度协程
     * @param[in] fc 协程或函数
     * @param[in] thread 协程执行的线程id,-1标识任意线程
     */
    template<class FiberOrCb>
    void schedule(FiberOrCb fc, int thread = -1) {
        bool need_tickle = false;
        {
            MutexType::Lock lock(m_mutex);
            need_tickle = scheduleNoLock(fc, thread);
        }

        if(need_tickle) {
            tickle();
        }
    }

    /**
     * @brief 批量调度协程
     * @param[in] begin 协程数组的开始
     * @param[in] end 协程数组的结束
     */
    template<class InputIterator>
    void schedule(InputIterator begin, InputIterator end) {
        bool need_tickle = false;
        {
            MutexType::Lock lock(m_mutex);
            while(begin != end) {
                need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
                ++begin;
            }
        }
        if(need_tickle) {
            tickle();
        }
    }

template<class FiberOrCb>
    bool scheduleNoLock(FiberOrCb fc, int thread) {
        bool need_tickle = m_fibers.empty();
        FiberAndThread ft(fc, thread);
        if(ft.fiber || ft.cb) {
            m_fibers.push_back(ft);
        }
        return need_tickle;
    }

构造函数

use_caller代表是否将当前新建调度器的线程加入到线程池。

Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name)
    :m_name(name) {
    SYLAR_ASSERT(threads > 0);
    if(use_caller){
        sylar::Fiber::GetThis();
        SYLAR_ASSERT(t_scheduler != this);
        t_scheduler=this;
        threads--;  //当前线程加入,故需新建线程数减一
        m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run,this),0,true));
        //与其他线程不同,主线程的主调度协程是新建的run协程,其他线程的主调度协程与线程的主协程一致
        sylar::Thread::SetName(m_name);
        t_scheduler_fiber = m_rootFiber.get(); //设置当前主调度协程
        m_rootThread = sylar::GetThreadId(); //设置主线程ID
        m_threadIds.push_back(m_rootThread);
    }
    else{
        m_rootThread=-1;
    }
    m_threadCount=threads;  //设置线程池数
}

Scheduler::~Scheduler(){
    SYLAR_ASSERT(m_stopping);
    if(GetThis()==this){ //确认当前线程调度器与析构的调度器一致
        t_scheduler=nullptr;
    }
}

开启与中止调度器

开启调度器主要工作是新建线程,结束调度器主要是设置auto_Stop与Stopping为true为idle()协程创造结束的条件并通过tickle()函数通知其他线程进行结束,再启动主线程的run协程,最后回收各个线程

void Scheduler::start(){
    MutexType::Lock lock(m_mutex);
     if(!m_stopping) {//要想开启调度器,当前调度器状态必须为非结束状态
            return;
       }
    m_stopping=false; 
    SYLAR_ASSERT(m_threads.empty());//当前线程池为空
    m_threads.resize(m_threadCount);
    for(size_t i=0;i<m_threadCount;i++){
        m_threads[i].reset(new Thread(std::bind(&Scheduler::run,this),m_name+"_"+std::to_string(i))); 
//新建各个线程,此处的reset为智能指针的reset函数
        m_threadIds.push_back(m_threads[i]->getId());
    }
    lock.unlock();
  // if(m_rootFiber) {
      //    //m_rootFiber->swapIn();
    //      m_rootFiber->call();
      //    SYLAR_LOG_INFO(g_logger) << "call out " << m_rootFiber->getState();
     // }
}
//检测是否可以停止,并通知各个线程停止,最后回收线程
void Scheduler::stop(){
    m_autoStop=true;  //设置停止条件
    //判断只有主协程,并且线程池为空
 if(m_rootFiber
            && m_threadCount == 0
            && (m_rootFiber->getState() == Fiber::TERM
                || m_rootFiber->getState() == Fiber::INIT)) {
        SYLAR_LOG_INFO(g_logger) << this << " stopped";
        m_stopping = true;
      //判断当前是否只有主线程的run协程并处在非执行状态
        if(stopping()) { //留给子类实现,为子类回收自己的资源做准备
            return;
        }
    }
    if(m_rootThread==-1){ //检测是否使用了use_caller,如果使用了结束调度器的必须为使用了use_caller的线程调用stop
        SYLAR_ASSERT(GetThis()!=t_scheduler);
    }else{
        SYLAR_ASSERT(GetThis()==t_scheduler);
    }

    m_stopping=true;
    for(size_t i=0;i<m_threadCount;i++){
        tickle();//唤醒各个线程,使他们自己执行结束
    }
    if(m_rootFiber){
        tickle();
    }
    if(m_rootFiber){
        if(!stopping()){
            m_rootFiber->call(); //在结束处调用主协程,主要是因为如果一开始就进入主协程,那么主线程中进行任务添加的代码就毫无用处,因为已经结束了。
        }
    }
    std::vector<Thread::ptr> thr;
    {
        MutexType::Lock lock(m_mutex);
        thr.swap(m_threads);
    }
    for(size_t i=0;i<thr.size();i++){ //回收线程
        thr[i]->join();
    }
}

线程的run函数

run函数是每个线程的主协程函数,大家都在这个协程循环等待任务池中的任务,如果没有任务就执行idle()协程,也就是一直等待,直到主线程调用stop()才会跳出idle()协程。

void Scheduler::run(){
    SYLAR_LOG_DEBUG(g_logger)<<m_name<<" run";
    setThis();
    if(sylar::GetThreadId()!=m_rootThread){
        t_scheduler_fiber=sylar::Fiber::GetThis().get();
    }
    Fiber::ptr idel_fiber (new Fiber(std::bind(&Scheduler::idle,this)));
    Fiber::ptr cb_fiber;
    FiberAndThread ft;
    while(true){
        ft.reset();
        bool tickle_me=false;
        bool is_active=false;
        {
          //  SYLAR_LOG_DEBUG(g_logger)<<m_name<<"while run";
            MutexType::Lock lock(m_mutex);
            auto it = m_fibers.begin();
            while(it!=m_fibers.end()){ //寻找可执行任务,
                if(it->thread != -1 && sylar::GetThreadId() != it->thread){ //任务指定了线程id,但是当前线程不是,故需要提醒其他线程
                        ++it;
                        tickle_me=true;
                        continue;
                }
                SYLAR_ASSERT(it->fiber || it->cb);
                if(it->fiber && it->fiber->getState() == Fiber::EXEC){
                    ++it;
                    continue;
                }
                ft=*it;
                m_fibers.erase(it++);
                ++m_activeThreadCount; //找到可执行任务,说明活跃线程+1
                is_active=true;
                break;
            }
            tickle_me |= it!=m_fibers.end();
        }
        if(tickle_me){
            tickle();
        }
        if(ft.fiber && (ft.fiber->getState() != Fiber::TERM
        && ft.fiber->getState()!=Fiber::EXCEPT)){ //判断需要执行的任务是否为协程
            ft.fiber->swapIn();
            --m_activeThreadCount;
            if(ft.fiber->getState() == Fiber::READY) {
                schedule(ft.fiber);
            }else if(ft.fiber->getState() != Fiber::TERM
             && ft.fiber->getState() != Fiber::EXCEPT){
                ft.fiber->m_state = Fiber::HOLD;
             }
             ft.reset();
        }else if(ft.cb){
            SYLAR_LOG_INFO(g_logger) << "new fiber cb";
            if(cb_fiber){
                cb_fiber->reset(ft.cb);
            }
            else{
                cb_fiber.reset(new Fiber(ft.cb));
            }
            ft.reset();
            cb_fiber->swapIn();
            --m_activeThreadCount;
            SYLAR_LOG_DEBUG(g_logger)<<t_scheduler_fiber->getState();
             if(cb_fiber->getState() == Fiber::READY) {

                    schedule(cb_fiber);
                    cb_fiber.reset();  //释放
             }else if(cb_fiber->getState() == Fiber::TERM
                || cb_fiber->getState() == Fiber::EXCEPT){

                   cb_fiber->reset(nullptr);  //表示不调用析构函数
              }else {//if(cb_fiber->getState() != Fiber::TERM) {

                   cb_fiber->m_state = Fiber::HOLD;
                   cb_fiber.reset();  //释放
              }

        }else {
                     if(is_active) {
                         --m_activeThreadCount;
                         continue;
                     }
                     if(idel_fiber->getState() == Fiber::TERM) {
                         SYLAR_LOG_INFO(g_logger) << "idle fiber term";
                         break;
                     }

                     ++m_idleThreadCount;
                     idel_fiber->swapIn();
                     --m_idleThreadCount;
                     if(idel_fiber->getState() != Fiber::TERM
                             && idel_fiber->getState() != Fiber::EXCEPT) {
                         idel_fiber->m_state = Fiber::HOLD;//当没有结束时,不需要一直循环,可以将idle切换出去,也就是切换回到主协程
                     }

    }

}
}
void Scheduler::tickle() {
    SYLAR_LOG_INFO(g_logger) << "tickle";
}

bool Scheduler::stopping() {
    MutexType::Lock lock(m_mutex);
    return m_autoStop && m_stopping
        && m_fibers.empty() && m_activeThreadCount == 0;
}

void Scheduler::idle() {
    SYLAR_LOG_INFO(g_logger) << "idle";
    while(!stopping()) {
        sylar::Fiber::YieldToHold();
    }
}