1. demux

#@h=tangler('tmp/flx_demux_config.hpp') #@select(h) #// This is a fake flx_demux_config.h to be used at config time, before #// the rtl proper exists. It contains just enough info to compile #// a few of the demuxers. ##define DEMUX_EXTERN
Start cpp section to rtl/flx_demux_config.hpp[1 /1 ]
     1: #line 11 "./lpsrc/flx_demux.pak"
     2: #ifndef __FLX_DEMUX_CONFIG_H__
     3: #define __FLX_DEMUX_CONFIG_H__
     4: #include "flx_rtl_config.hpp"
     5: #ifdef BUILD_DEMUX
     6: #define DEMUX_EXTERN FLX_EXPORT
     7: #else
     8: #define DEMUX_EXTERN FLX_IMPORT
     9: #endif
    10: #endif
    11: 
    12: 
End cpp section to rtl/flx_demux_config.hpp[1]
Start cpp section to demux/flx_demux.hpp[1 /1 ]
     1: #line 24 "./lpsrc/flx_demux.pak"
     2: #ifndef __FLX_DEMUX_H__
     3: #define __FLX_DEMUX_H__
     4: #include <flx_demux_config.hpp>
     5: #include "pthread_thread.hpp"
     6: 
     7: #if FLX_WIN32
     8:  #include "demux_iocp_demuxer.hpp"
     9:  namespace flx { namespace demux {
    10:    typedef iocp_demuxer flx_demuxer_t;
    11:  }}
    12: #elif FLX_HAVE_KQUEUE_DEMUXER
    13:  #include "demux_kqueue_demuxer.hpp"
    14:   namespace flx { namespace demux {
    15:     typedef kqueue_demuxer flx_demuxer_t;
    16:   }}
    17: #elif FLX_HAVE_EVTPORTS
    18:  #include "demux_evtport_demuxer.hpp"
    19:   namespace flx { namespace demux {
    20:     typedef evtport_demuxer flx_demuxer_t;
    21:   }}
    22: #elif FLX_HAVE_EPOLL
    23:  #include "demux_epoll_demuxer.hpp"
    24:   namespace flx { namespace demux {
    25:     typedef epoll_demuxer flx_demuxer_t;
    26:   }}
    27: #elif FLX_HAVE_POLL
    28:  // NB!: on osx 10.3 poll exists, but is a poor cousin emulation layer on
    29:  // top of select. however, 10.3 has kqueues (above), so should be ok...
    30:  #include "demux_ts_poll_demuxer.hpp"
    31:   namespace flx { namespace demux {
    32:     typedef ts_poll_demuxer flx_demuxer_t;
    33:   }}
    34: #else
    35:  #include "demux_ts_select_demuxer.hpp"
    36:   namespace flx { namespace demux {
    37:     typedef ts_select_demuxer flx_demuxer_t;
    38:   }}
    39: #endif
    40: 
    41: 
    42: namespace flx { namespace demux {
    43: DEMUX_EXTERN flx_demuxer_t * make_std_demuxer();
    44: }}
    45: 
    46: #endif
    47: 
End cpp section to demux/flx_demux.hpp[1]
Start cpp section to demux/flx_demux.cpp[1 /1 ]
     1: #line 72 "./lpsrc/flx_demux.pak"
     2: #include "flx_demux.hpp"
     3: #include <stdio.h>
     4: #include <stdlib.h>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: // the thread which runs the demuxer polling loop
     9: static void
    10: pthread_thread(void* udat)
    11: {
    12:     demuxer*    d = (demuxer*)udat;
    13: 
    14:     while(1)
    15:     {
    16:         //fprintf(stderr, "ETHREAD ABOUT TO WAIT\n");
    17:         d->wait();          // this does it
    18:         //fprintf(stderr, "ETHREAD CHECKING QUIT FLAG\n");
    19:         demux_quit_flag* f = d->get_quit_flag();
    20:         if(f)
    21:         {
    22:           // got a quit flag - this is the very last thing we do before
    23:           // exiting. don't use the demuxer after this as it's probably been
    24:           // destructed.
    25:           //fprintf(stderr, "ETHREAD GOT QUIT FLAG, SIGNALLING AND EXITING\n");
    26:           f->signal_true();
    27:           // in the case of a system takedown there's no guarantee that
    28:           // anything after the signal_finish will be run at all, so this
    29:           // is not a good place to put anything important.
    30:           break;  // outta here
    31:         }
    32:     }
    33:     //fprintf(stderr, "ETHREAD EXITING\n");
    34:     // fprintf(stderr, "proto_async was asked to quit...\n");
    35: }
    36: 
    37: flx_demuxer_t *
    38: make_std_demuxer()
    39: {
    40:   flx_demuxer_t *d = new flx_demuxer_t();
    41:   pthread::flx_thread_t ethread;
    42:   if(ethread.init(pthread_thread, d) == -1)
    43:   {
    44:     fprintf(stderr,"Proto_async thread init failure\n");
    45:     exit(1);
    46:   }
    47:   return d;
    48: }
    49: }}
    50: 
