When the queue is empty, the dequeue operation blocks on a condition variable that the enqueue operation signals.
1: #line 1215 "./lpsrc/flx_pthread.pak" 2: #ifndef __SLEEPQUEUE__ 3: #define __SLEEPQUEUE__ 4: #include <flx_pthread_config.hpp> 5: #include "pthread_mutex.hpp" 6: #include "pthread_condv.hpp" 7: 8: // interface for a consumer/producer queue. threads requesting a resource 9: // that isn't there block until one is available. push/pop re-entrant 10: 11: namespace flx { namespace pthread { 12: 13: // ******************************************************** 14: /// Thread safe bounded queue. 15: /// 16: /// The queue can be locked by setting bound=0. 17: /// In this state it can only be unlocked by setting a non-zero bound. 18: /// 19: /// If the bound is set to 1 (the default), 20: /// then the queue is always either empty or full. 21: /// An empty queue blocks readers until a writer sends some data. 22: /// A full queue blocks writers, until a reader reads the data. 23: /// Note that when the queue is empty a writer can write data 24: /// and continues without waiting for the data to be read. 25: // ******************************************************** 26: 27: class PTHREAD_EXTERN sleep_queue_t { 28: flx_condv_t size_changed; 29: void *lame_opaque; 30: size_t bound; 31: public: 32: flx_mutex_t member_lock; 33: sleep_queue_t(size_t); 34: ~sleep_queue_t(); 35: void enqueue(void*); 36: void* dequeue(); 37: void resize(size_t); 38: void wait_until_empty(); 39: }; 40: 41: }} // namespace pthread, flx 42: #endif 43:
1: #line 1259 "./lpsrc/flx_pthread.pak" 2: #include "pthread_sleep_queue.hpp" 3: #include <queue> // stl to the bloated rescue 4: #include <string.h> // strerror 5: 6: using namespace std; 7: 8: namespace flx { namespace pthread { 9: typedef queue<void*> void_queue; 10: 11: #define ELTQ ((void_queue*)lame_opaque) 12: 13: sleep_queue_t::sleep_queue_t(size_t n) : bound(n) 14: { 15: lame_opaque = new void_queue; 16: } 17: 18: // Much care is needed deleting a queue. 19: // A safe method is possible .. but not provided here 20: sleep_queue_t::~sleep_queue_t() 21: { 22: delete ELTQ; 23: } 24: 25: void sleep_queue_t::wait_until_empty() { 26: flx_mutex_locker_t l(member_lock); 27: while(!ELTQ->empty()) 28: size_changed.wait(&member_lock); 29: } 30: 31: void 32: sleep_queue_t::enqueue(void* elt) 33: { 34: flx_mutex_locker_t l(member_lock); 35: while(ELTQ->size() >= bound) // guard against spurious wakeups! 36: size_changed.wait(&member_lock); 37: ELTQ->push(elt); 38: size_changed.broadcast(); // cannot return an error 39: } 40: 41: void* 42: sleep_queue_t::dequeue() 43: { 44: flx_mutex_locker_t l(member_lock); 45: while(ELTQ->empty()) // guard against spurious wakeups! 46: size_changed.wait(&member_lock); 47: void *elt = ELTQ->front(); 48: ELTQ->pop(); 49: size_changed.broadcast(); 50: return elt; 51: } 52: 53: void 54: sleep_queue_t::resize(size_t n) 55: { 56: flx_mutex_locker_t l(member_lock); 57: bound = n; 58: // get things rolling again 59: size_changed.broadcast(); 60: } 61: 62: }} 63: 64: