线程封装使用的是pthread线程库,信号量使用的是sem系列,然后锁的封装有读写锁pthread_rwlock、互斥量pthread_mutex、pthread_spinlock、CAS原子锁。
1.线程封装
线程类不提供默认复制、拷贝构造与左值构造,所以将三个默认函数删除掉。
线程类提供五个属性,m_id为pid_t类型,为全局线程id,可以用syscall(SYS_gettid)获取。m_thread是pthread_t类型,在当前进程中唯一标识线程,可以用pthread_self()函数获取。
m_cb是function类型的属性,是当前线程最终调用的函数。m_name是当前线程名称可以用于日志输出,调试等等。
class Thread{ public: typedef std::shared_ptr<Thread> ptr; Thread(const Thread &) =delete; Thread(const Thread &&) =delete; Thread & operator = (const Thread &) =delete; ~Thread(); static void SetName(const std::string & name); static const std::string& GetName(); //静态方法不能为const方法 static Thread* GetThis(); Thread(std::function <void ()> cb,const std::string &name); void join(); const std::string& getName() const { return m_name; } const pid_t getId() const { return m_id; } private: static void* run(void* arg); private: pid_t m_id=-1; pthread_t m_thread=0; std::function<void ()> m_cb; std::string m_name; Semaphore m_semaphore; };
在Thread类中提供了三个静态方法是用于处理两个静态属性。这两个静态属性分别用于获取当前执行线程的指针与线程名称, sylar::Thread::GetThis(),sylar::Thread::GetName()。run()函数是注册在pthread_create()中的,在run()函数中会完成对两个静态属性的初始化。pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());能够完成在操作系统设置线程名称的任务。
在构造函数中有m_semaphore.wait(),在run()函数中调用真正线程函数之前调用thread->m_semaphore.notify()。是为了保证在完成线程类的构造函数之前,线程已经开始执行了,不然在构造完成之后,线程可能还没开始执行,在离开作用域时会执行析构函数,造成线程还没开始就被pthread_detach()了。
static thread_local Thread * t_thread = nullptr; static thread_local std::string t_thread_name = "NUKNOW"; void Thread::SetName(const std::string & name){ if(t_thread){ t_thread->m_name=name; } t_thread_name=name; } const std::string & Thread::GetName(){ return t_thread_name; } void * Thread::run(void * arg){ Thread* thread = (Thread*)arg; t_thread = thread; t_thread_name = thread->m_name; thread->m_id = sylar::GetThreadId(); pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str()); std::function<void()> cb; cb.swap(thread->m_cb); thread->m_semaphore.notify(); cb(); return 0; }
2.信号量
信号量的封装使用的是sem库,信号量同样不允许默认拷贝,复制构造,左值构造,所以将默认函数删除。
其中只用一个属性为m_semaphore,为sem_t类型。
class Semaphore { public: /** * @brief 构造函数 * @param[in] count 信号量值的大小 */ Semaphore(uint32_t count = 0); Semaphore(const Semaphore &)=delete; Semaphore(const Semaphore &&)=delete; Semaphore & operator =(const Semaphore &) =delete; /** * @brief 析构函数 :wq */ ~Semaphore(); /** * @brief 获取信号量 */ void wait(); /** * @brief 释放信号量 */ void notify(); private: sem_t m_semaphore; };
在Semaphore中有几个函数,在构造函数中进行sem_init初始化,使用传参的方式获取信号量的大小。在析构函数中进行sem_destroy()。
Semaphore::Semaphore(uint32_t count) { if(sem_init(&m_semaphore, 0, count)) { throw std::logic_error("sem_init error"); } } Semaphore::~Semaphore() { sem_destroy(&m_semaphore); } void Semaphore::wait() { if(sem_wait(&m_semaphore)) { throw std::logic_error("sem_wait error"); } } void Semaphore::notify() { if(sem_post(&m_semaphore)) { throw std::logic_error("sem_post error"); } }
3.互斥量
对于互斥量的封装有两层,一层是对不同的锁进行类封装,使其对外提供统一接口。比如pthread_mutex、pthread_rwlock、pthread_spinlock、CAS等等。第二层是对已经做好对外进行统一封装的锁进行模版封装,使其能有对所有锁类实行统一使用。
1 锁的第一层封装
锁的第一层封装,对于普通锁对外使用lock(),unlock()。读写锁对外使用wrlock(),rdlock(),unlock()。
class Mutex{ public: typedef ScopedLockImpl<Mutex> Lock; Mutex(){ pthread_mutex_init(&m_mutex,nullptr); } ~Mutex(){ pthread_mutex_destroy(&m_mutex); } void lock(){ pthread_mutex_lock(&m_mutex); } void unlock(){ pthread_mutex_unlock(&m_mutex); } private: pthread_mutex_t m_mutex; }; //自旋锁 class Spinlock { public: /// 局部锁 typedef ScopedLockImpl<Spinlock> Lock; /** * @brief 构造函数 */ Spinlock() { pthread_spin_init(&m_mutex, 0); } /** * @brief 析构函数 */ ~Spinlock() { pthread_spin_destroy(&m_mutex); } /** * @brief 上锁 */ void lock() { pthread_spin_lock(&m_mutex); } /** * @brief 解锁 */ void unlock() { pthread_spin_unlock(&m_mutex); } private: /// 自旋锁 pthread_spinlock_t m_mutex; }; /** * @brief 原子锁 */ class CASLock { public: /// 局部锁 typedef ScopedLockImpl<CASLock> Lock; /** * @brief 构造函数 */ CASLock() { m_mutex.clear(); } /** * @brief 析构函数 */ ~CASLock() { } /** * @brief 上锁 */ void lock() { while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire)); } /** * @brief 解锁 */ void unlock() { std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release); } private: /// 原子状态 volatile std::atomic_flag m_mutex; }; class RWMutex{ public: /// 局部读锁 typedef ReadScopedLockImpl<RWMutex> ReadLock; /// 局部写锁 typedef WriteScopedLockImpl<RWMutex> WriteLock; /** * @brief 构造函数 */ RWMutex() { pthread_rwlock_init(&m_lock, nullptr); } /** * @brief 析构函数 */ ~RWMutex() { pthread_rwlock_destroy(&m_lock); } /** * @brief 上读锁 */ void rdlock() { pthread_rwlock_rdlock(&m_lock); } /** * @brief 上写锁 */ void wrlock() { pthread_rwlock_wrlock(&m_lock); } /** * @brief 解锁 */ void unlock() { pthread_rwlock_unlock(&m_lock); } private: /// 读写锁 pthread_rwlock_t m_lock; };
2.锁的第二层封装
第二层封装使用模版类,统一了普通锁与读写锁对外的使用接口,但是其实第二层封装是直接使用到第一层封装里面去的,因为我们想要使用锁还是要定义第一层封装的类型才可以使用。所以才有了在第一层封装中将各个模版类型定义成自己的typedef。
template<class T> struct ScopedLockImpl { public: /** * @brief 构造函数 * @param[in] mutex Mutex */ ScopedLockImpl(T& mutex) :m_mutex(mutex) { m_mutex.lock(); m_locked = true; } /** * @brief 析构函数,自动释放锁 */ ~ScopedLockImpl() { unlock(); } /** * @brief 加锁 */ void lock() { if(!m_locked) { m_mutex.lock(); m_locked = true; } } /** * @brief 解锁 */ void unlock() { if(m_locked) { m_mutex.unlock(); m_locked = false; } } private: /// mutex T& m_mutex; /// 是否已上锁 bool m_locked; }; /** * @brief 局部读锁模板实现 */ template<class T> struct ReadScopedLockImpl { public: /** * @brief 构造函数 * @param[in] mutex 读写锁 */ ReadScopedLockImpl(T& mutex) :m_mutex(mutex) { m_mutex.rdlock(); m_locked = true; } /** * @brief 析构函数,自动释放锁 */ ~ReadScopedLockImpl() { unlock(); } /** * @brief 上读锁 */ void lock() { if(!m_locked) { m_mutex.rdlock(); m_locked = true; } } /** * @brief 释放锁 */ void unlock() { if(m_locked) { m_mutex.unlock(); m_locked = false; } } private: /// mutex T& m_mutex; /// 是否已上锁 bool m_locked; }; /** * @brief 局部写锁模板实现 */ template<class T> struct WriteScopedLockImpl { public: /** * @brief 构造函数 * @param[in] mutex 读写锁 */ WriteScopedLockImpl(T& mutex) :m_mutex(mutex) { m_mutex.wrlock(); m_locked = true; } /** * @brief 析构函数 */ ~WriteScopedLockImpl() { unlock(); } /** * @brief 上写锁 */ void lock() { if(!m_locked) { m_mutex.wrlock(); m_locked = true; } } /** * @brief 解锁 */ void unlock() { if(m_locked) { m_mutex.unlock(); m_locked = false; } } private: /// Mutex T& m_mutex; /// 是否已上锁 bool m_locked; };
3.在日志系统与配置系统中使用互斥量
在日志系统与配置系统使用互斥量是为了保证线程安全,因为日志系统写多读少所以使用Mutex,在配置系统中是读多写少,所以使用的是读写锁RWmutex。
在日志系统中需要进行互斥量添加的分别是Logger日志器,LogAppender日志输出地。在日志器中log()方法以及添加删除输出地函数需要和添加获取输出格式的函数添加锁,在日志输出地中需要对log()函数以及设置formatter的地方设置锁。还有一个是在日志管理类LoggerManager中需要加锁,在获取root日志器以及其他日志器的时候需要添加锁。
在配置系统中需要加读写锁的是ConfigVar模版类以及Config配置管理类,在ConfigVar中需要加锁的是Tostring()读锁,setValue()写锁,getValue()读锁,并且在添加回调机制等操作中也需要添加锁。
在Config管理类中,lookup()需要写锁,第二个lookup()需要读锁,LookupBase()需要读锁。在静态方法void Config::Visit(std::function<void(ConfigVarBase::ptr)> cb) 中,为了方便输出Config中的配置参数,特意可以用回调函数的方法去按照自己的定制进行输出,所以这里也需要读锁。
static RWMutexType& GetMutex() { static RWMutexType s_mutex; return s_mutex; }
上段代码是为了能够在获取锁之前就已经定义了锁,所以用静态局部变量的方法去获取锁。