End cpp section to demux/flx_demux.cpp[1]
Start felix section to lib/flx_demux.flx[1 /1 ]
     1: #line 123 "./lpsrc/flx_demux.pak"
     2: module Demux
     3: {
     4:   type demuxer = "flx::demux::flx_demuxer_t*"
     5:     requires header '#include "flx_demux.hpp"'
     6:   ;
     7:   gen mk_sys_demux: 1->demuxer = "flx::demux::make_std_demuxer()";
     8:   val sys_demux =  mk_sys_demux();
     9: }
    10: 
End felix section to lib/flx_demux.flx[1]
Start cpp section to demux/demux_demuxer.hpp[1 /1 ]
     1: #line 134 "./lpsrc/flx_demux.pak"
     2: #ifndef __FLX_DEMUX_DEMUXER_H__
     3: #define __FLX_DEMUX_DEMUXER_H__
     4: #include <flx_demux_config.hpp>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: struct sel_param {
     9:   char*   buffer;           // set on input
    10:   long    buffer_size;        // set on input
    11:   long    bytes_written;        // set on input and output
    12: 
    13:   bool    finished() { return bytes_written == buffer_size; }
    14: };
    15: 
    16: // rename ...
    17: // read/write flags - they're no longer mutually exclusive
    18: enum { PDEMUX_READ = 1, PDEMUX_WRITE = 2, PDEMUX_EOF=4, PDEMUX_ERROR=8 };
    19: 
    20: // base class/hook for implementing thread safe multithreaded demux quit
    21: // not that useful for single threaded implementations.
    22: class DEMUX_EXTERN demux_quit_flag
    23: {
    24: public:
    25:   virtual void signal_true() = 0; // = signal finish
    26:   virtual ~demux_quit_flag() {}
    27: };
    28: 
    29: // ********************************************************
    30: /// Demux base.
    31: // ********************************************************
    32: class DEMUX_EXTERN demuxer {
    33: protected:
    34:   // wait for outstanding events. may return before given events, so
    35:   // check your conditions. I've turned of all the timeouts that cause
    36:   // this, but don't rely on it!
    37:   // FACTOR. Give poll a greedy interface
    38:   virtual void  get_evts(bool poll) = 0;
    39: 
    40:   // for clean async takedown. contents guaranteed to be valid until
    41:   // quit_flag->signal_true is called
    42:   demux_quit_flag* quit_flag;
    43: public:
    44:   demuxer() : quit_flag(0) {}
    45:   virtual ~demuxer() {}
    46: 
    47:   void wait() { get_evts(false); }
    48:   void poll() { get_evts(true); }
    49: 
    50:   // ask users of demuxer to exit. not guarded. be sure to either set & get
    51:   // this flag from only one thread (with a wait/wakeup callback - see
    52:   // self_piper) or by using a memory barrier.
    53:   virtual demux_quit_flag* get_quit_flag() { return quit_flag; }
    54:   virtual void set_quit_flag(demux_quit_flag* f) { quit_flag = f; }
    55: };
    56: 
    57: // base class for callback from demuxer. useful when used in conjuction
    58: // with the self piper for implementing threadsafe demuxer quit and
    59: // guaranteeing responsiveness to new sockets.
    60: // run in the same thread that called d->wait/poll.
    61: class DEMUX_EXTERN demux_callback {
    62: public:
    63:   virtual void callback(demuxer* d) = 0;
    64:   virtual ~demux_callback() {}
    65: };
    66: 
    67: }} // namespace demux, flx
    68: #endif  /* __DEMUXER__ */
    69: 
End cpp section to demux/demux_demuxer.hpp[1]
Start cpp section to demux/demux_timer_queue.hpp[1 /1 ]
     1: #line 204 "./lpsrc/flx_demux.pak"
     2: #ifndef __FLX_DEMUX_TIMER_QUEUE_H__
     3: #define __FLX_DEMUX_TIMER_QUEUE_H__
     4: 
     5: #include <flx_demux_config.hpp>
     6: 
     7: namespace flx { namespace demux {
     8: 
     9: // trying to factor out code to share between pc & posix versions
    10: 
    11: // class sleep_task : public worker_task
    12: // may not need time in here - just the wakeup - something I surely have
    13: // somewhere else.
    14: class DEMUX_EXTERN sleep_task
    15: {
    16: public:
    17:     virtual ~sleep_task() {}
    18: 
    19:     virtual void fire() = 0;
    20: };
    21: 
    22: class DEMUX_EXTERN timer_queue
    23: {
    24: public:
    25:     virtual ~timer_queue() {}
    26: 
    27:     virtual void add_sleep_request(sleep_task* st, double delta) = 0;
    28:     virtual void add_abs_sleep_request(sleep_task* st, double when) = 0;
    29: 
    30:     // bad design - this is actually implemented in the descendent classes,
    31:     // which limits the number of such classes probably to one.
    32:     static void get_time(double& t);        // in seconds from some ref pt
    33: };
    34: 
    35: DEMUX_EXTERN timer_queue *mk_timer_queue();
    36: 
    37: }} // namespace demux, flx
    38: 
    39: #endif
    40: 
