2. Async IO (deprecated)

Start cpp section to faio/faio_asyncio.hpp[1 /1 ]
     1: #line 353 "./lpsrc/flx_faio.pak"
     2: #ifndef __ASYNCIO__
     3: #define __ASYNCIO__
     4: #include <flx_faio_config.hpp>
     5: 
     6: // this file acts as go-between for my asynchronous interface for felix
     7: // programmes.
     8: 
     9: // portable. Not that asynchronous.
    10: 
    11: #include "demux_demuxer.hpp"        // sel_param, demuxer base
    12: #include "faio_drv.hpp"
    13: 
    14: namespace flx { namespace faio {
    15: // Abstract base for driver requests: subclasses implement start_async_op().
    16: class FAIO_EXTERN flx_driver_request_base {
    17: public:
    18:     virtual ~flx_driver_request_base() {}       // virtual, so destroying through a base pointer runs the derived destructor
    19:     // returns finished flag (async may fail or immediately finish); the implementations in this file treat f as the calling thread
    20:     virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f) = 0;
    21: };
    22: 
    23: // A cooperative pipe between two flx threads. Not actually asynchronous as such, since
    24: // the threads are woken synchronously. Definitely portable. Look out for this shared structure
    25: // when flx threads are run pre-emptively: start_async_op will need to be serialised.
    26: class FAIO_EXTERN async_copipe : public flx_driver_request_base {
    27: public:
    28:     enum { WINDWARD, LEEWARD, NUM_CHANNELS };
    29:     // does the transfer once a reader and a writer are both connected; wakes the finished side(s) itself
    30:     virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    31:     void connect(void* f, demux::sel_param* pb, bool reading, int channel);
    32:     void close_channel(int which);      // WINDWARD or LEEWARD; only clears the open flag, nothing is woken
    33:     void disconnect();                  // will be used by close; the pipe deletes itself after the last user
    34: 
    35:     static async_copipe* create_copipe() { return new async_copipe; }
    36:     bool debug;                         // when true, diagnostics are printed to stderr
    37:     void set_debug(bool);
    38: 
    39: private:
    40:     enum { READER, WRITER, NUM_CNXNS };         // indices into thread[] and pb[]
    41: 
    42:     // if there's a thread here, it's sleeping
    43:     void*   thread[NUM_CNXNS];
    44:     demux::sel_param*  pb[NUM_CNXNS];           // parameter block stored by connect() next to each thread
    45:     int         num_users;                      // set to 2 on construction, decremented by disconnect()
    46: 
    47:     bool        channel_open[NUM_CHANNELS];
    48:     int         current_channel;                // channel of the pending transfer, -1 while undefined
    49:     void wake_thread(int n, flx_drv* drv);      // passes thread[n] to drv->sched() and clears the slot
    50:     void wake_all_threads(flx_drv* drv);
    51: 
    52:     async_copipe();                     // private to control alloc/dealloc; use create_copipe()
    53: };
    54: // One end of an async_copipe; endpoints are created two at a time by pipe_pair().
    55: class FAIO_EXTERN copipe_endpt {
    56:     int             read_channel;       // the channel this endpt reads on
    57: 
    58:     copipe_endpt(async_copipe* p, int rchan) :
    59:       read_channel(rchan), pipe(p), debug(false)
    60:     {}
    61:     // NOTE(review): copying is not disabled; the destructor of a copy would call pipe->disconnect() a second time
    62: public:
    63:     async_copipe*   pipe;               // endpt's copy (flx accessible)
    64: 
    65:     // how: 0 = no more reads, 1 = no more writes, 2 = neither (this is not the same as close, however)
    66:     void shutdown(int how);
    67:     int get_channel(bool reading);      // the read channel when reading, otherwise the opposite channel
    68:     bool debug;
    69:     void set_debug(bool d) { debug = d; }
    70: 
    71:     ~copipe_endpt() { pipe->disconnect(); }     // gives up this endpoint's share of the pipe
    72:     static void pipe_pair(copipe_endpt* pair[2]);   // fills pair[] with two new endpoints sharing one new pipe
    73: };
    74: 
    75: // sleeping: a driver request for a timed sleep
    76: class FAIO_EXTERN sleep_request
    77:   : public flx_driver_request_base, public demux::sleep_task
    78: {
    79:   thread_wakeup fw;       // wakeup handle, recorded in start_async_op and used by fire()
    80:   double      delta;      // duration handed to add_sleep_request (units are not visible in this file)
    81: public:
    82:   // flx linkage. delta is zeroed so that a default-constructed request can
    83:   // never pass an indeterminate value to the sleep queue.
    84:   sleep_request() : delta(0.0) {}
    85: 
    86:   sleep_request(double d) : delta(d) {}
    87:   // from driver request: queues this request with the driver's sleepers
    88:   virtual bool start_async_op(demux::demuxer& demux, flx_drv* drv, void* f);
    89:   // from sleep_task: called when the sleep is due, wakes the recorded thread
    90:   virtual void fire();
    91: };
    92: 
    93: }} // namespace faio, flx
    94: #endif  // __ASYNCIO__
    95: 
