8. Portable Job Queue
Thread pool.
Start cpp section to pthread/pthread_work_fifo.hpp[1
/1
]
1: #line 1352 "./lpsrc/flx_pthread.pak"
2:
3:
4:
5:
6:
7:
8:
9: namespace flx { namespace pthread {
10:
11:
12: class PTHREAD_EXTERN worker_task
13: {
14: public:
15: virtual ~worker_task() {}
16:
17:
18: virtual void doit() = 0;
19:
20:
21:
22: virtual void finished() = 0;
23: };
24:
25:
26: class PTHREAD_EXTERN worker_fifo
27: {
28: flx_mutex_t nlock;
29: int nthreads;
30: sleep_queue_t fifo;
31:
32: static void thread_start(void*);
33: bool thread_loop_body();
34: void stop_worker_thread();
35: void start_worker_thread();
36:
37: public:
38: worker_fifo(int n, int m);
39: ~worker_fifo();
40: void add_worker_task(worker_task* task);
41: int get_nthreads();
42: void set_nthreads(int);
43: };
44:
45: }}
46:
47:
Start cpp section to pthread/pthread_work_fifo.cpp[1
/1
]
1: #line 1400 "./lpsrc/flx_pthread.pak"
2:
3:
4: namespace flx { namespace pthread {
5:
6: int worker_fifo::get_nthreads() {
7: flx_mutex_locker_t dummy(nlock);
8: return nthreads;
9: }
10:
11: void worker_fifo::set_nthreads(int n)
12: {
13: flx_mutex_locker_t dummy(nlock);
14: while(nthreads<n) start_worker_thread();
15: while(nthreads>n) stop_worker_thread();
16: }
17:
18: void worker_fifo::start_worker_thread()
19: {
20: ++nthreads;
21:
22: flx_detached_thread_t().init(thread_start, this);
23: }
24:
25: worker_fifo::worker_fifo(int n, int m) : nthreads(0), fifo(n)
26: {
27: set_nthreads(m);
28: }
29:
30: void
31: worker_fifo::stop_worker_thread()
32: {
33:
34: --nthreads;
35: add_worker_task(NULL);
36: }
37:
38: worker_fifo::~worker_fifo()
39: {
40: while(nthreads>0)stop_worker_thread();
41: fifo.wait_until_empty();
42: }
43:
44:
45: void
46: worker_fifo::thread_start(void* udat)
47: {
48: worker_fifo* fio = (worker_fifo*)udat;
49: while(fio->thread_loop_body()) ;
50: }
51:
52:
53:
54: bool
55: worker_fifo::thread_loop_body()
56: {
57: worker_task* req = (worker_task*)fifo.dequeue();
58:
59:
60: if(!req) return false;
61:
62: req->doit();
63: req->finished();
64:
65: return true;
66: }
67:
68: void
69: worker_fifo::add_worker_task(worker_task* task)
70: {
71:
72: fifo.enqueue(task);
73: }
74:
75: }}
76:
77:
Start felix section to lib/pthread.flx[1
/1
]
1: #line 1478 "./lpsrc/flx_pthread.pak"
2:
3:
4: header pthread_hxx = '#include "pthread_thread.hpp"';
5: header mutex_hxx = '#include "pthread_mutex.hpp"';
6: header condv_hxx = '#include "pthread_condv.hpp"';
7: header counter_hxx = '#include "pthread_counter.hpp"';
8: header semaphore_hxx = '#include "pthread_semaphore.hpp"';
9: header monitor_hxx = '#include "pthread_monitor.hpp"';
10: header work_fifo_hxx = '#include "pthread_work_fifo.hpp"';
11:
12: module Pthread
13: {
14: requires package "flx_pthread";
15: open C_hack;
16:
17: type job_queue = "flx::pthread::worker_fifo*" requires work_fifo_hxx;
18:
19: gen mk_job_queue: int * int -> job_queue = "new flx::pthread::worker_fifo($1,$2)";
20:
21: proc spawn_pthread(p:1->0)
22: {
23: var con = start p;
24: var fthr = mk_thread con;
25: svc$ svc_spawn_pthread fthr;
26: }
27:
28: type mutex = "flx_mutex_t" requires mutex_hxx;
29: proc lock: lvalue[mutex] = "$1.lock();";
30: proc unlock: lvalue[mutex] = "$1.unlock();";
31: proc lock (m:&mutex) { lock$ *m; }
32: proc unlock (m:&mutex) { unlock$ *m; }
33:
34:
35: type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
36:
37: fun mk_pchannel[t]: 1->pchannel[t] =
38: "new flx::pthread::monitor_t()"
39: ;
40:
41: proc _read[t]: pchannel[t] * ptr[ptr[t]] = "*$2 = (?1*)($1->dequeue());";
42:
43: proc read[t](v:&t,chan:pchannel[t]) {
44: var p : ptr[t];
45: _read (chan, addr p);
46: *v = *p;
47: }
48:
49: proc _write[t]: pchannel[t] * ptr[t] = "$1->enqueue((void*)$2);";
50: proc write[t](chan:pchannel[t], v:t) {
51: var ps = cast[ptr[t]]$ xnew v;
52: _write (chan,ps);
53: }
54: }
55: