8. Portable Job Queue

The worker_fifo class is a pool of worker threads plus a bounded job queue.
Start cpp section to pthread/pthread_work_fifo.hpp[1 /1 ]
     1: #line 1326 "./lpsrc/flx_pthread.pak"
     2: #ifndef __WORKER_FIFO__
     3: #define __WORKER_FIFO__
     4: #include <flx_pthread_config.hpp>
     5: #include "pthread_thread.hpp"
     6: #include "pthread_mutex.hpp"
     7: #include "pthread_sleep_queue.hpp"
     8: 
     9: namespace flx { namespace pthread {
    10: 
    11: /// Class of jobs to be queued in fifo for execution.
    12: class PTHREAD_EXTERN worker_task
    13: {
    14: public:
    15:   virtual ~worker_task() {}   // virtual, so destroying a task through this base type runs the derived destructor
    16: 
    17:   /// function called by worker thread to carry out user job
    18:   virtual void doit() = 0;
    19: 
    20:   /// function called by worker thread after doit() is completed
    21:   /// used to notify job completion
    22:   virtual void finished() = 0; // finished hook (do I need a start hook too?)
    23: };
    24: 
    25: /// Linear job scheduler, executes jobs in turn from queue
    26: class PTHREAD_EXTERN worker_fifo
    27: {
    28:   sleep_queue_t fifo;           /// queued tasks; a NULL entry tells one worker to exit
    29:   int nthreads;                 /// scheduled number of threads
    30: 
    31:   static void* thread_start(void*); // thread entry point, passed this
    32:   bool thread_loop_body();      // returns keep going flag
    33:   void stop_worker_thread();    // decrements the count and queues one NULL quit signal
    34:   void start_worker_thread();   // increments the count and spawns one detached worker
    35: 
    36: public:
    37:   worker_fifo(int n, int m);   /// n: Q bound, m: # of threads
    38:   ~worker_fifo();              /// queues a quit signal per worker, then waits for the queue to empty
    39:   void add_worker_task(worker_task* task); /// queue a task; a worker calls doit() then finished() on it
    40:   int get_nthreads()const;     /// scheduled number of threads
    41:   void set_nthreads(int);      /// start or stop workers until the scheduled count matches
    42: };
    43: 
    44: }} // namespace pthread, flx
    45: #endif  // __WORKER_FIFO__
    46: 
End cpp section to pthread/pthread_work_fifo.hpp[1]
Start cpp section to pthread/pthread_work_fifo.cpp[1 /1 ]
     1: #line 1373 "./lpsrc/flx_pthread.pak"
     2: #include <stdio.h>    // printf
     3: #include "pthread_work_fifo.hpp"
     4: namespace flx { namespace pthread {
     5: 
     6: int worker_fifo::get_nthreads()const { return nthreads; } // scheduled count: changes as soon as a start or stop is requested
     7: 
     8: void worker_fifo::set_nthreads(int n) // bring the scheduled worker count to n; a negative n is treated as 0
     9: {
    10:   while(nthreads<n) start_worker_thread();
    11:   while(nthreads>n && nthreads>0) stop_worker_thread(); // stop at 0: each quit signal needs a live worker to consume it
    12: }
    13: 
    14: void worker_fifo::start_worker_thread() // schedule one more worker running thread_start(this)
    15: {
    16:   ++nthreads;   // counted before the spawn; nothing here checks whether init() succeeded
    17:   //fprintf(stderr,"Spawn detached worker thread, count=%d\n",nthreads);
    18: #ifdef _WIN32
    19:   flx_detached_thread_t().init((LPTHREAD_START_ROUTINE)thread_start, this); // NOTE(review): cast hides that thread_start returns void*, not DWORD - confirm init() tolerates this
    20: #else
    21:   flx_detached_thread_t().init(thread_start, this); // temporary thread object; the worker receives this as its argument
    22: #endif
    23: }
    24: 
    25: worker_fifo::worker_fifo(int n, int m) : fifo(n), nthreads(0) // n: queue bound, m: initial number of workers
    26: {
    27:   set_nthreads(m);   // the count starts at 0, so this adjusts it to m
    28: }
    29: 
    30: void
    31: worker_fifo::stop_worker_thread()
    32: {
    33:   //fprintf(stderr,"Kill detached worker thread, count=%d\n",nthreads);
    34:   --nthreads;               // the count drops now; a worker exits later, when it dequeues the NULL
    35:   add_worker_task(NULL);    // thread safe takedown: the worker that dequeues NULL leaves its loop
    36: }
    37: 
    38: worker_fifo::~worker_fifo()
    39: {
    40:   while(nthreads>0)stop_worker_thread();   // queue one quit signal per scheduled worker
    41:   fifo.wait_until_empty();   // NOTE(review): presumably blocks until every entry is taken; whether workers have left dequeue() before fifo is destroyed is not visible here - confirm
    42: }
    43: 
    44: // worker thread entry point; udat is the owning worker_fifo (this)
    45: void*
    46: worker_fifo::thread_start(void* udat)
    47: {
    48:   worker_fifo* const self = static_cast<worker_fifo*>(udat);
    49:   for(;;) if(!self->thread_loop_body()) break;   // keep serving until told to quit
    50:   return 0;             // thread exit status, always null
    51: }
    52: 
    53: // takes the next entry off the queue, runs it, then fires its finished
    54: // hook. a null entry is a request to exit: the caller stops looping.
    55: bool
    56: worker_fifo::thread_loop_body()
    57: {
    58:   worker_task* const job = (worker_task*)fifo.dequeue();
    59:   //fprintf(stderr,"dequeued worker_task (%p)\n", job);
    60: 
    61:   if(job == 0) return false;    // quit signal received
    62: 
    63:   job->doit();                  // run the user's job
    64:   job->finished();              // then notify completion
    65: 
    66:   return true;                  // caller should loop again
    67: }
    68: 
    69: void
    70: worker_fifo::add_worker_task(worker_task* task) // queue a job for the workers; a NULL task makes one worker exit
    71: {
    72:   //fprintf(stderr,"adding worker task %p\n",task);
    73:   fifo.enqueue(task);         // don't worry, fifo is re-entrant (presumably: safe to call from any thread - confirm)
    74: }
    75: 
    76: }}
    77: 
    78: 
