协程调度器
说明
协程调度器包括一个线程池,包括一个任务池。线程之上封装协程,每个线程都有自己的主协程,在主协程的基础上,可以调取任务,转到其他协程,完成之后再返回主协程。
实现
任务池基本类型
任务池需要有协程类型与function类型,所以需要做一个封装。并且可以指定此任务需要在哪个线程上执行。
struct FiberAndThread{
Fiber::ptr fiber;
std::function<void()> cb;
int thread;
FiberAndThread(Fiber::ptr f, int thr)
:fiber(f),thread(thr){
}
/**
* @brief 构造函数
* @param[in] f 协程指针
* @param[in] thr 线程id
* @post *f = nullptr
*/
FiberAndThread(Fiber::ptr* f, int thr)
:thread(thr) {
fiber.swap(*f);
}
/**
* @brief 构造函数
* @param[in] f 协程执行函数
* @param[in] thr 线程id
*/
FiberAndThread(std::function<void()> f, int thr)
:cb(f), thread(thr) {
}
/**
* @brief 构造函数
* @param[in] f 协程执行函数指针
* @param[in] thr 线程id
* @post *f = nullptr
*/
FiberAndThread(std::function<void()>* f, int thr)
:thread(thr) {
cb.swap(*f);
}
/**
* @brief 无参构造函数
*/
FiberAndThread()
:thread(-1) {
}
/**
* @brief 重置数据
*/
void reset() {
fiber = nullptr;
cb = nullptr;
thread = -1;
}
};向任务池中添加任务
添加任务分两层封装,上层需要锁并且可以批量添加任务,下层是单位操作。
/**
* @brief 调度协程
* @param[in] fc 协程或函数
* @param[in] thread 协程执行的线程id,-1标识任意线程
*/
template<class FiberOrCb>
void schedule(FiberOrCb fc, int thread = -1) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
need_tickle = scheduleNoLock(fc, thread);
}
if(need_tickle) {
tickle();
}
}
/**
* @brief 批量调度协程
* @param[in] begin 协程数组的开始
* @param[in] end 协程数组的结束
*/
template<class InputIterator>
void schedule(InputIterator begin, InputIterator end) {
bool need_tickle = false;
{
MutexType::Lock lock(m_mutex);
while(begin != end) {
need_tickle = scheduleNoLock(&*begin, -1) || need_tickle;
++begin;
}
}
if(need_tickle) {
tickle();
}
}
template<class FiberOrCb>
bool scheduleNoLock(FiberOrCb fc, int thread) {
bool need_tickle = m_fibers.empty();
FiberAndThread ft(fc, thread);
if(ft.fiber || ft.cb) {
m_fibers.push_back(ft);
}
return need_tickle;
}构造函数
use_caller代表是否将当前新建调度器的线程加入到线程池。
Scheduler::Scheduler(size_t threads, bool use_caller, const std::string& name)
:m_name(name) {
SYLAR_ASSERT(threads > 0);
if(use_caller){
sylar::Fiber::GetThis();
SYLAR_ASSERT(t_scheduler != this);
t_scheduler=this;
threads--; //当前线程加入,故需新建线程数减一
m_rootFiber.reset(new Fiber(std::bind(&Scheduler::run,this),0,true));
//与其他线程不同,主线程的主调度协程是新建的run协程,其他线程的主调度协程与线程的主协程一致
sylar::Thread::SetName(m_name);
t_scheduler_fiber = m_rootFiber.get(); //设置当前主调度协程
m_rootThread = sylar::GetThreadId(); //设置主线程ID
m_threadIds.push_back(m_rootThread);
}
else{
m_rootThread=-1;
}
m_threadCount=threads; //设置线程池数
}
Scheduler::~Scheduler(){
SYLAR_ASSERT(m_stopping);
if(GetThis()==this){ //确认当前线程调度器与析构的调度器一致
t_scheduler=nullptr;
}
}
开启与中止调度器
开启调度器主要工作是新建线程,结束调度器主要是设置auto_Stop与Stopping为true为idle()协程创造结束的条件并通过tickle()函数通知其他线程进行结束,再启动主线程的run协程,最后回收各个线程
void Scheduler::start(){
MutexType::Lock lock(m_mutex);
if(!m_stopping) {//要想开启调度器,当前调度器状态必须为非结束状态
return;
}
m_stopping=false;
SYLAR_ASSERT(m_threads.empty());//当前线程池为空
m_threads.resize(m_threadCount);
for(size_t i=0;i<m_threadCount;i++){
m_threads[i].reset(new Thread(std::bind(&Scheduler::run,this),m_name+"_"+std::to_string(i)));
//新建各个线程,此处的reset为智能指针的reset函数
m_threadIds.push_back(m_threads[i]->getId());
}
lock.unlock();
// if(m_rootFiber) {
// //m_rootFiber->swapIn();
// m_rootFiber->call();
// SYLAR_LOG_INFO(g_logger) << "call out " << m_rootFiber->getState();
// }
}
//检测是否可以停止,并通知各个线程停止,最后回收线程
void Scheduler::stop(){
m_autoStop=true; //设置停止条件
//判断只有主协程,并且线程池为空
if(m_rootFiber
&& m_threadCount == 0
&& (m_rootFiber->getState() == Fiber::TERM
|| m_rootFiber->getState() == Fiber::INIT)) {
SYLAR_LOG_INFO(g_logger) << this << " stopped";
m_stopping = true;
//判断当前是否只有主线程的run协程并处在非执行状态
if(stopping()) { //留给子类实现,为子类回收自己的资源做准备
return;
}
}
if(m_rootThread==-1){ //检测是否使用了use_caller,如果使用了结束调度器的必须为使用了use_caller的线程调用stop
SYLAR_ASSERT(GetThis()!=t_scheduler);
}else{
SYLAR_ASSERT(GetThis()==t_scheduler);
}
m_stopping=true;
for(size_t i=0;i<m_threadCount;i++){
tickle();//唤醒各个线程,使他们自己执行结束
}
if(m_rootFiber){
tickle();
}
if(m_rootFiber){
if(!stopping()){
m_rootFiber->call(); //在结束处调用主协程,主要是因为如果一开始就进入主协程,那么主线程中进行任务添加的代码就毫无用处,因为已经结束了。
}
}
std::vector<Thread::ptr> thr;
{
MutexType::Lock lock(m_mutex);
thr.swap(m_threads);
}
for(size_t i=0;i<thr.size();i++){ //回收线程
thr[i]->join();
}
}线程的run函数
run函数是每个线程的主协程函数,大家都在这个协程循环等待任务池中的任务,如果没有任务就执行idle()协程,也就是一直等待,直到主线程调用stop()才会跳出idle()协程。
void Scheduler::run(){
SYLAR_LOG_DEBUG(g_logger)<<m_name<<" run";
setThis();
if(sylar::GetThreadId()!=m_rootThread){
t_scheduler_fiber=sylar::Fiber::GetThis().get();
}
Fiber::ptr idel_fiber (new Fiber(std::bind(&Scheduler::idle,this)));
Fiber::ptr cb_fiber;
FiberAndThread ft;
while(true){
ft.reset();
bool tickle_me=false;
bool is_active=false;
{
// SYLAR_LOG_DEBUG(g_logger)<<m_name<<"while run";
MutexType::Lock lock(m_mutex);
auto it = m_fibers.begin();
while(it!=m_fibers.end()){ //寻找可执行任务,
if(it->thread != -1 && sylar::GetThreadId() != it->thread){ //任务指定了线程id,但是当前线程不是,故需要提醒其他线程
++it;
tickle_me=true;
continue;
}
SYLAR_ASSERT(it->fiber || it->cb);
if(it->fiber && it->fiber->getState() == Fiber::EXEC){
++it;
continue;
}
ft=*it;
m_fibers.erase(it++);
++m_activeThreadCount; //找到可执行任务,说明活跃线程+1
is_active=true;
break;
}
tickle_me |= it!=m_fibers.end();
}
if(tickle_me){
tickle();
}
if(ft.fiber && (ft.fiber->getState() != Fiber::TERM
&& ft.fiber->getState()!=Fiber::EXCEPT)){ //判断需要执行的任务是否为协程
ft.fiber->swapIn();
--m_activeThreadCount;
if(ft.fiber->getState() == Fiber::READY) {
schedule(ft.fiber);
}else if(ft.fiber->getState() != Fiber::TERM
&& ft.fiber->getState() != Fiber::EXCEPT){
ft.fiber->m_state = Fiber::HOLD;
}
ft.reset();
}else if(ft.cb){
SYLAR_LOG_INFO(g_logger) << "new fiber cb";
if(cb_fiber){
cb_fiber->reset(ft.cb);
}
else{
cb_fiber.reset(new Fiber(ft.cb));
}
ft.reset();
cb_fiber->swapIn();
--m_activeThreadCount;
SYLAR_LOG_DEBUG(g_logger)<<t_scheduler_fiber->getState();
if(cb_fiber->getState() == Fiber::READY) {
schedule(cb_fiber);
cb_fiber.reset(); //释放
}else if(cb_fiber->getState() == Fiber::TERM
|| cb_fiber->getState() == Fiber::EXCEPT){
cb_fiber->reset(nullptr); //表示不调用析构函数
}else {//if(cb_fiber->getState() != Fiber::TERM) {
cb_fiber->m_state = Fiber::HOLD;
cb_fiber.reset(); //释放
}
}else {
if(is_active) {
--m_activeThreadCount;
continue;
}
if(idel_fiber->getState() == Fiber::TERM) {
SYLAR_LOG_INFO(g_logger) << "idle fiber term";
break;
}
++m_idleThreadCount;
idel_fiber->swapIn();
--m_idleThreadCount;
if(idel_fiber->getState() != Fiber::TERM
&& idel_fiber->getState() != Fiber::EXCEPT) {
idel_fiber->m_state = Fiber::HOLD;//当没有结束时,不需要一直循环,可以将idle切换出去,也就是切换回到主协程
}
}
}
}
void Scheduler::tickle() {
SYLAR_LOG_INFO(g_logger) << "tickle";
}
bool Scheduler::stopping() {
MutexType::Lock lock(m_mutex);
return m_autoStop && m_stopping
&& m_fibers.empty() && m_activeThreadCount == 0;
}
void Scheduler::idle() {
SYLAR_LOG_INFO(g_logger) << "idle";
while(!stopping()) {
sylar::Fiber::YieldToHold();
}
}
京公网安备 11010502036488号