End cpp section to demux/demux_timer_queue.hpp[1]
Start cpp section to demux/demux_demuxer.cpp[1 /1 ]
     1: #line 245 "./lpsrc/flx_demux.pak"
     2: #include "demux_demuxer.hpp"
     3: 
     4: // nothing here atm ..
     5: 
End cpp section to demux/demux_demuxer.cpp[1]
Start cpp section to demux/demux_quitter.hpp[1 /1 ]
     1: #line 251 "./lpsrc/flx_demux.pak"
     2: 
     3: #ifndef __FLX_DEMUX_QUITTER_H__
     4: #define __FLX_DEMUX_QUITTER_H__
     5: 
     6: #include <flx_demux_config.hpp>
     7: #include "demux_demuxer.hpp"  // demuxers
     8: 
     9: #if FLX_WIN32
    10: #include "demux_wself_piper.hpp" // win32 self piper
    11: #else
    12: #include "demux_self_piper.hpp" // posix self piper
    13: #endif
    14: 
    15: #include "pthread_waitable_bool.hpp"
    16: 
    17: namespace flx { namespace demux {
    18: 
    19: // quits a demuxer
    20: class DEMUX_EXTERN demux_quitter
    21:         : public demux_callback, public demux_quit_flag {
    22:   // self pipes for getting demuxer attention
    23: #if FLX_WIN32
    24:   wself_piper sp;
    25: #else
    26:   self_piper sp;
    27: #endif
    28:   pthread::waitable_bool finished;  // initially false
    29:   void callback(demuxer* demux); // called back by demuxer in event thread.
    30:   virtual void signal_true(); // signal finish, from demux_quit_flag
    31: public:
    32:   void quit(demuxer* demux); // blocks until event thread exits
    33: };
    34: 
    35: } }
    36: 
    37: #endif
    38: 
End cpp section to demux/demux_quitter.hpp[1]
Start cpp section to demux/demux_quitter.cpp[1 /1 ]
     1: #line 290 "./lpsrc/flx_demux.pak"
     2: #include "demux_quitter.hpp"
     3: #include <stdio.h>
     4: 
     5: namespace flx { namespace demux {
     6: 
     7: // this is called by the demuxer, from its event thread.
     8: // for this reason it can modify the demuxer with tranquility, asking it
     9: // to quit, by setting the "quit flag". this is an obj with a virtual
    10: // signal_true method which the event thread/demuxer user calls, in effect
    11: // promising to never interact with that demuxer ever again. what to do with
    12: // the wakeups that demuxer still knows about is unclear, and with most
    13: // demuxer impls there isn't even a way to know which wakeups are outstanding.
    14: // it hasn't been a problem yet.
    15: void
    16: demux_quitter::callback(demuxer* demux)
    17: {
    18:   //fprintf(stderr, "quitter callback\n");
    19:   demux->set_quit_flag(this);
    20: }
    21: 
    22: // call this last thing before event thread exit, or more explicitly
    23: // call it before never touching the given demuxer ever again. see?
    24: // it is linked to demuxers. a better name for this method would be
    25: // i_promise_to_never_use_the_associated_object_ever_again_clear_question_mark
    26: void
    27: demux_quitter::signal_true() // signal finish, from demux_quit_flag
    28: {
    29:   finished.signal_true();
    30:   // do NOTHING here, we've probably already been destructed, mr anderson.
    31:   // I told you, my name is "neil".
    32: }
    33: 
    34: void
    35: demux_quitter::quit(demuxer* demux)
    36: {
    37:    // fprintf(stderr, "trying to quit demuxer...\n");
    38:    // install self piper, with our callback
    39:    sp.install(demux, this);
    40:    // wake demuxer, getting our callback called, which sets quit flag
    41:    sp.wake();
    42:    // wait for quit flag to be signalled by exiting event thread
    43:    finished.wait_until_true();
    44:    // event thread exited
    45: }
    46: 
    47: } }
    48: 
End cpp section to demux/demux_quitter.cpp[1]


1.1. OS specific demux codes