队列以及锁和条件变量的封装。注意:队列的阻塞逻辑在生产者/消费者代码中实现,这里的队列只作为任务生产和消费的场所。
#include <bits/stdc++.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// One item stored in the Queue: a plain aggregate carrying a single integer.
struct Data {
    int data; // payload value; printed when the item is enqueued or dequeued
};
// Bundles one pthread mutex with two condition variables, `_read` and `_write`.
//
// Usage contract:
//   * wait_read()/wait_write() must be called with the mutex held via lock();
//     pthread_cond_wait() releases `_mutex` while blocked and re-acquires it
//     before returning.
//   * The object must not be destroyed while another thread still holds the
//     lock or is blocked in wait_read()/wait_write().
//
// The pthread objects are released by the destructor; destroy() may still be
// called explicitly and is safe to call more than once.
class Mutex {
public:
    // Initializes the mutex and both condition variables with default attributes.
    Mutex()
        : _destroyed(false)
    {
        pthread_mutex_init(&_mutex, NULL);
        pthread_cond_init(&_read, NULL);
        pthread_cond_init(&_write, NULL);
    }

    // Releases the pthread objects unless destroy() already did so.
    ~Mutex()
    {
        destroy();
    }

    // Acquires the mutex, blocking until it is available.
    void lock()
    {
        pthread_mutex_lock(&_mutex);
    }

    // Releases the mutex.
    void unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }

    // Blocks on the `_read` condition. The caller must hold the mutex.
    void wait_read()
    {
        pthread_cond_wait(&_read, &_mutex);
    }

    // Blocks on the `_write` condition. The caller must hold the mutex.
    void wait_write()
    {
        pthread_cond_wait(&_write, &_mutex);
    }

    // Wakes at least one thread blocked in wait_read(), if any.
    void notify_read()
    {
        pthread_cond_signal(&_read);
    }

    // Wakes at least one thread blocked in wait_write(), if any.
    void notify_write()
    {
        pthread_cond_signal(&_write);
    }

    // Wakes every thread blocked on either condition variable.
    void notify_all()
    {
        pthread_cond_broadcast(&_read);
        pthread_cond_broadcast(&_write);
    }

    // Destroys the mutex and both condition variables. Idempotent: a second
    // call (including the one made by the destructor) does nothing, because
    // destroying an already-destroyed pthread object is undefined.
    void destroy()
    {
        if (_destroyed) {
            return;
        }
        _destroyed = true;
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_read);
        pthread_cond_destroy(&_write);
    }

    pthread_mutex_t _mutex;
    pthread_cond_t _read;
    pthread_cond_t _write;

private:
    // Non-copyable: a copied pthread_mutex_t / pthread_cond_t is not a valid
    // synchronization object. Declared but intentionally not defined.
    Mutex(const Mutex&);
    Mutex& operator=(const Mutex&);

    bool _destroyed; // true once destroy() has released the pthread objects
};
class MutexGuard {
public:
MutexGuard()
{
_mutex = new Mutex;
}
~MutexGuard()
{
delete _mutex;
}
Mutex* _mutex;
};
class Queue {
public:
Queue(int capacity)
: _capacity(capacity)
, _len(0)
{
}
bool is_full();
bool is_empty();
void enQueue(const Data& data);
Data deQueue();
int _len;
int _capacity;
deque<Data> queue;
};
// Returns true when the item counter `_len` is zero, false otherwise.
bool Queue::is_empty()
{
    return _len == 0;
}
// Returns true when the item counter `_len` has reached or passed `_capacity`.
bool Queue::is_full()
{
    return _len >= _capacity;
}
// Appends `data` at the back of the queue when there is room and logs it.
// When the queue is already at capacity the item is NOT stored; only a
// "queue full" message is printed. In both cases the call sleeps for one
// second before returning.
void Queue::enQueue(const Data& data)
{
    const bool has_room = _len < _capacity;
    if (has_room) {
        std::cout << "produce data: " << data.data << std::endl;
        queue.push_back(data);
        ++_len;
    } else {
        std::cout << "queue full" << std::endl;
    }
    sleep(1); // one-second pause on both the success and the rejected path
}
// Removes the oldest item from the queue, logs it, and returns it.
//
// If the queue is empty, prints "queue empty" and returns a value-initialized
// Data (data == 0) without touching the container; calling front() or
// pop_front() on an empty deque is undefined behavior, and `_len` must not go
// negative. Every path sleeps for one second before returning.
Data Queue::deQueue()
{
    if (_len <= 0 || queue.empty()) {
        cout << "queue empty" << endl;
        sleep(1);
        return Data();
    }
    Data res = queue.front();
    cout << "consumer data: " << res.data << endl;
    queue.pop_front();
    _len--;
    sleep(1);
    return res;
}
生产者/消费者代码:使用两个条件变量和一个互斥锁实现。
#include "data.h"
using namespace std;
// Global lock object used by both producer() and consumer() below; every
// access to the shared Queue in those functions happens between
// m._mutex->lock() and m._mutex->unlock().
MutexGuard m;
void* producer(void* q)
{
Queue* queue = (Queue*)q;
Data d;
d.data = 100;
while (1) {
m._mutex->lock();
while (queue->is_full()) {
cout << "queue is full, wait for the consumer..." << endl;
m._mutex->wait_write();
}
queue->enQueue(d);
m._mutex->notify_read();
m._mutex->unlock();
}
return 0;
}
void* consumer(void* q)
{
Queue* queue = (Queue*)q;
while (1) {
m._mutex->lock();
while (queue->is_empty()) {
cout << "queue is empty, wait for the producer..." << endl;
m._mutex->wait_read();
}
queue->deQueue();
m._mutex->notify_write();
m._mutex->unlock();
}
return 0;
}
// Starts the producer and consumer threads on one shared Queue of capacity 10
// and lets them run for 100 seconds.
//
// Returns 1 if a thread cannot be created, 0 otherwise.
//
// NOTE(review): the worker loops never terminate, so the threads are not
// joined; they are still running (and still using `q` and the global lock)
// when main returns and the process exits. A clean shutdown would need a stop
// condition in producer()/consumer().
int main()
{
    // Thread counts; the tid arrays and the creation loops both use these so
    // they cannot drift apart.
    const int kProducerCount = 1;
    const int kConsumerCount = 1;

    Queue q(10);
    pthread_t producer_tid[kProducerCount];
    pthread_t consumer_tid[kConsumerCount];

    for (int i = 0; i < kProducerCount; i++) {
        // pthread_create returns an error number (not -1/errno) on failure.
        const int rc = pthread_create(&producer_tid[i], 0, producer, (void*)&q);
        if (rc != 0) {
            cerr << "failed to create producer thread, error " << rc << endl;
            return 1;
        }
    }
    for (int j = 0; j < kConsumerCount; j++) {
        const int rc = pthread_create(&consumer_tid[j], 0, consumer, (void*)&q);
        if (rc != 0) {
            cerr << "failed to create consumer thread, error " << rc << endl;
            return 1;
        }
    }

    sleep(100); // let the workers run; see the note above about shutdown
    return 0;
}