协程调度器
说明
协程调度器包括一个线程池,包括一个任务池。线程之上封装协程,每个线程都有自己的主协程,在主协程的基础上,可以调取任务,转到其他协程,完成之后再返回主协程。
实现
任务池基本类型
任务池需要有协程类型与function类型,所以需要做一个封装。并且可以指定此任务需要在哪个线程上执行。
struct FiberAndThread{ Fiber::ptr fiber; std::function<void()> cb; int thread; FiberAndThread(Fiber::ptr f, int thr) :fiber(f),thread(thr){ } /** * @brief 构造函数 * @param[in] f 协程指针 * @param[in] thr 线程id * @post *f = nullptr */ FiberAndThread(Fiber::ptr* f, int thr) :thread(thr) { fiber.swap(*f); } /** * @brief 构造函数 * @param[in] f 协程执行函数 * @param[in] thr 线程id */ FiberAndThread(std::function<void()> f, int thr) :cb(f), thread(thr) { } /** * @brief 构造函数 * @param[in] f 协程执行函数指针 * @param[in] thr 线程id * @post *f = nullptr */ FiberAndThread(std::function<void()>* f, int thr) :thread(thr) { cb.swap(*f); } /** * @brief 无参构造函数 */ FiberAndThread() :thread(-1) { } /** * @brief 重置数据 */ void reset() { fiber = nullptr; cb = nullptr; thread = -1; } };
向任务池中添加任务
添加任务分两层封装,上层需要锁并且可以批量添加任务,下层是单位操作。
/** * @brief 调度协程 * @param[in] fc 协程或函数 * @param[in] thread 协程执行的线程id,-1标识任意线程 */ template<class FiberOrCb> void schedule(FiberOrCb fc, int thread = -1) { bool need_tickle = false; { MutexType::Lock lock(m_mutex); need_tickle = scheduleNoLock(fc, thread); } if(need_tickle) { tickle(); } } /** * @brief 批量调度协程 * @param[in] begin 协程数组的开始 * @param[in] end 协程数组的结束 */ template<class InputIterator> void schedule(InputIterator begin, InputIterator end) { bool need_tickle = false; { MutexType::Lock lock(m_mutex); while(begin != end) { need_tickle = scheduleNoLock(&*begin, -1) || need_tickle; ++begin; } } if(need_tickle) { tickle(); } } template<class FiberOrCb> bool scheduleNoLock(FiberOrCb fc, int thread) { bool need_tickle = m_fibers.empty(); FiberAndThread ft(fc, thread); if(ft.fiber || ft.cb) { m_fibers.push_back(ft); } return need_tickle; }
构造函数
use_caller代表是否将当前新建调度器的线程加入到线程池。
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name) :m_name(name) { SYLAR_ASSERT(threads > 0); if(use_caller){ sylar::Fiber::GetThis(); SYLAR_ASSERT(t_scheduler != this); t_scheduler=this; threads--; //当前线程加入,故需新建线程数减一 m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run,this),0,true)); //与其他线程不同,主线程的主调度协程是新建的run协程,其他线程的主调度协程与线程的主协程一致 sylar::Thread::SetName(m_name); t_scheduler_fiber = m_rootFiber.get(); //设置当前主调度协程 m_rootThread = sylar::GetThreadId(); //设置主线程ID m_threadIds.push_back(m_rootThread); } else{ m_rootThread=-1; } m_threadCount=threads; //设置线程池数 } Scheduler::~Scheduler(){ SYLAR_ASSERT(m_stopping); if(GetThis()==this){ //确认当前线程调度器与析构的调度器一致 t_scheduler=nullptr; } }
开启与中止调度器
开启调度器主要工作是新建线程,结束调度器主要是设置auto_Stop与Stopping为true为idle()协程创造结束的条件并通过tickle()函数通知其他线程进行结束,再启动主线程的run协程,最后回收各个线程
void Scheduler::start(){ MutexType::Lock lock(m_mutex); if(!m_stopping) {//要想开启调度器,当前调度器状态必须为非结束状态 return; } m_stopping=false; SYLAR_ASSERT(m_threads.empty());//当前线程池为空 m_threads.resize(m_threadCount); for(size_t i=0;i<m_threadCount;i++){ m_threads[i].reset(new Thread(std::bind(&Scheduler::run,this),m_name+"_"+std::to_string(i))); //新建各个线程,此处的reset为智能指针的reset函数 m_threadIds.push_back(m_threads[i]->getId()); } lock.unlock(); // if(m_rootFiber) { // //m_rootFiber->swapIn(); // m_rootFiber->call(); // SYLAR_LOG_INFO(g_logger) << "call out " << m_rootFiber->getState(); // } } //检测是否可以停止,并通知各个线程停止,最后回收线程 void Scheduler::stop(){ m_autoStop=true; //设置停止条件 //判断只有主协程,并且线程池为空 if(m_rootFiber && m_threadCount == 0 && (m_rootFiber->getState() == Fiber::TERM || m_rootFiber->getState() == Fiber::INIT)) { SYLAR_LOG_INFO(g_logger) << this << " stopped"; m_stopping = true; //判断当前是否只有主线程的run协程并处在非执行状态 if(stopping()) { //留给子类实现,为子类回收自己的资源做准备 return; } } if(m_rootThread==-1){ //检测是否使用了use_caller,如果使用了结束调度器的必须为使用了use_caller的线程调用stop SYLAR_ASSERT(GetThis()!=t_scheduler); }else{ SYLAR_ASSERT(GetThis()==t_scheduler); } m_stopping=true; for(size_t i=0;i<m_threadCount;i++){ tickle();//唤醒各个线程,使他们自己执行结束 } if(m_rootFiber){ tickle(); } if(m_rootFiber){ if(!stopping()){ m_rootFiber->call(); //在结束处调用主协程,主要是因为如果一开始就进入主协程,那么主线程中进行任务添加的代码就毫无用处,因为已经结束了。 } } std::vector<Thread::ptr> thr; { MutexType::Lock lock(m_mutex); thr.swap(m_threads); } for(size_t i=0;i<thr.size();i++){ //回收线程 thr[i]->join(); } }
线程的run函数
run函数是每个线程的主协程函数,大家都在这个协程循环等待任务池中的任务,如果没有任务就执行idle()协程,也就是一直等待,直到主线程调用stop()才会跳出idle()协程。
void Scheduler::run(){ SYLAR_LOG_DEBUG(g_logger)<<m_name<<" run"; setThis(); if(sylar::GetThreadId()!=m_rootThread){ t_scheduler_fiber=sylar::Fiber::GetThis().get(); } Fiber::ptr idel_fiber (new Fiber(std::bind(&Scheduler::idle,this))); Fiber::ptr cb_fiber; FiberAndThread ft; while(true){ ft.reset(); bool tickle_me=false; bool is_active=false; { // SYLAR_LOG_DEBUG(g_logger)<<m_name<<"while run"; MutexType::Lock lock(m_mutex); auto it = m_fibers.begin(); while(it!=m_fibers.end()){ //寻找可执行任务, if(it->thread != -1 && sylar::GetThreadId() != it->thread){ //任务指定了线程id,但是当前线程不是,故需要提醒其他线程 ++it; tickle_me=true; continue; } SYLAR_ASSERT(it->fiber || it->cb); if(it->fiber && it->fiber->getState() == Fiber::EXEC){ ++it; continue; } ft=*it; m_fibers.erase(it++); ++m_activeThreadCount; //找到可执行任务,说明活跃线程+1 is_active=true; break; } tickle_me |= it!=m_fibers.end(); } if(tickle_me){ tickle(); } if(ft.fiber && (ft.fiber->getState() != Fiber::TERM && ft.fiber->getState()!=Fiber::EXCEPT)){ //判断需要执行的任务是否为协程 ft.fiber->swapIn(); --m_activeThreadCount; if(ft.fiber->getState() == Fiber::READY) { schedule(ft.fiber); }else if(ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT){ ft.fiber->m_state = Fiber::HOLD; } ft.reset(); }else if(ft.cb){ SYLAR_LOG_INFO(g_logger) << "new fiber cb"; if(cb_fiber){ cb_fiber->reset(ft.cb); } else{ cb_fiber.reset(new Fiber(ft.cb)); } ft.reset(); cb_fiber->swapIn(); --m_activeThreadCount; SYLAR_LOG_DEBUG(g_logger)<<t_scheduler_fiber->getState(); if(cb_fiber->getState() == Fiber::READY) { schedule(cb_fiber); cb_fiber.reset(); //释放 }else if(cb_fiber->getState() == Fiber::TERM || cb_fiber->getState() == Fiber::EXCEPT){ cb_fiber->reset(nullptr); //表示不调用析构函数 }else {//if(cb_fiber->getState() != Fiber::TERM) { cb_fiber->m_state = Fiber::HOLD; cb_fiber.reset(); //释放 } }else { if(is_active) { --m_activeThreadCount; continue; } if(idel_fiber->getState() == Fiber::TERM) { SYLAR_LOG_INFO(g_logger) << "idle fiber term"; break; } ++m_idleThreadCount; idel_fiber->swapIn(); --m_idleThreadCount; if(idel_fiber->getState() != Fiber::TERM && idel_fiber->getState() != Fiber::EXCEPT) { idel_fiber->m_state = Fiber::HOLD;//当没有结束时,不需要一直循环,可以将idle切换出去,也就是切换回到主协程 } } } } void Scheduler::tickle() { SYLAR_LOG_INFO(g_logger) << "tickle"; } bool Scheduler::stopping() { MutexType::Lock lock(m_mutex); return m_autoStop && m_stopping && m_fibers.empty() && m_activeThreadCount == 0; } void Scheduler::idle() { SYLAR_LOG_INFO(g_logger) << "idle"; while(!stopping()) { sylar::Fiber::YieldToHold(); } }