8. Portable Job Queue

Thread pool.
Start cpp section to pthread/pthread_work_fifo.hpp[1 /1 ]
     1: #line 1352 "./lpsrc/flx_pthread.pak"
     2: #ifndef __FLX_PTHREAD_WORKER_FIFO_H__
     3: #define __FLX_PTHREAD_WORKER_FIFO_H__
     4: #include <flx_pthread_config.hpp>
     5: #include "pthread_thread.hpp"
     6: #include "pthread_mutex.hpp"
     7: #include "pthread_sleep_queue.hpp"
     8: 
     9: namespace flx { namespace pthread {
    10: 
    11: /// Class of jobs to be queued in fifo for execution.
    12: class PTHREAD_EXTERN worker_task
    13: {
    14: public:
    15:   virtual ~worker_task() {}   // c++ should do this automatically
    16: 
    17:   /// function called by worker thread to carry out user job
    18:   virtual void doit() = 0;
    19: 
    20:   /// function called by worker thread after doit() is completed
    21:   /// used to notify job completion
    22:   virtual void finished() = 0; // finished hook (mi serve start gancia?)
    23: };
    24: 
    25: /// Job scheduler, executes jobs in turn from queue
    26: class PTHREAD_EXTERN worker_fifo
    27: {
    28:   flx_mutex_t nlock;
    29:   int nthreads;                 /// scheduled number of threads
    30:   sleep_queue_t fifo;
    31: 
    32:   static void thread_start(void*); // thread entry point, passed this
    33:   bool thread_loop_body();      // returns keep going flag
    34:   void stop_worker_thread();
    35:   void start_worker_thread();
    36: 
    37: public:
    38:   worker_fifo(int n, int m);   /// n: Q bound, m: # of threads
    39:   ~worker_fifo();
    40:   void add_worker_task(worker_task* task);
    41:   int get_nthreads();
    42:   void set_nthreads(int);
    43: };
    44: 
    45: }} // namespace pthread, flx
    46: #endif  // __WORKER_FIFO__
    47: 
End cpp section to pthread/pthread_work_fifo.hpp[1]
Start cpp section to pthread/pthread_work_fifo.cpp[1 /1 ]
     1: #line 1400 "./lpsrc/flx_pthread.pak"
     2: #include <stdio.h>    // printf
     3: #include "pthread_work_fifo.hpp"
     4: namespace flx { namespace pthread {
     5: 
     6: int worker_fifo::get_nthreads() {
     7:   flx_mutex_locker_t dummy(nlock);
     8:   return nthreads;
     9: }
    10: 
    11: void worker_fifo::set_nthreads(int n)
    12: {
    13:   flx_mutex_locker_t dummy(nlock);
    14:   while(nthreads<n) start_worker_thread();
    15:   while(nthreads>n) stop_worker_thread();
    16: }
    17: 
    18: void worker_fifo::start_worker_thread()
    19: {
    20:   ++nthreads;
    21:   //fprintf(stderr,"Spawn detached worker thread, count=%d\n",nthreads);
    22:   flx_detached_thread_t().init(thread_start, this);
    23: }
    24: 
    25: worker_fifo::worker_fifo(int n, int m) : nthreads(0), fifo(n)
    26: {
    27:   set_nthreads(m);
    28: }
    29: 
    30: void
    31: worker_fifo::stop_worker_thread()
    32: {
    33:   //fprintf(stderr,"Kill detached worker thread, count=%d\n",nthreads);
    34:   --nthreads;
    35:   add_worker_task(NULL);    // thread safe takedown.
    36: }
    37: 
    38: worker_fifo::~worker_fifo()
    39: {
    40:   while(nthreads>0)stop_worker_thread();
    41:   fifo.wait_until_empty();
    42: }
    43: 
    44: // io thread entry point, passed this
    45: void
    46: worker_fifo::thread_start(void* udat)
    47: {
    48:   worker_fifo*  fio = (worker_fifo*)udat;
    49:   while(fio->thread_loop_body()) ;
    50: }
    51: 
    52: // dequeues one task and executes it, calling finished hook. interprets
    53: // null task as a request to exit.
    54: bool
    55: worker_fifo::thread_loop_body()
    56: {
    57:   worker_task*  req = (worker_task*)fifo.dequeue();
    58:   //fprintf(stderr,"dequeued worker_task (%p)\n", req);
    59: 
    60:   if(!req) return false;        // finished, got quit signal
    61: 
    62:   req->doit();
    63:   req->finished();          // finish hook. I find this handy
    64: 
    65:   return true;            // keep going
    66: }
    67: 
    68: void
    69: worker_fifo::add_worker_task(worker_task* task)
    70: {
    71:   //fprintf(stderr,"adding worker task %p\n",task);
    72:   fifo.enqueue(task);         // don't worry, fifo is re-entrant
    73: }
    74: 
    75: }}
    76: 
    77: 
End cpp section to pthread/pthread_work_fifo.cpp[1]
Start felix section to lib/pthread.flx[1 /1 ]
     1: #line 1478 "./lpsrc/flx_pthread.pak"
     2: #import <flx.flxh>
     3: 
     4: header pthread_hxx = '#include "pthread_thread.hpp"';
     5: header mutex_hxx = '#include "pthread_mutex.hpp"';
     6: header condv_hxx = '#include "pthread_condv.hpp"';
     7: header counter_hxx = '#include "pthread_counter.hpp"';
     8: header semaphore_hxx = '#include "pthread_semaphore.hpp"';
     9: header monitor_hxx = '#include "pthread_monitor.hpp"';
    10: header work_fifo_hxx = '#include "pthread_work_fifo.hpp"';
    11: 
    12: module Pthread
    13: {
    14:   requires package "flx_pthread";
    15:   open C_hack;
    16: 
    17:   type job_queue = "flx::pthread::worker_fifo*" requires work_fifo_hxx;
    18: 
    19:   gen mk_job_queue: int * int -> job_queue = "new flx::pthread::worker_fifo($1,$2)";
    20: 
    21:   proc spawn_pthread(p:1->0)
    22:   {
    23:       var con = start p;              // get continuation of p
    24:       var fthr = mk_thread con;
    25:       svc$ svc_spawn_pthread fthr;
    26:   }
    27: 
    28:   type mutex = "flx_mutex_t" requires mutex_hxx;
    29:   proc lock: lvalue[mutex] = "$1.lock();";
    30:   proc unlock: lvalue[mutex] = "$1.unlock();";
    31:   proc lock (m:&mutex) { lock$ *m; }
    32:   proc unlock (m:&mutex) { unlock$ *m; }
    33: 
    34:   // pre-emptive thread channels (monitor)
    35:   type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
    36: 
    37:   fun mk_pchannel[t]: 1->pchannel[t] =
    38:     "new flx::pthread::monitor_t()"
    39:   ;
    40: 
    41:   proc _read[t]: pchannel[t] * ptr[ptr[t]] = "*$2 = (?1*)($1->dequeue());";
    42: 
    43:   proc read[t](v:&t,chan:pchannel[t]) {
    44:     var p : ptr[t];
    45:     _read (chan, addr p);
    46:     *v = *p;
    47:   }
    48: 
    49:   proc _write[t]: pchannel[t] * ptr[t] = "$1->enqueue((void*)$2);";
    50:   proc write[t](chan:pchannel[t], v:t) {
    51:     var ps = cast[ptr[t]]$ xnew v;
    52:     _write (chan,ps);
    53:   }
    54: }
    55: 
End felix section to lib/pthread.flx[1]