End cpp section to pthread/pthread_work_fifo.cpp[1]
Start felix section to lib/pthread.flx[1 /1 ]
     1: #line 1452 "./lpsrc/flx_pthread.pak"
     2: #import <flx.flxh>
     3: 
     4: header mutex_hxx = '#include "pthread_mutex.hpp"';
     5: header condv_hxx = '#include "pthread_condv.hpp"';
     6: header counter_hxx = '#include "pthread_counter.hpp"';
     7: header semaphore_hxx = '#include "pthread_semaphore.hpp"';
     8: header monitor_hxx = '#include "pthread_monitor.hpp"';
     9: 
    10: module Pthread // Felix bindings for pre-emptive threads, mutexes and monitor-based channels
    11: {
    12:   requires package "flx_pthread";
    13:   open C_hack;
    14:   proc spawn_pthread(p:1->0) // run procedure p as a new pthread
    15:   {
    16:       var con = start p;              // get continuation of p
    17:       var fthr = mk_thread con;       // wrap the continuation with mk_thread
    18:       svc$ svc_spawn_pthread fthr;    // issue the svc_spawn_pthread service request for it
    19:   }
    20: 
    21:   type mutex = "flx_mutex_t" requires mutex_hxx;
    22:   proc lock: lvalue[mutex] = "$1.lock();";     // lvalue forms map straight onto the C++ member calls
    23:   proc unlock: lvalue[mutex] = "$1.unlock();";
    24:   proc lock (m:&mutex) { lock$ *m; }           // pointer forms forward to the lvalue forms
    25:   proc unlock (m:&mutex) { unlock$ *m; }
    26: 
    27:   // pre-emptive thread channels (monitor)
    28:   type pchannel[t] = "flx::pthread::monitor_t*" requires monitor_hxx;
    29: 
    30:   fun mk_pchannel[t]: 1->pchannel[t] = // allocates the monitor with C++ new; nothing in this module deletes it
    31:     "new flx::pthread::monitor_t()"
    32:   ;
    33: 
    34:   proc _read[t]: pchannel[t] * ptr[ptr[t]] = "*$2 = (?1*)($1->dequeue());"; // dequeue one pointer and store it through $2
    35: 
    36:   proc read[t](v:&t,chan:pchannel[t]) { // receive one value from chan into *v
    37:     var p : ptr[t];
    38:     _read (chan, addr p);
    39:     *v = *p; // copy the value out; the cell made by write is not released here (NOTE(review): presumably collector-owned - confirm)
    40:   }
    41: 
    42:   proc _write[t]: pchannel[t] * ptr[t] = "$1->enqueue((void*)$2);"; // enqueue the pointer as an untyped void*
    43:   proc write[t](chan:pchannel[t], v:t) { // send a copy of v through chan
    44:     var ps = cast[ptr[t]]$ xnew v; // xnew puts a copy of v in a new cell; only its pointer travels through the channel
    45:     _write (chan,ps);
    46:   }
    47: }
    48: 