End cpp section to faio/faio_asyncio.hpp[1]
Start cpp section to faio/faio_asyncio.cpp[1 /1 ]
     1: #line 449 "./lpsrc/flx_faio.pak"
     2: #include "faio_asyncio.hpp"
     3: #include <string.h>     // memmove
     4: #include <stdio.h>      // debugging
     5: 
     6: using namespace flx::demux;
     7: namespace flx { namespace faio {
     8: // cooperative pipes
     9: 
    10: // this is a bit interesting because the copipes are not asynchronous and
    11: // their work is actually all done in async_copipe::start_async_op (once
    12: // both ends have read & written)
    13: 
    14: void async_copipe::set_debug(bool d)
    15: { this->debug = d; }    // turns the pipe's stderr diagnostics on or off
    16: // Does the pipe's work: once a reader and a writer are both connected, copies as many bytes as both sides allow and wakes each side whose buffer is complete.
    17: bool
    18: async_copipe::start_async_op(demux::demuxer& demux, flx_drv* drv, void* f)
    19: {
    20: // this is where a big lock will go for multithreaded driver
    21:     // true only when f is not one of the threads stored in thread[]; we don't want to wake anyone twice
    22:     bool wake_caller = (f != thread[READER] && f != thread[WRITER]);
    23: 
    24:     // no channel selected (nothing connected, or connect() saw conflicting channels)
    25:     if(-1 == current_channel)
    26:         return true;                // wake caller
    27: 
    28:     // the selected channel has been closed: release any sleeper (we may have one reader/writer, or even 2 with a multithreaded driver)
    29:     if(!channel_open[current_channel])
    30:     {
    31:         wake_all_threads(drv);
    32:         current_channel = -1;       // channel now undefined
    33:         return wake_caller;
    34:     }
    35:     // channel closed situations handled
    36: 
    37:     // if we don't have both endpts, then the op isn't finished
    38:     if(!(thread[READER] && thread[WRITER])) return wake_caller;
    39: 
    40:     // we have both ends. one or both must wake after this
    41: if(wake_caller) fprintf(stderr,"we should never have a 3rd party here!\n");
    42: 
    43:     // move data from writer to reader
    44:     long    len;
    45:     long    nb1 = pb[READER]->buffer_size-pb[READER]->bytes_written;    // room left in the reader's buffer
    46:     long    nb2 = pb[WRITER]->buffer_size-pb[WRITER]->bytes_written;    // bytes the writer has not handed over yet
    47: 
    48:     // min(read bytes, write bytes)
    49:     len = (nb1 < nb2) ? nb1 : nb2;
    50: 
    51:     // write buf -> read buf, each starting at its own current offset
    52:     memmove(pb[READER]->buffer + pb[READER]->bytes_written,
    53:         pb[WRITER]->buffer + pb[WRITER]->bytes_written, len);
    54:     pb[READER]->bytes_written += len;   // both sides advance by the amount copied
    55:     pb[WRITER]->bytes_written += len;
    56: 
    57:     // I could let the driver wake one of the threads, but it makes for
    58:     // easier reading if I do it myself:
    59:     int     num_woken = 0;
    60:     for(int i = 0; i < NUM_CNXNS; i++)
    61:     {
    62:         // wake up! this side's buffer has been transferred completely
    63:         if(pb[i]->bytes_written == pb[i]->buffer_size)
    64:         {
    65:             wake_thread(i, drv);
    66:             num_woken++;
    67:         }
    68:     }
    69: 
    70:     if(num_woken == NUM_CNXNS)  // everyone woke up
    71:         current_channel = -1;   // channel now undefined
    72: 
    73:     // we're doing our own waking, so just say no, it isn't finished.
    74:     // not strictly honest, is it? driver waking is kind of lame anyway.
    75:     // it's our job.
    76:     return false;
    77: }
    78: 
    79: void async_copipe::wake_thread(int n, flx_drv* drv)
    80: {
    81:     void*& sleeper = thread[n];
    82:     drv->sched(sleeper);    // pass the stored thread to the driver's sched()
    83:     sleeper = 0;            // not ours anymore, the slot is empty again
    84: }
    85: 
    86: void async_copipe::wake_all_threads(flx_drv* drv)
    87: {
    88:     for(int cnxn = 0; cnxn != NUM_CNXNS; ++cnxn)
    89:     {
    90:         if(!thread[cnxn]) continue;     // empty slot, nobody to wake
    91:         wake_thread(cnxn, drv);
    92:     }
    93: }
    94: // Builds an idle duplex pipe: no sleepers, no parameter blocks, both channels open.
    95: async_copipe::async_copipe() : debug(false)
    96: {
    97:     thread[READER] = thread[WRITER] = 0;    // nobody is sleeping on the pipe yet
    98:     pb[READER] = pb[WRITER] = 0;            // never leave the parameter-block pointers indeterminate
    99:     channel_open[WINDWARD] = true;  // both channels open = duplex io
   100:     channel_open[LEEWARD] = true;
   101:     current_channel = -1;           // current channel undefined
   102:     num_users = 2;                  // we delete this after 2 disconnects
   103: }
   104: 
   105: // Marks channel `which` (WINDWARD or LEEWARD) closed. Nothing is woken here: this
   106: // object does not have the driver queue, so the felix code must follow up with an
   107: // async op on the pipe, which re-evaluates and wakes threads as necessary.
   108: void async_copipe::close_channel(int which)
   109: {
   110:     // anything that is not a real channel is ignored rather than written outside channel_open[]
   111:     if(which < 0 || which >= NUM_CHANNELS) return;
   112:     channel_open[which] = false;
   113: }
   114: 
   115: // Drops one user of the pipe; the pipe destroys itself when no users are left.
   116: void async_copipe::disconnect()
   117: {
   118:     if(debug)fprintf(stderr,"num_users before disconnect: %i\n", num_users);
   119:     num_users -= 1;
   120:     if(num_users != 0) return;      // somebody still holds the pipe
   121: 
   122:     if(debug)fprintf(stderr,"deleting this!\n");
   123:     delete this;                    // no member may be touched after this line
   124: }
   125: 
   126: // Stores thread f and its parameter block as the reader or the writer on `channel`.
   127: void async_copipe::connect(void* f, sel_param* inpb, bool reading, int channel)
   128: {
   129:     // once a channel has been chosen, every later connect must ask for the same one
   130:     const bool channel_chosen = (current_channel != -1);
   131:     if(channel_chosen && channel != current_channel)
   132:     {
   133:         if(debug)fprintf(stderr,"conflicting channels! make sure this causes a wake up!\n");
   134:         current_channel = -1;           // causes this thread to wake up
   135:         return;                         // f and inpb are not stored
   136:     }
   137:     current_channel = channel;          // the first connect picks it; otherwise the value is unchanged
   138: 
   139:     const int slot = reading ? READER : WRITER;
   140: 
   141:     // there shouldn't be anything already there; an overwrite is only reported in debug mode
   142:     if(debug && f && thread[slot])
   143:         fprintf(stderr,"copipe conflict! results undefined!\n");
   144: 
   145:     // remember who is waiting and which parameter block belongs to them
   146:     thread[slot] = f;
   147:     pb[slot] = inpb;
   148: }
   149: 
   150: // copipe endpoints, just like socketpair. static. If an allocation throws, whatever was built so far is released first.
   151: void copipe_endpt::pipe_pair(copipe_endpt* pair[2])
   152: {
   153:     async_copipe* p = async_copipe::create_copipe();
   154:     // initialize with pipe and the channel they'll read on.
   155:     try { pair[0] = new copipe_endpt(p, async_copipe::WINDWARD); }
   156:     catch(...) { delete p; throw; }     // no endpoint refers to p yet
   157:     try { pair[1] = new copipe_endpt(p, async_copipe::LEEWARD); }
   158:     // pair[0]'s destructor gives up one share of p; the explicit disconnect() gives up the other, which deletes p
   159:     catch(...) { delete pair[0]; pair[0] = 0; p->disconnect(); throw; }
   160: }
   161: 
   162: int
   163: copipe_endpt::get_channel(bool reading)
   164: {
   165:     // writes go out on whichever channel this endpoint does not read on
   166:     const int write_channel = (read_channel != async_copipe::WINDWARD) ?
   167:             async_copipe::WINDWARD : async_copipe::LEEWARD;
   168:     return reading ? read_channel : write_channel;
   169: }
   170: 
   171: // Closes channels according to `how`: 0 = the channel we read on, 1 = the channel
   172: // we write on, 2 = both; any other value does nothing. Channels are only marked
   173: // closed here: this requires a driver request to make anything actually happen.
   174: void copipe_endpt::shutdown(int how)
   175: {
   176:     if(0 == how)
   177:     {
   178:         // no further reads, shut read channel
   179:         pipe->close_channel(read_channel);
   180:     }
   181:     else if(1 == how)
   182:     {
   183:         // no further writes, shut the channel we write on
   184:         pipe->close_channel(get_channel(false));
   185:     }
   186:     else if(2 == how)
   187:     {
   188:         // shut both channels, regardless
   189:         pipe->close_channel(async_copipe::WINDWARD);
   190:         pipe->close_channel(async_copipe::LEEWARD);
   191:     }
   192: }
   193: 
   194: 
   195: // sleep task prio queue thing
   196: // Queues this request with drv->get_sleepers() for `delta` and returns false, i.e. the op is not finished yet.
   197: bool
   198: sleep_request::start_async_op(demuxer& demux, flx_drv* drv, void* f)
   199: {
   200:   // fprintf(stderr,"driver called sleep_task start_async_op code\n");
   201:   // important: copy down the thread and the driver (RECORD_THREAD_INFO is defined elsewhere; presumably it stores f and drv in fw - TODO confirm)
   202:   RECORD_THREAD_INFO(fw);
   203: 
   204:   drv->get_sleepers()->add_sleep_request(this, delta);
   205: 
   206:   return false;   // no wakeup now; fire() wakes the thread later
   207: }
   208: 
   209: // sleep_task callback (original note: called from the async io thread)
   210: void sleep_request::fire()
   211: {
   212:   // the sleep is over: wake the thread through the wakeup handle
   213:   this->fw.wake();
   214: }
   215: 
   216: }}
   217: 
End cpp section to faio/faio_asyncio.cpp[1]