End felix section to lib/pthread.flx[1]
Start felix section to test/nd101.flx[1 /1 ]
     1: #line 1501 "./lpsrc/flx_pthread.pak"
     2: #import <flx.flxh>
     3: include "flx_faio";
     4: include "pthread";
     5: open Pthread;
     6: 
     7: print "Pthread spawning test"; endl;
     8: 
     9: proc thr (x:int) { print "Thread "; print x; endl; } // print one line identifying thread x
    10: 
    11: proc flx_main // spawn 7 pthreads, then read 7 messages from the shared channel
    12: {
    13:   print "Running main\n";
    14:   var chan = mk_pchannel[int]();
    15:   var dummy: int; // receives each value read from chan
    16: 
    17:   spawn_pthread { thr 1; write (chan,1); }; // each thread prints its number, then writes it to chan
    18:   spawn_pthread { thr 2; write (chan,2); };
    19:   spawn_pthread { thr 3; write (chan,3); };
    20:   spawn_pthread { thr 4; write (chan,4); };
    21:   spawn_pthread { thr 5; write (chan,5); };
    22:   spawn_pthread { thr 6; write (chan,6); };
    23:   spawn_pthread { thr 7; write (chan,7); };
    24:   print "Spawned\n";
    25:   &dummy <- read chan; // read 1 of 7 (counts reads, not the value received)
    26:   print "joined "; print dummy; endl;
    27:   &dummy <- read chan; // read 2 of 7
    28:   print "joined "; print dummy; endl;
    29:   &dummy <- read chan; // read 3 of 7
    30:   print "joined "; print dummy; endl;
    31:   &dummy <- read chan; // read 4 of 7
    32:   print "joined "; print dummy; endl;
    33:   &dummy <- read chan; // read 5 of 7
    34:   print "joined "; print dummy; endl;
    35:   &dummy <- read chan; // read 6 of 7
    36:   print "joined "; print dummy; endl;
    37:   &dummy <- read chan; // read 7 of 7
    38:   print "joined "; print dummy; endl;
    39:   print "Joined all\n";
    40: }
    41: 
    42: export proc flx_main of (1) as "flx_main"; // export the unit-argument flx_main under the external name "flx_main"
    43: 
End felix section to test/nd101.flx[1]
Start felix section to test/nd102.flx[1 /1 ]
     1: #line 1545 "./lpsrc/flx_pthread.pak"
     2: #import <flx.flxh>
     3: include "pthread";
     4: include "flx_faio";
     5: open String;
     6: 
     7: print "Garbage collector world stop test"; endl;
     8: 
     9: proc randprint(n:int) // thread body: 5 rounds of random sleeps, requesting a collection whenever the draw is 0
    10: {
    11:   var i = 5; // rounds remaining
    12:   print$ "Start Thread number "+str(n); endl;
    13:   whilst i > 0 do
    14:     var d = double_of$ Cstdlib::rand()%10; // random draw from rand()%10, converted to double
    15:     if d == 0.0 do
    16:       print "ZERO FOUND -- collecting!"; endl;
    17:       collect; // request a garbage collection while the other threads are running
    18:       print "collected!"; endl;
    19:     done;
    20:     print$ "Thread "+str n +" Sleep #"+str i+" for "+str d+" sec"; endl;
    21:     --i;
    22:     Faio::sleep d; // sleep for d (reported above as seconds)
    23:   done;
    24:   print$ "Finish Thread number "+str(n); endl;
    25: }
    26: 
    27: Pthread::spawn_pthread { randprint(1); }; // five pthreads, each running randprint with its own number
    28: Pthread::spawn_pthread { randprint(2); };
    29: Pthread::spawn_pthread { randprint(3); };
    30: Pthread::spawn_pthread { randprint(4); };
    31: Pthread::spawn_pthread { randprint(5); };
    32: 
    33: print "Mainline done!"; endl; // printed after the spawns are issued; nothing here waits for the threads
End felix section to test/nd102.flx[1]