1. demux

$Log: flx_demux.pak,v $
Revision 1.32 2006/08/04 07:07:08 skaller get rid of debugging stuff
Revision 1.31 2006/07/24 18:22:46 skaller Fix epoll bug which ignored epoll bug in kernels earlier than 2.6.9 .. epoll_ctl requires a non-NULL event address even though the argument is not used.
Revision 1.30 2006/07/19 17:57:52 rfistman wrote self pipe for windows, not happy with it though because it has to use a named pipe (anon pipes don't work in nonblock mode). made demux_quitter portable and now quitting iocp demuxer cleanly added demux callbacks which are currently used for quitting and making demuxers responsive to new sockets. fixed unbalanced quote in ./bin/flx script
Revision 1.29 2006/07/14 16:00:28 rfistman generalised demux quit flag into a condition variable that is picked up and signalled just before the event thread exits. this was yet another race condition in which a quit and destructed demuxer was still in use by its event thread. made get/set quit flag virtual so that "ts" style demuxers (those that encapsulate a non ts demuxer) report the quit flag properly. factored kqueue quit into async_quit for making sure that waiting thread in thread safe demuxers exits before the demuxer destructor completes. affects ts_poll_demuxer, ts_select_demuxer, evtport_demuxer still to do: same for iocp_demuxer.
Revision 1.28 2006/07/13 18:22:55 idadesub fixes for freebsd. hopefully this won't break any other builds
Revision 1.27 2006/06/22 06:59:28 rfistman added poll ts_poll demuxer. conditionalised demuxer usage, so linux no longer implies epoll, and solaris no longer implies evtports. this means that older systems should fall back to select and that we may get the rare nice surprise of finding a system that supports an unexpected demuxer (like osx10.4 with poll) or a linux with kqueues. who knows. ooh, case in point: cygwin supports poll. It spews warnings but it does work.
Revision 1.26 2006/05/30 18:53:22 rfistman fixed exception handler linkage on win32 flx_rtl_config now stops Windows.h on VS toolchain including winsock vers 1 disabled HAVE_ISBLANK in tre config file to unbreak win32 build removed async bootstrapping from pkg_config as it's no longer needed
Revision 1.25 2006/04/30 05:56:29 rfistman minimal change to test commit emails
Revision 1.24 2006/04/30 02:09:37 rfistman got rid of suckitnsee kqueue config (too complicated with pthread use) added a quitter to ts_select_demuxer to fix crash fixed string exception in demo web server caused by POSTs
Revision 1.23 2006/04/27 00:53:03 rfistman added demux_quitter to take down kqueue nicely. a simple quitting selfpipe trick was not enough as the kqueue could exit before the selfpipe's callback had finished. we now have two ways to get attention on a (for now posix) demuxer: selfpiper + demux_quitter. quitter will come in handy for other demuxer whose quit boundary conditions are a little hazy.
Revision 1.22 2006/04/25 10:38:42 rfistman added solaris libs (-lrt for semaphores, -lnsl, -lsocket) missing from some pkgs and added -lnsl -lsocket to flx_pkgconfig bootstrap, added missing namespace to evtport demuxer
Revision 1.21 2006/04/23 06:09:15 rfistman fixed bug in async win32 ReadFile/WriteFile code - wasn't handling ERROR_IO_PENDING properly. Was also not opening files with FILE_FLAG_OVERLAPPED added actual io to windows named pipe example
Revision 1.20 2006/04/22 07:43:37 rfistman taking down kqueue event source was blocking on osx 10.4. fixed now, but not perfect yet. similar problems exist with other sources, giving undefined behaviour (I believe) on takedown. watch this space.
Revision 1.19 2006/04/20 01:57:17 rfistman factored out self_piper class. will be used in kqueue_demuxer and an eventual epoll demuxer.
Revision 1.18 2006/04/08 08:18:34 rfistman fixed up flx_listener, working on portable regression test for tcp bug still at large in nocygwin flx_accept - hope it's portable!
Revision 1.17 2006/04/08 03:53:44 skaller Detect vsnprintf.. some Windows don't provide it even though MSDN says it is available. The MS variation when available doesn't work correctly anyhow.
Revision 1.16 2006/03/07 04:22:36 skaller Termination in presence of spawned pthreads should now be working, using new ts_locker class. Exception handling on per thread basis not implemented yet.
Revision 1.15 2006/03/06 15:08:08 skaller Cygwin and MinGW builds.
Revision 1.14 2006/03/06 13:16:16 skaller Fix library builds so exports and imports are properly distinguished on a library by library basis.
Revision 1.13 2006/03/06 01:29:29 skaller spawn_pthread: init works
Revision 1.12 2006/03/04 09:50:42 rfistman added "suck it and see" for kqueues.
Revision 1.11 2006/03/04 04:26:41 rfistman making demux configure more "suck it and see". temporarily disabled kqueues whilst fixing osx 10.2 build.
Revision 1.10 2006/03/03 04:51:24 rfistman removed the following line from the kqueues code (was breaking the mac build) don't know what it means, but there are no threads in the kqueue code.: using namespace flx::pthread;
Revision 1.9 2006/03/02 17:52:51 skaller Fix flx_pkgconfig to handle linker switches
Revision 1.8 2006/03/02 02:41:39 skaller Fixes for Win32/MSVC++ build.
Revision 1.7 2006/03/01 14:52:28 skaller Cygwin needed one extra lib dependency.
Revision 1.6 2006/02/28 02:07:13 skaller Refactor demux into demux + pthread.
Revision 1.5 2006/02/26 06:39:36 skaller Fix flx_pkgconfig to conform to new spec.
Revision 1.4 2006/02/25 20:38:12 skaller Upgrade flx_pkgconfig
Revision 1.3 2006/02/23 19:33:01 skaller More fiddling with build system
Revision 1.2 2006/02/22 19:00:04 rfistman oops, was passing ULONG as ULONG_PTR. Thanks to 64bit compiler for pointing this out.
Revision 1.1 2006/02/22 17:36:48 skaller Rename some files.. more coming. Make RTL modules full .paks
Revision 1.54 2006/02/21 05:46:45 skaller Fix wrong link flag tag.
Revision 1.53 2006/02/20 09:21:24 skaller Mingw/nocygwin support
Revision 1.52 2006/02/20 02:16:44 rfistman removed recursive locks from the threadsafe select demuxer and from the code in general.
Revision 1.51 2006/02/19 16:04:57 skaller Win32 build changes..seems to work now
Revision 1.50 2006/02/19 14:02:51 skaller Windows sys libs with right switches
Revision 1.49 2006/02/18 16:30:48 skaller More work on new package manager.
Revision 1.48 2006/02/17 09:38:31 skaller conditionalised recursive mutex (to be removed) fixed windows lib_requires (added lib prefix)
Revision 1.47 2006/02/16 23:39:11 rfistman fixed cygwin hang in posix tests. cygwin wakes select with an error flag when you shutdown a socket. other impls seem to wake select, but with no error.
Revision 1.46 2006/02/16 15:09:45 skaller MSVC++ package manager
Revision 1.45 2006/02/16 07:51:40 skaller Replace pkg-config with a Felix program flx_pkgconfig. Make sure to build it in a timely manner, since flx script now depends on it.
Revision 1.44 2006/02/15 10:54:08 skaller Build time packaging system.
Revision 1.43 2006/02/15 04:10:56 rfistman working on thread safe select demuxer (ts_select_demuxer). that should fix cygwin probs. and as yet other undiscovered problems.
Revision 1.42 2006/02/13 05:47:40 rfistman readded recursive flag to (portable) mutexes
Revision 1.41 2006/02/12 06:51:30 rfistman added WSAID_CONNECTEX and LPFN_CONNECTEX definitions for ming (nocygwin) target now conditionally define EAGAIN in posix compat layer because ming seems to have it.
Revision 1.40 2006/02/11 22:12:30 skaller Allow assigning constructor index to enums.
Revision 1.39 2006/02/09 21:05:33 skaller Fixed sdl to use polling.
Revision 1.38 2006/02/09 07:53:27 skaller Fix windows semaphores.
Revision 1.37 2006/02/07 15:55:06 skaller Added portable semaphores and a monitor class to demux.
Revision 1.36 2006/02/06 11:05:18 skaller Timed wait on condition variable.
Revision 1.35 2006/02/06 06:50:01 skaller Added pthread_cond_timedwait and pthread_cond_uswait functions to condition variables. The latter is my own invention, it waits for a specific interval in micro-seconds. The latter is more efficient on Windows when you want to wait for an interval, since this is the native method. Otherwise you need to first obtain the time of day, do a nasty calculation .. and then the timedwait function will undo that, resulting in two unnecessary and expensive system calls.
Revision 1.34 2006/02/04 11:34:36 skaller Portable demux stuff, Win32 version
Revision 1.33 2006/02/04 10:35:55 skaller Portable thread sync stuff
Revision 1.31 2006/02/02 18:52:35 skaller Reorganise demux a bit
Revision 1.30 2006/01/31 04:29:17 rfistman added demux level "sleep until" code. Not quite right yet (posix format of current time in double isn't so good as it's measured from the 70s in microseconds, so macroscopic sleep amounts get swamped by the magnitude of "now"), so felix binding coming soon.
Revision 1.29 2006/01/29 07:07:21 skaller fixed visual studio build after demux merge
Revision 1.28 2006/01/29 05:58:08 rfistman fixed windows build (ming nocygwin) after merge
Revision 1.27 2006/01/29 02:53:16 rfistman fixed missing epoll header for linux.
Revision 1.26 2006/01/29 02:17:29 rfistman using latest demux, added epoll to flx_run (for linux). bugs fixed.
Revision 1.25 2006/01/26 10:04:28 rfistman fixed failure to wake fthread after connects that finish immediately. this fast connect only ever shows up on solaris, so full marks to them.
Revision 1.24 2006/01/21 23:45:10 rfistman fixed potential leak and warning in posix_timer_queue constructor
Revision 1.23 2006/01/16 01:25:43 rfistman factored faio posix accept and connect back demux as control blocks removed pthread cancel from code - implicit cancel points are no longer used. instead the threads are convinced to return from their mains via specially formatted inputs.
Revision 1.22 2006/01/13 05:16:50 rfistman made worker_fifo portable
Revision 1.21 2006/01/11 01:16:32 rfistman added win_timer_queue to demux
Revision 1.20 2006/01/09 16:32:03 skaller Integrate SDL tests, provide initial SDL event demux.

#@h=tangler('tmp/flx_demux_config.hpp')
#@select(h)
#// This is a fake flx_demux_config.h to be used at config time, before
#// the rtl proper exists. It contains just enough info to compile
#// a few of the demuxers.
##define DEMUX_EXTERN
Start cpp section to rtl/flx_demux_config.hpp[1 /1 ]
     1: #line 288 "./lpsrc/flx_demux.pak"
     2: #ifndef __FLX_DEMUX_CONFIG_GUARD__
     3: #define __FLX_DEMUX_CONFIG_GUARD__
     4: #include "flx_rtl_config.hpp"
     5: #ifdef BUILD_DEMUX
     6: #define DEMUX_EXTERN FLX_EXPORT
     7: #else
     8: #define DEMUX_EXTERN FLX_IMPORT
     9: #endif
    10: #endif
    11: 
End cpp section to rtl/flx_demux_config.hpp[1]
Start cpp section to demux/demux_demuxer.hpp[1 /1 ]
     1: #line 300 "./lpsrc/flx_demux.pak"
     2: #ifndef __DEMUXER__
     3: #define __DEMUXER__
     4: #include <flx_demux_config.hpp>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: typedef struct {
     9:   char*   buffer;           // set on input
    10:   long    buffer_size;        // set on input
    11:   long    bytes_written;        // set on input and output
    12: 
    13:   bool    finished() { return bytes_written == buffer_size; }
    14: }sel_param;
    15: 
    16: // rename ...
    17: // read/write flags - they're no longer mutually exclusive
    18: enum { PDEMUX_READ = 1, PDEMUX_WRITE = 2 };
    19: 
    20: // base class/hook for implementing thread safe multithreaded demux quit
    21: // not that useful for single threaded implementations.
    22: class DEMUX_EXTERN demux_quit_flag
    23: {
    24: public:
    25:   virtual void signal_true() = 0; // = signal finish
    26:   virtual ~demux_quit_flag() {}
    27: };
    28: 
    29: // ********************************************************
    30: /// Demux base.
    31: // ********************************************************
    32: class DEMUX_EXTERN demuxer {
    33: protected:
    34:   // wait for outstanding events. may return before given events, so
    35:   // check your conditions. I've turned off all the timeouts that cause
    36:   // this, but don't rely on it!
    37:   // FACTOR. Give poll a greedy interface
    38:   virtual void  get_evts(bool poll) = 0;
    39: 
    40:   // for clean async takedown. contents guaranteed to be valid until
    41:   // quit_flag->signal_true is called
    42:   demux_quit_flag* quit_flag;
    43: public:
    44:   demuxer() : quit_flag(0) {}
    45:   virtual ~demuxer() {}
    46: 
    47:   void wait() { get_evts(false); }
    48:   void poll() { get_evts(true); }
    49: 
    50:   // ask users of demuxer to exit. not guarded. be sure to either set & get
    51:   // this flag from only one thread (with a wait/wakeup callback - see
    52:   // self_piper) or by using a memory barrier.
    53:   virtual demux_quit_flag* get_quit_flag() { return quit_flag; }
    54:   virtual void set_quit_flag(demux_quit_flag* f) { quit_flag = f; }
    55: };
    56: 
    57: // base class for callback from demuxer. useful when used in conjunction
    58: // with the self piper for implementing threadsafe demuxer quit and
    59: // guaranteeing responsiveness to new sockets.
    60: // run in the same thread that called d->wait/poll.
    61: class DEMUX_EXTERN demux_callback {
    62: public:
    63:   virtual void callback(demuxer* d) = 0;
    64:   virtual ~demux_callback() {}
    65: };
    66: 
    67: }} // namespace demux, flx
    68: #endif  /* __DEMUXER__ */
    69: 
End cpp section to demux/demux_demuxer.hpp[1]
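The comments in demux_demuxer.hpp describe a takedown protocol: the event thread keeps calling wait, and once a quit flag has been installed it signals that flag as its last use of the demuxer. The following is a minimal single-threaded sketch of that reading, not the Felix implementation. The base classes are trimmed copies of the declarations above; recording_quit_flag, queue_demuxer and event_loop are hypothetical names invented for the illustration.

```cpp
#include <cassert>
#include <deque>

// Trimmed copies of the declarations above (DEMUX_EXTERN dropped) so the
// sketch compiles on its own.
class demux_quit_flag {
public:
  virtual void signal_true() = 0;
  virtual ~demux_quit_flag() {}
};

class demuxer {
protected:
  virtual void get_evts(bool poll) = 0;
  demux_quit_flag* quit_flag;
public:
  demuxer() : quit_flag(0) {}
  virtual ~demuxer() {}
  void wait() { get_evts(false); }
  void poll() { get_evts(true); }
  virtual demux_quit_flag* get_quit_flag() { return quit_flag; }
  virtual void set_quit_flag(demux_quit_flag* f) { quit_flag = f; }
};

// Hypothetical flag that only records that it was signalled.
class recording_quit_flag : public demux_quit_flag {
public:
  bool signalled;
  recording_quit_flag() : signalled(false) {}
  virtual void signal_true() { signalled = true; }
};

// Hypothetical demuxer whose "events" are ints queued by the caller.
class queue_demuxer : public demuxer {
  std::deque<int> pending;
protected:
  virtual void get_evts(bool poll) {
    (void)poll; // a real wait() would block when nothing is pending
    while (!pending.empty()) {
      handled += pending.front();
      pending.pop_front();
    }
  }
public:
  int handled; // sum of the events processed so far
  queue_demuxer() : handled(0) {}
  void push(int e) { pending.push_back(e); }
};

// Event loop: wait, then check for a quit flag. Signalling the flag is the
// last thing the loop does with the demuxer. Returns the number of waits.
int event_loop(demuxer& d, int max_rounds) {
  int rounds = 0;
  while (rounds < max_rounds) {
    d.wait();
    ++rounds;
    if (demux_quit_flag* f = d.get_quit_flag()) {
      f->signal_true();
      break;
    }
  }
  return rounds;
}
```

In a threaded build the flag would wrap a condition variable, so the thread that requested the quit can block until signal_true has run before destroying the demuxer.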
Start cpp section to demux/demux_epoll_demuxer.hpp[1 /1 ]
     1: #line 370 "./lpsrc/flx_demux.pak"
     2: #ifndef __EPOLL_DEMUXER__
     3: #define __EPOLL_DEMUXER__
     4: 
     5: #include <flx_demux_config.hpp>
     6: #include "demux_posix_demuxer.hpp"
     7: 
     8: namespace flx { namespace demux {
     9: // epoll allows only one event per socket - it does not differentiate
    10: // on the awaited operation (read/write), however it does let you wait
    11: // on any combination (I think)
    12: 
    13: // ********************************************************
    14: /// epoll based demuxer
    15: // ********************************************************
    16: 
    17: class DEMUX_EXTERN epoll_demuxer : public posix_demuxer {
    18:   int   epoll_fd;
    19: 
    20:   // be careful of this - don't let it create race conditions
    21:   // should probably only be called by wait = in one thread only (check)
    22:   // this removes ALL outstanding events for s.
    23:   void  remove_wakeup(int s);
    24: 
    25:   virtual void  get_evts(bool poll);
    26: public:
    27:   epoll_demuxer();
    28:   virtual ~epoll_demuxer();
    29: 
    30:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    31: };
    32: 
    33: }} // namespace demux, flx
    34: #endif
    35: 
End cpp section to demux/demux_epoll_demuxer.hpp[1]
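One way to get a single wakeup per registration from epoll is the EPOLLONESHOT flag, sketched below on a pipe. This is an assumption about technique: the header above does not say whether epoll_demuxer uses one-shot registrations or deletes the descriptor in remove_wakeup. The sketch also passes a non-NULL event to EPOLL_CTL_DEL, which is the kernel quirk mentioned in log entry 1.31. demo_epoll is a hypothetical name, and the code is Linux only.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

// Registers the read end of a pipe one-shot, makes it readable, then polls
// twice. Returns first*10 + second, where each term is the number of events
// reported by the corresponding epoll_wait call, or -1 on setup failure.
int demo_epoll() {
  int ep = epoll_create(1); // size argument is ignored but must be positive
  int p[2];
  if (ep < 0 || pipe(p) != 0) return -1;

  epoll_event ev;
  ev.events = EPOLLIN | EPOLLONESHOT; // disarm after the first report
  ev.data.fd = p[0];
  if (epoll_ctl(ep, EPOLL_CTL_ADD, p[0], &ev) != 0) return -1;

  if (write(p[1], "ab", 2) != 2) return -1;

  epoll_event got;
  int first = epoll_wait(ep, &got, 1, 0);  // timeout 0: poll, do not block
  int second = epoll_wait(ep, &got, 1, 0); // data is still unread, but the
                                           // registration is now disarmed

  // The event argument is ignored for DEL, but kernels before 2.6.9
  // reject a NULL pointer here, so always pass a real address.
  epoll_ctl(ep, EPOLL_CTL_DEL, p[0], &ev);

  close(p[0]);
  close(p[1]);
  close(ep);
  return first * 10 + second;
}
```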
Start cpp section to demux/demux_evtport_demuxer.hpp[1 /1 ]
     1: #line 406 "./lpsrc/flx_demux.pak"
     2: #ifndef __EVTPORT_DEMUXER__
     3: #define __EVTPORT_DEMUXER__
     4: 
     5: // driver for solaris 10 event port notifications
     6: 
     7: #include "demux_posix_demuxer.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: // Event ports are oneshot by default (I don't know if you can change that).
    12: // Events are tracked only by fd and not fd*event, so you cannot add
    13: // separate wakeups for read and write with the same fd and hope for it to
    14: // work as the later one will overwrite the earlier, fodder for race
    15: // conditions. This impl satisfies 1-1 wakeup to request ratio.
    16: 
    17: // I don't know if evtports can be waited upon by other evtports
    18: 
    19: // OBS.
    20: // after removing the threads from the demuxers/event sources
    21: // how are the two half demuxers supposed to work? They used to
    22: // have three threads and now they have one. How can two waits be
    23: // done in one thread? I could add one half_demuxer's evtport to
    24: // the other's and wait on that. Would that work? Otherwise I'll
    25: // have to start a thread, which screws things up a bit. Could do
    26: // that and communicate back to single thread via a waitable queue.
    27: // could have three half-demuxers, add them both to third and call
    28: // their wait functions depending on the outer's wait result.
    29: 
    30: class DEMUX_EXTERN evtport_demuxer : public posix_demuxer {
    31:   int     evtport;
    32: 
    33:   // I think evtports only track the socket and not
    34:   // socket*operation, so there's only one remove
    35:   void remove_wakeup(int s);
    36: 
    37:   virtual void  get_evts(bool poll);
    38: public:
    39:   evtport_demuxer();
    40:   virtual ~evtport_demuxer();
    41: 
    42:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    43: };
    44: 
    45: }} // namespace demux, flx
    46: #endif
    47: 
End cpp section to demux/demux_evtport_demuxer.hpp[1]
Start cpp section to demux/demux_iocp_demuxer.hpp[1 /1 ]
     1: #line 454 "./lpsrc/flx_demux.pak"
     2: #ifndef __IOCP_DEMUXER__
     3: #define __IOCP_DEMUXER__
     4: 
     5: #include <flx_demux_config.hpp>
     6: //#include <Windows.h>
     7: // be specific - flx_rtl_config.h now jigs it so that windows.h  does not
     8: // include winsock version 1 headers by default. this was making the order
     9: // of inclusion of windows.h and winsock2.h significant with cl.exe.
    10: #include <WinSock2.h>
    11: 
    12: #include "demux_demuxer.hpp"
    13: #include "pthread_sleep_queue.hpp"
    14: 
    15: 
    16: namespace flx { namespace demux {
    17: 
    18: // not here? returns INVALID_SOCKET on failure.
    19: // if *io_port == 0, then a port is chosen and returned in *io_port
    20: SOCKET DEMUX_EXTERN create_listener_socket(int* io_port, int backlog);
    21: // these two probably not used. move to wsockety.h
    22: SOCKET DEMUX_EXTERN nice_accept(SOCKET listener);
    23: SOCKET DEMUX_EXTERN nice_connect(const char* addr, int port);
    24: int DEMUX_EXTERN set_tcp_nodelay(int s, int disable_nagle);
    25: 
    26: // ********************************************************
    27: /// make sure you instantiate ONE (1) of these before using winsock
    28: // ********************************************************
    29: class DEMUX_EXTERN winsock_initer
    30: {
    31: public:
    32:   winsock_initer();
    33:   ~winsock_initer();
    34: };
    35: 
    36: // ********************************************************
    37: /// iocp_wakeup base class for users of iocp_demuxer
    38: /// becoming an overlapped call control block
    39: // ********************************************************
    40: class DEMUX_EXTERN iocp_wakeup {
    41: protected:            // folks need to use these in win 32 calls
    42:   OVERLAPPED  ol;
    43:   // store wakeup error here?
    44:   // I didn't want this to be felixy, useful though.
    45:   void clear_overlapped();  // zero the OVERLAPPED structure
    46: public:
    47:   // 2 possibilities for piggybacking data. who could ask for more?
    48:   // udat = per iocp association, olp = per overlapped function call.
    49:   // why don't I need this in the posix version?
    50:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    51:     LPOVERLAPPED olp, int err) = 0;
    52: 
    53:   // start overlapped async operation. returns true if it finished
    54:   // immediately. in this case there will be no iocp_op_finished wakeup.
    55:   // assumes all args ready for call.
    56:   virtual bool start_overlapped() = 0;
    57: 
    58:   // retrieves this pointer from OVERLAPPED pointer
    59:   static iocp_wakeup* from_overlapped(LPOVERLAPPED olp);
    60: };
    61: 
    62: // ********************************************************
    63: // ********************************************************
    64: class DEMUX_EXTERN iocp_demuxer : public demuxer {
    65:   HANDLE    iocp;     // the io completion queue
    66: 
    67:   void get_evts(bool poll);
    68: public:
    69:   iocp_demuxer();
    70:   virtual ~iocp_demuxer();
    71: 
    72:   // udat is the per IOCP object user cookie & the overlapped pointer
    73:   // is the per overlapped operation cookie (sort of), so in the case
    74:   // of acceptex, udat is set when the listener is associated with the
    75:   // iocp and is passed to the subsequent acceptex iocp wakeups.
    76:   // probably won't be used very often
    77:   // the OVERLAPPED retrieved from the iocp is assumed to be part of
    78:   // an iocp_wakeup - beware! returns 0 on success, -1 on failure.
    79:   int associate_with_iocp(HANDLE obj, ULONG_PTR udat);
    80: 
    81: };
    82: 
    83: }} // namespace demux, flx
    84: #endif
    85: 
End cpp section to demux/demux_iocp_demuxer.hpp[1]
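The completion port hands back only an OVERLAPPED pointer, so from_overlapped has to map that pointer to the control block that owns it. The portable sketch below shows one well-defined way to do this: the overlapped block is the first member of a plain struct that also stores a back pointer. This is an assumption for illustration only; the original may use offset arithmetic on the ol member instead. fake_overlapped stands in for the Win32 OVERLAPPED type, and ol_block, wakeup_sketch and dispatch are hypothetical names.

```cpp
#include <cassert>
#include <cstring>

// Stand-in for the Win32 OVERLAPPED structure so the sketch builds anywhere.
struct fake_overlapped {
  unsigned long internal[4];
  void* event;
};

class wakeup_sketch;

// The overlapped block is the first member of a plain struct, so a pointer
// to it can be converted back to a pointer to the enclosing struct.
struct ol_block {
  fake_overlapped ol;   // must stay the first member
  wakeup_sketch* owner; // back pointer to the control block
};

class wakeup_sketch {
protected:
  ol_block blk;
  void clear_overlapped() { std::memset(&blk.ol, 0, sizeof(blk.ol)); }
public:
  int last_nbytes; // byte count from the most recent completion, -1 if none
  wakeup_sketch() : last_nbytes(-1) {
    blk.owner = this;
    clear_overlapped();
  }
  virtual ~wakeup_sketch() {}

  // The pointer that would be passed to an overlapped Win32 call.
  fake_overlapped* overlapped() { return &blk.ol; }

  virtual void op_finished(int nbytes) { last_nbytes = nbytes; }

  // Recover the control block from the pointer the port hands back.
  static wakeup_sketch* from_overlapped(fake_overlapped* olp) {
    return reinterpret_cast<ol_block*>(olp)->owner;
  }
};

// What a completion loop would do with a dequeued overlapped pointer.
int dispatch(fake_overlapped* olp, int nbytes) {
  wakeup_sketch* w = wakeup_sketch::from_overlapped(olp);
  w->op_finished(nbytes);
  return w->last_nbytes;
}
```

A control block laid out this way must not be copied while an operation is outstanding, because the back pointer would then refer to the old object.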
Start cpp section to demux/demux_kqueue_demuxer.hpp[1 /1 ]
     1: #line 540 "./lpsrc/flx_demux.pak"
     2: #ifndef __KQUEUE_DEMUXER__
     3: #define __KQUEUE_DEMUXER__
     4: 
     5: #include "demux_posix_demuxer.hpp"
     6: 
     7: namespace flx { namespace demux {
     8: 
     9: // ********************************************************
    10: /// kqueue demuxer for osx'n'BSD and at least 1 linux
    11: // ********************************************************
    12: class DEMUX_EXTERN kqueue_demuxer : public posix_demuxer {
    13:   int   kq;
    14: protected:
    15:   // this could just be passed the socket_wakeup, if it stored
    16:   // the flags. Those flags are also set, though, which would
    17:   // create a race condition. In and out flags?
    18: 
    19:   int add_kqueue_filter(socket_wakeup* sv, short filter);
    20:   int remove_kqueue_filter(int s, short filter);
    21: 
    22:   int remove_socket_wakeup(int s, int flags);
    23:   void get_evts(bool poll);
    24: public:
    25:   kqueue_demuxer();
    26:   virtual ~kqueue_demuxer();
    27: 
    28:   virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
    29: };
    30: 
    31: }} // namespace demux, flx
    32: #endif
    33: 
End cpp section to demux/demux_kqueue_demuxer.hpp[1]
Start cpp section to demux/demux_pfileio.hpp[1 /1 ]
     1: #line 574 "./lpsrc/flx_demux.pak"
     2: #ifndef __PFILEIO__
     3: #define __PFILEIO__
     4: #include <flx_demux_config.hpp>
     5: 
     6: #include "demux_demuxer.hpp"
     7: #include "pthread_sleep_queue.hpp"
     8: #include "pthread_mutex.hpp"
     9: // #include <sys/types.h> // off_t (don't have flx iface to this yet)
    10:               // can just add new constructor
    11: #include "pthread_work_fifo.hpp"
    12: namespace flx { namespace demux {
    13: 
    14: // ********************************************************
    15: /// like another event source. this is basically a wrapped pread, pwrite
    16: /// should probably be derived from posix_wakeup or something like that.
    17: /// or have the same signature. abstract - users overload "finished"
    18: // ********************************************************
    19: class DEMUX_EXTERN fileio_request : public flx::pthread::worker_task
    20: {
    21:   long    offset;   // make this a proper offset (64bit)
    22:   // off_t    offset; // in: offset, for use with pread, pwrite
    23:   int     fd;     // in: fd in question
    24:   bool    read_flag;  // in: read else write
    25: 
    26:   int     err;    // out:
    27: public:
    28:   // public so it can be got in felix
    29:   sel_param pb;   // in & out: what you want, what you get (64bit len?)
    30: 
    31:   virtual ~fileio_request(); // c++ should do this automatically
    32:   fileio_request();       // flx linkage
    33:   fileio_request(int f, char* buf, long len, long off, bool rd);
    34: 
    35:   virtual void doit();      // sync
    36: };
    37: 
    38: // ********************************************************
    39: // could do separate threads for in & out. or implement some form of cache.
    40: // ********************************************************
    41: class DEMUX_EXTERN pasync_fileio : public flx::pthread::worker_fifo
    42: {
    43: public:
    44:   pasync_fileio(int n,int m) : worker_fifo(n,m) {}
    45:   // compatibility only. don't need this class anymore.
    46:   void add_fileio_request(fileio_request* req) { add_worker_task(req); }
    47: };
    48: 
    49: }} // namespace demux, flx
    50: #endif  // __PFILEIO__
    51: 
End cpp section to demux/demux_pfileio.hpp[1]
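The header describes fileio_request as "basically a wrapped pread, pwrite". A minimal sketch of what the synchronous doit step could look like follows, assuming a single pread or pwrite call per request with the byte count stored in the parameter block. The worker_task base class and the fifo are left out, and fileio_sketch, error and demo_fileio are hypothetical names.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <unistd.h>

// Copy of the parameter block declared in demux_demuxer.hpp.
struct sel_param {
  char* buffer;
  long buffer_size;
  long bytes_written;
  bool finished() { return bytes_written == buffer_size; }
};

class fileio_sketch {
  long offset;    // file offset for pread/pwrite
  int fd;         // descriptor to operate on
  bool read_flag; // true: read, false: write
  int err;        // errno of a failed call, else 0
public:
  sel_param pb;

  fileio_sketch(int f, char* buf, long len, long off, bool rd)
    : offset(off), fd(f), read_flag(rd), err(0) {
    pb.buffer = buf;
    pb.buffer_size = len;
    pb.bytes_written = 0;
  }

  int error() const { return err; }

  // Synchronous: a worker thread would call this, then report completion.
  void doit() {
    ssize_t n = read_flag
      ? pread(fd, pb.buffer, pb.buffer_size, offset)
      : pwrite(fd, pb.buffer, pb.buffer_size, offset);
    if (n < 0) err = errno;
    else pb.bytes_written = n;
  }
};

// Writes ten bytes to a temporary file, reads four back from offset 3 and
// returns the number of bytes read, or -1 on any failure.
long demo_fileio() {
  char path[] = "/tmp/fileio_sketch_XXXXXX";
  int fd = mkstemp(path);
  if (fd < 0) return -1;
  unlink(path); // the file disappears once fd is closed

  char out[] = "0123456789";
  fileio_sketch w(fd, out, 10, 0, false);
  w.doit();

  char in[4];
  fileio_sketch r(fd, in, 4, 3, true);
  r.doit();
  close(fd);

  if (w.error() || r.error() || std::memcmp(in, "3456", 4) != 0) return -1;
  return r.pb.bytes_written;
}
```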
Start cpp section to demux/demux_posix_demuxer.hpp[1 /1 ]
     1: #line 626 "./lpsrc/flx_demux.pak"
     2: #ifndef __POSIX_DEMUXER__
     3: #define __POSIX_DEMUXER__
     4: 
     5: // base classes for posix style demuxers
     6: 
     7: #include "demux_demuxer.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: class DEMUX_EXTERN posix_demuxer;            // fwd decl
    11: 
    12: // abc
    13: class DEMUX_EXTERN posix_wakeup {
    14: public:
    15:   virtual ~posix_wakeup() {}
    16: 
    17:   // when called, the wakeup has finished and been removed.
    18:   virtual void wakeup(posix_demuxer& demux) = 0;
    19: };
    20: 
    21: class DEMUX_EXTERN socket_wakeup : public posix_wakeup {
    22: public:
    23:   int   s;                // the non blocking socket
    24:   int   wakeup_flags;         // set on wakeup, r/w or both
    25: 
    26:   socket_wakeup() : s(-1) {}
    27: };
    28: 
    29: class DEMUX_EXTERN posix_demuxer : public demuxer {
    30: protected:
    31:   void async_quit(); // useful for requesting wait thread quit in
    32:                      // thread safe demuxer destructors. doesn't throw.
    33: 
    34: public:
    35:   virtual ~posix_demuxer();
    36: 
    37:   // posix style sockets. for reading and writing (but not both at once
    38:   // for the same socket_wakeup) you are guaranteed to receive only one
    39:   // wakeup per call to this function when you call wait.
    40:   // returns -1 if no wakeup is coming and zero if one is.
    41:   // For simultaneous reading and writing you may get two wakeups,
    42:   // that is, it may violate the "one shot" rule. Ignoring for now,
    43:   // as it's not a common use. This makes it undefined behaviour.
    44:   // wakeup is owned by the demuxer until its wakeup is called,
    45:   // so treat it with kid gloves, i.e. don't mess with it.
    46:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags) = 0;
    47: 
    48:   // to be called when we can read & write without blocking
    49:   // return true if connection closed, update pb
    50:   // sort of a strange place to have this..., more a socket wakeup
    51:   // thing, even if static
    52:   static bool   socket_recv(int s, sel_param* pb);
    53:   static bool   socket_send(int s, sel_param* pb);
    54: };
    55: 
    56: // some handy control blocks for common non-blocking socket operations
    57: // note that they "fortuitously" both have start methods. hmm.
    58: // a socket io one could be handy here.
    59: 
    60: // this one's restartable (makes sense for listener sockets)
    61: class DEMUX_EXTERN accept_control_block : public socket_wakeup {
    62: public:
    63:   int   accepted;   // accepted socket (out)
    64:   int   socket_err;   // the error, if accepted == -1, else 0 (out)
    65: 
    66:   accept_control_block() : accepted(-1), socket_err(0) {}
    67: 
    68:   virtual int start(posix_demuxer& demux);
    69:   virtual void wakeup(posix_demuxer& demux);
    70: };
    71: 
    72: class DEMUX_EXTERN connect_control_block : public socket_wakeup {
    73: public:
    74:   int     socket_err;   // outgoing error (on start or wake)
    75:   // this should probably be a sockaddr type
    76:   const char* addy;     // addr (dotted quad) (in)
    77:   int     p;        // port (in)
    78: 
    79:   connect_control_block() : socket_err(0) {}
    80: 
    81:   virtual int start(posix_demuxer& demux);
    82:   virtual void wakeup(posix_demuxer& demux);
    83: 
    84:   // oops, can't check for s != -1 as it's always there.
    85:   // was always "finished" and so I started io, losing the first wakeup
    86:   // on epoll evtsrc. Is this right, or should it be != EINPROGRESS?
    87:   // keep in sync with iocp version. give socket_err initial definition
    88:   // that works with this?
    89:   bool finished() { return ( 0 == socket_err); }
    90: };
    91: 
    92: }} // namespace demux, flx
    93: #endif
    94: 
End cpp section to demux/demux_posix_demuxer.hpp[1]
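socket_recv is documented above only by its contract: it is called when the socket can be read without blocking, it updates the parameter block, and it returns true if the connection closed. The sketch below is one plausible shape for such a function on a non-blocking socket, not the Felix source. recv_some and demo_recv are hypothetical names, and treating hard errors as "closed" is an assumption.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Copy of the parameter block declared in demux_demuxer.hpp.
struct sel_param {
  char* buffer;
  long buffer_size;
  long bytes_written;
  bool finished() { return bytes_written == buffer_size; }
};

// Reads until the buffer is full or the socket would block.
// Returns true if the peer closed the connection or a hard error occurred.
bool recv_some(int s, sel_param* pb) {
  while (!pb->finished()) {
    ssize_t n = recv(s, pb->buffer + pb->bytes_written,
                     pb->buffer_size - pb->bytes_written, 0);
    if (n > 0) { pb->bytes_written += n; continue; }
    if (n == 0) return true;       // orderly shutdown by the peer
    if (errno == EINTR) continue;  // interrupted, retry
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return false;                // drained, ask for another wakeup
    return true;                   // assumption: other errors count as closed
  }
  return false;
}

// Sends five bytes over a socket pair and returns how many were received
// into a 16 byte buffer, or -1 on failure.
long demo_recv() {
  int sv[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return -1;
  fcntl(sv[0], F_SETFL, fcntl(sv[0], F_GETFL, 0) | O_NONBLOCK);

  const char msg[] = "hello";
  if (write(sv[1], msg, 5) != 5) return -1;

  char buf[16];
  sel_param pb = { buf, 16, 0 };
  bool closed = recv_some(sv[0], &pb);

  close(sv[0]);
  close(sv[1]);
  return closed ? -1 : pb.bytes_written;
}
```

When the call returns false and pb.finished() is also false, the caller would add the socket_wakeup to the demuxer again and continue after the next wakeup.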
Start cpp section to demux/demux_select_demuxer.hpp[1 /1 ]
     1: #line 721 "./lpsrc/flx_demux.pak"
     2: #ifndef __SELECT_DEMUXER__
     3: #define __SELECT_DEMUXER__
     4: 
     5: #include "demux_posix_demuxer.hpp"
     6: #include <sys/types.h>    // for sys/select.h on osx
     7: #include <sys/select.h>   // for fd_set
     8: #include <sys/time.h>     // GUSI WTF?
     9: #include <unistd.h>       // for bsd
    10: 
    11: // Unlike the other demuxers, this one is NOT thread safe, so wait and
    12: // add socket wakeup must only be called from the same thread.
    13: // if you're looking for the thread safe version, try ts_select_demuxer
    14: 
    15: namespace flx { namespace demux {
    16: 
    17: class DEMUX_EXTERN select_demuxer : public posix_demuxer {
    18:   void  remove_fd(int s);
    19: 
    20:   // thanks Beej!
    21:   fd_set      master_read_set;    // fd watched for reading
    22:   fd_set      master_write_set;   // for writing
    23:   fd_set      master_except_set;    // for exceptions
    24: 
    25:   // read wakeups - note we only have one set, so currently this demuxer
    26:   // cannot have separate wakeups for the same file descriptor. this
    27:   // fits in fine with the "undefined" nature of doing that.
    28:   socket_wakeup*  svs[FD_SETSIZE];    // read wakeups
    29:   //socket_wakeup*  write_svs[FD_SETSIZE];  // write wakeups
    30: 
    31:   int       fdmax;          // high watermark for select
    32: 
    33: protected:
    34:   virtual void  get_evts(bool poll);
    35: 
    36: public:
    37:   // get_evts broken into pieces for thread safe implementations
    38:   void copy_sets(fd_set& rset, fd_set& wset, fd_set& exset);
    39:   // returns true if process_sets should be called.
    40:   bool select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll);
    41:   // these could be consts
    42:   void process_sets(fd_set& rset, fd_set& wset, fd_set& exset);
    43: 
    44:   select_demuxer();
    45: 
    46:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    47: };
    48: }} // namespace demux, flx
    49: #endif
    50: 
End cpp section to demux/demux_select_demuxer.hpp[1]
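select_demuxer splits get_evts into copy_sets, select and process_sets so that a thread-safe wrapper can hold its lock only around the first and last phases. The reduced sketch below tracks read descriptors only and records the fired descriptor instead of calling a socket_wakeup. mini_select, add_read_fd and demo_select are hypothetical names, and the one-shot removal in process_sets is an assumption based on the "one wakeup per call" rule stated in demux_posix_demuxer.hpp.

```cpp
#include <cassert>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

class mini_select {
  fd_set master_read_set; // descriptors currently watched for reading
  int fdmax;              // high watermark for select
public:
  int fired;              // last descriptor that fired, -1 if none

  mini_select() : fdmax(-1), fired(-1) { FD_ZERO(&master_read_set); }

  void add_read_fd(int fd) {
    FD_SET(fd, &master_read_set);
    if (fd > fdmax) fdmax = fd;
  }

  // Phase 1: snapshot the master set (under the lock in a wrapper).
  void copy_sets(fd_set& rset) { rset = master_read_set; }

  // Phase 2: the blocking part, working on the caller's copy of the set.
  // Returns true if process_sets should be called.
  bool select(fd_set& rset, bool poll) {
    timeval zero = { 0, 0 };
    int n = ::select(fdmax + 1, &rset, 0, 0, poll ? &zero : 0);
    return n > 0;
  }

  // Phase 3: dispatch. Each ready descriptor is removed from the master
  // set before it is reported, so a request yields one wakeup.
  void process_sets(fd_set& rset) {
    for (int fd = 0; fd <= fdmax; ++fd) {
      if (FD_ISSET(fd, &rset)) {
        FD_CLR(fd, &master_read_set);
        fired = fd;
      }
    }
  }
};

// Polls an empty pipe, then a readable one. Returns true if only the
// second poll reported the read end.
bool demo_select() {
  int p[2];
  if (pipe(p) != 0) return false;
  mini_select d;
  d.add_read_fd(p[0]);

  fd_set r;
  d.copy_sets(r);
  bool idle = d.select(r, true); // nothing written yet

  if (write(p[1], "x", 1) != 1) return false;
  d.copy_sets(r);
  bool ready = d.select(r, true);
  if (ready) d.process_sets(r);

  bool ok = !idle && ready && d.fired == p[0];
  close(p[0]);
  close(p[1]);
  return ok;
}
```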
Start cpp section to demux/demux_ts_select_demuxer.hpp[1 /1 ]
     1: #line 772 "./lpsrc/flx_demux.pak"
     2: #ifndef __TS_SELECT_DEMUXER__
     3: #define __TS_SELECT_DEMUXER__
     4: 
     5: #include "demux_select_demuxer.hpp"
     6: #include "demux_self_piper.hpp"
     7: #include "pthread_mutex.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: // thread safe version of select demuxer
    12: 
    13: class DEMUX_EXTERN ts_select_demuxer : public posix_demuxer {
    14:   // lock
    15:   flx::pthread::flx_mutex_t      ham_fist;
    16:   // protects this little fella here.
    17:   select_demuxer  demux;
    18: 
    19:   // self pipe trick for waking waiting thread when we like.
    20:   // for demuxer responsiveness.
    21:   self_piper sp;
    22: protected:
    23:   virtual void    get_evts(bool poll);
    24: public:
    25:   ts_select_demuxer();
    26:   ~ts_select_demuxer();
    27: 
    28:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    29: 
    30:   // oops! need to correctly get/set the quit flag
    31:   virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); }
    32:   virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); }
    33: };
    34: }} // namespace demux, flx
    35: 
    36: #endif
End cpp section to demux/demux_ts_select_demuxer.hpp[1]
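The self pipe trick that ts_select_demuxer relies on can be sketched in isolation. This is a minimal illustration, not the demux_self_piper implementation: wake_pipe and wait_for_wake are hypothetical names, and the sketch assumes a POSIX system with pipe(), fcntl() and select().

```cpp
#include <cassert>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

// Hypothetical helper (not part of demux): owns a pipe whose read end is
// put in the select read set so that another thread can interrupt the wait.
struct wake_pipe {
  int fds[2];   // fds[0] = read end, fds[1] = write end

  wake_pipe() {
    int rc = pipe(fds);
    assert(rc == 0);
    (void)rc;
    // non-blocking read end, so drain() can never hang
    fcntl(fds[0], F_SETFL, fcntl(fds[0], F_GETFL) | O_NONBLOCK);
  }
  ~wake_pipe() { close(fds[0]); close(fds[1]); }

  // callable from any thread: makes the read end readable
  void wake() { char c = 1; ssize_t n = write(fds[1], &c, 1); (void)n; }

  // called by the waiting thread once select has returned
  void drain() { char buf[16]; while (read(fds[0], buf, sizeof buf) > 0) {} }
};

// Returns true if the wait ended because of a wake-up byte.
// timeout_ms < 0 blocks indefinitely.
bool wait_for_wake(wake_pipe& wp, int timeout_ms) {
  fd_set rset;
  FD_ZERO(&rset);
  FD_SET(wp.fds[0], &rset);
  timeval tv;
  tv.tv_sec = timeout_ms / 1000;
  tv.tv_usec = (timeout_ms % 1000) * 1000;
  int n = select(wp.fds[0] + 1, &rset, 0, 0, timeout_ms < 0 ? 0 : &tv);
  if (n > 0 && FD_ISSET(wp.fds[0], &rset)) { wp.drain(); return true; }
  return false;
}
```

In a real demuxer the read end would sit alongside the user's sockets in the same fd_set, so a wake() from add_socket_wakeup makes the waiting thread rebuild its sets.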
Start cpp section to demux/demux_timer_queue.hpp[1 /1 ]
     1: #line 809 "./lpsrc/flx_demux.pak"
     2: #ifndef __TIMER_QUEUE__
     3: #define __TIMER_QUEUE__
     4: 
     5: #include <flx_demux_config.hpp>
     6: 
     7: namespace flx { namespace demux {
     8: 
     9: // trying to factor out code to share between pc & posix versions
    10: 
    11: // class sleep_task : public worker_task
    12: // may not need time in here - just the wakeup - something I surely have
    13: // somewhere else.
    14: class DEMUX_EXTERN sleep_task
    15: {
    16: public:
    17:     virtual ~sleep_task() {}
    18: 
    19:     virtual void fire() = 0;
    20: };
    21: 
    22: class DEMUX_EXTERN timer_queue
    23: {
    24: public:
    25:     virtual ~timer_queue() {}
    26: 
    27:     virtual void add_sleep_request(sleep_task* st, double delta) = 0;
    28:     virtual void add_abs_sleep_request(sleep_task* st, double when) = 0;
    29: 
    30:     // bad design - this is actually implemented in the descendent classes,
    31:     // which limits the number of such classes probably to one.
    32:     static void get_time(double& t);        // in seconds from some ref pt
    33: };
    34: 
    35: }} // namespace demux, flx
    36: 
    37: #endif
    38: 
End cpp section to demux/demux_timer_queue.hpp[1]
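How the two interfaces are meant to be used can be shown with a small stand-alone sketch. The interfaces are copied here without DEMUX_EXTERN and without the static get_time so the block compiles on its own; counting_task and immediate_queue are hypothetical classes invented for illustration and do not exist in demux.

```cpp
#include <cassert>

// Minimal copies of the two interfaces, so the sketch compiles on its own.
class sleep_task {
public:
  virtual ~sleep_task() {}
  virtual void fire() = 0;
};

class timer_queue {
public:
  virtual ~timer_queue() {}
  virtual void add_sleep_request(sleep_task* st, double delta) = 0;
  virtual void add_abs_sleep_request(sleep_task* st, double when) = 0;
};

// Hypothetical task: counts how many times it has been fired.
class counting_task : public sleep_task {
public:
  int fired;
  counting_task() : fired(0) {}
  virtual void fire() { ++fired; }
};

// Hypothetical queue for tests: ignores the requested time and fires at once.
class immediate_queue : public timer_queue {
public:
  virtual void add_sleep_request(sleep_task* st, double) {
    assert(st);
    st->fire();
  }
  virtual void add_abs_sleep_request(sleep_task* st, double) {
    assert(st);
    st->fire();
  }
};
```

A real queue fires the task from its own timer thread, so fire() should be short and thread safe.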
Start cpp section to demux/demux_posix_timer_queue.hpp[1 /1 ]
     1: #line 848 "./lpsrc/flx_demux.pak"
     2: #ifndef __POSIX_TIMER_QUEUE__
     3: #define __POSIX_TIMER_QUEUE__
     4: 
     5: #include "pthread_thread.hpp"  // flx_thread_t
     6: #include "pthread_mutex.hpp"  // flx_mutex_t
     7: #include "pthread_condv.hpp"  // flx_condv_t
     8: #include "demux_timer_queue.hpp" // base class
     9: #include <sys/time.h>        // timespecs, gettimeofday
    10: 
    11: namespace flx { namespace demux {
    12: 
    13: // looks like a worker queue, but couldn't quite mash it into one
    14: class DEMUX_EXTERN posix_timer_queue : public timer_queue
    15: {
    16:     flx::pthread::flx_mutex_t lock; // factor to prio queue?
    17:     flx::pthread::flx_condv_t sleep_cond;
    18:     flx::pthread::flx_thread_t sleep_thread;
    19:     void*        opaque_prio_queue;        // less fat
    20: 
    21:     static void* thread_start(void*);    // passed "this"
    22:     bool thread_loop_body();
    23: 
    24: 
    25:     void wakeup_thread();                // we can do this!
    26: 
    27:     void add_sleep_request(sleep_task* st, timespec* abs);
    28: public:
    29:     posix_timer_queue();
    30:     ~posix_timer_queue();
    31: 
    32:     // thread safe.
    33:     virtual void add_sleep_request(sleep_task* st, double delta);
    34: 
    35:     // in seconds, relative to same base as timer_queue::get_time.
    36:     virtual void add_abs_sleep_request(sleep_task* st, double when);
    37: };
    38: 
    39: }}
    40: 
    41: #endif // __POSIX_TIMER_QUEUE__
    42: 
End cpp section to demux/demux_posix_timer_queue.hpp[1]
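The public interface takes times as double seconds while the implementation works in timespec, so conversions in both directions are needed. The following is a sketch of one way to do them without going through a microsecond count; seconds_to_timespec, timespec_to_seconds and timespec_add are hypothetical names, not functions from the listing.

```cpp
#include <cassert>
#include <cmath>
#include <time.h>

static const long NSEC_PER_SEC = 1000000000L;

// double seconds -> (sec, nsec); the whole part goes straight to tv_sec,
// so nothing is squeezed through a 32 bit microsecond value
timespec seconds_to_timespec(double t) {
  assert(t >= 0.0);
  double whole = std::floor(t);
  timespec ts;
  ts.tv_sec = (time_t)whole;
  ts.tv_nsec = (long)((t - whole) * NSEC_PER_SEC);
  return ts;
}

// (sec, nsec) -> double seconds: nanoseconds are divided, not multiplied
double timespec_to_seconds(const timespec& ts) {
  return (double)ts.tv_sec + (double)ts.tv_nsec / NSEC_PER_SEC;
}

// a + b, with the nanosecond carry folded back into seconds.
// both inputs are assumed normalised (0 <= tv_nsec < 10^9)
timespec timespec_add(const timespec& a, const timespec& b) {
  timespec r;
  r.tv_sec = a.tv_sec + b.tv_sec;
  r.tv_nsec = a.tv_nsec + b.tv_nsec;
  if (r.tv_nsec >= NSEC_PER_SEC) { r.tv_sec++; r.tv_nsec -= NSEC_PER_SEC; }
  return r;
}
```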
Start cpp section to demux/demux_posix_timer_queue.cpp[1 /1 ]
     1: #line 891 "./lpsrc/flx_demux.pak"
     2: #include "demux_posix_timer_queue.hpp"
     3: 
     4: // a prio queue that executes tasks in a given order
     5: // factor out prio_queue? could be like queue.
     6: 
     7: // try to make this work like the worker thread thing, or fix it to do so?
     8: // remove time from sleep task...
     9: 
    10: #include <queue>    // stl seems to have a prio_queue
    11: #include <sys/time.h> // gettimeofday for calculating "now"
    12: #include <stdio.h>    // fprintf, perror
    13: //using namespace flx::pthread;
    14: namespace flx { namespace demux {
    15: 
    16: #define MIL 1000000        // one million
    17: #define BIL (MIL*1000)    // one billion (metric)
    18: 
    19: using namespace std;
    20: 
    21: // it could happen!
    22: // factor
    23: class future_evt
    24: {
    25: public:
    26:     timespec    when;
    27:     sleep_task*    task;
    28: 
    29:     // ignore the direction, just trying to sort with smallest first
    30:     bool operator<(const future_evt& rhs) const
    31:     {
    32:         if(when.tv_sec != rhs.when.tv_sec)    // precedence to more significant
    33:             return when.tv_sec > rhs.when.tv_sec;
    34:         else                                // else check the less significant
    35:             return when.tv_nsec > rhs.when.tv_nsec;
    36:     }
    37: };
    38: 
    39: typedef priority_queue<future_evt> void_prio_queue;
    40: #define PRIOQ ((void_prio_queue*)opaque_prio_queue)
    41: 
    42: posix_timer_queue::posix_timer_queue()
    43: {
    44:     opaque_prio_queue = new void_prio_queue;    // a.k.a. PRIOQ
    45:     //fprintf(stderr,"initing timer sleep thread\n");
    46: 
    47:     // NEEDS TO CHECK RETURN VAL AND HANDLE ERROR
    48:     if(sleep_thread.init(thread_start, this))
    49:       fprintf(stderr, "failed to create posix timer queue thread!\n");
    50: }
    51: 
    52: posix_timer_queue::~posix_timer_queue()
    53: {
    54:     // the sleep_thread uses the prioq, so we must explicitly shut it
    55:     // down now, before we delete the prioq. left to its own devices,
    56:     // c++ destructs it at the end of this destructor.
    57: 
    58:     // take down the thread first because it uses all the other stuff.
    59:     // I actually don't need to do anything special to bring the thread
    60:     // down because all pthread_cond_*wait* are cancel aware. Or so they
    61:     // should be. As far as I can tell only the 64bit osx10.4.2 is, so
    62:     // for now the explicit cancel + wakeup followed by explicit
    63:     // cancel test stays.
    64: 
    65:     // fprintf(stderr, "asking timer thread to quit\n");
    66:     add_sleep_request(NULL, 0.0);    // super secret quit thread quit request
    67:     wakeup_thread();                // wakeup, cause to goto a cancel pt
    68: 
    69:     sleep_thread.join();            // will join
    70:     //fprintf(stderr,"about to delete PRIOQ\n");
    71:     delete PRIOQ;
    72: }
    73: 
    74: static void
    75: get_now(timespec* now)
    76: {
    77:     struct timeval tp;
    78: 
    79:     if(gettimeofday(&tp, NULL) == -1)
    80:         perror("gettimeofday");
    81: 
    82:     // (10^6-1)*1000 = 3B9AC618 = max usec -> nsec fits in a 32bit long.
    83:     now->tv_sec = tp.tv_sec;
    84:     now->tv_nsec = tp.tv_usec*1000;        // fits!
    85: 
    86:     // fprintf(stderr,"get_now = %li, %li\n", now->tv_sec, now->tv_nsec);
    87: }
    88: 
    89: // LIMIT!
    90: // seconds to microseconds - a signed 32bit long overflows after ~35 minutes
    91: #define SEC2TIMESPEC(ts, t) long    wait_musec = (long)(t*MIL);    \
    92:     timespec    ts = { wait_musec / MIL, (wait_musec % MIL)*1000 }
    93: 
    94: 
    95: // offset delta from "now" and store in "when"
    96: static void
    97: calc_when(timespec* when, double delta)
    98: {
    99: // how to use the posix abstime versions of timed waits? what kind of absolute
   100: // is abstime? pthread_get_expiration_np looks useful, but it too is np.
   101: // abstime is apparently in seconds since the Epoch, UTC.
   102: // To get now there's clock_gettime (not portable) or gettimeofday with
   103: // null timezone.
   104: 
   105:     timespec    now;
   106:     get_now(&now);
   107: 
   108:     // limit!
   109:     // seconds to microseconds - a signed 32bit long overflows after ~35 minutes
   110:     // long    wait_musec = (long)(delta*MIL);
   111:     // timespec    delay = { wait_musec / MIL, (wait_musec % MIL)*1000 };
   112:     SEC2TIMESPEC(delay, delta);
   113: 
   114:     // (10^6-1)*1000 = 3B9AC618 = max usec -> nsec fits in a 32bit long.
   115:     when->tv_sec = now.tv_sec + delay.tv_sec;
   116:     when->tv_nsec = now.tv_nsec + delay.tv_nsec;
   117: 
   118:     if(when->tv_nsec >= BIL)            // overflow of nanoseconds?
   119:     {
   120:         // fprintf(stderr,"OVERFLOW = %li, %li\n", when->tv_sec, when->tv_nsec);
   121:         // x, y < BIL, x + y < 2BIL
   122:         when->tv_sec++;
   123:         when->tv_nsec -= BIL;
   124:         // when->tv_sec += when->tv_nsec/BIL;
   125:         // when->tv_nsec %= BIL;
   126:     }
   127: 
   128:     // fprintf(stderr,"when = %li, %li\n", when->tv_sec, when->tv_nsec);
   129:     // tp contains tv_sec (seconds) & tv_usec (microseconds) both longs.
   130:     // however, if nonposix works everywhere...
   131: }
   132: 
   133: // absolute time
   134: void
   135: posix_timer_queue::add_sleep_request(sleep_task* st, timespec* abs)
   136: {
   137:     future_evt    evt;
   138:     evt.task = st;
   139:     evt.when = *abs;
   140: 
   141:     flx::pthread::flx_mutex_locker_t    locker(lock);
   142: 
   143:     PRIOQ->push(evt);
   144: 
   145:     // we may have inserted sooner than any other evt, so wake up thread
   146:     // to figure it out (if need be). I seemed to be getting more wakeups
   147:     // with this. Turned off for now. Not sure how that works.
   148:     if(1 || PRIOQ->top().task == st)
   149:     {
   150: //        fprintf(stderr,"WE PUSHED IN - waking thread\n");
   151:         wakeup_thread();
   152:     }
   153: }
   154: 
   155: // note: may not need time to be in sleep_task. could pass time here.
   156: // thread safe
   157: void
   158: posix_timer_queue::add_sleep_request(sleep_task* st, double delta)
   159: {
   160:     // fprintf(stderr,"add_sleep_request: %lf\n", delta);
   161:     timespec    when;
   162:     calc_when(&when, delta);        // calculate when (now + delta)
   163: 
   164:     add_sleep_request(st, &when);
   165: }
   166: 
   167: void
   168: posix_timer_queue::add_abs_sleep_request(sleep_task* st, double when)
   169: {
   170:     // absolute version is closer to the posix implementation, but see LIMIT!
   171:     SEC2TIMESPEC(abs_time, when);
   172:     add_sleep_request(st, &abs_time);
   173: }
   174: 
   175: // cause the timer wait thread to wake up. useful for asking it to
   176: // exit or re-evaluate a changed sleep queue.
   177: void
   178: posix_timer_queue::wakeup_thread()
   179: {
   180:     sleep_cond.signal();
   181: }
   182: 
   183: void*
   184: posix_timer_queue::thread_start(void* udat)
   185: {
   186:     posix_timer_queue*    q = (posix_timer_queue*)udat;
   187:     //fprintf(stderr,"sleeper thread\n");
   188: 
   189:     while(q->thread_loop_body()) ;
   190: 
   191:     return 0;
   192: }
   193: 
   194: bool
   195: posix_timer_queue::thread_loop_body()
   196: {
   197:     // lock on. lock off when waiting on condition
   198:     flx::pthread::flx_mutex_locker_t    locker(lock);
   199: 
   200:     int        res;
   201: 
   202:     // pthread_cond_wait & pthread_cond_timedwait (& np rel version?) are
   203:     // cancellation points. doco notes for timed & untimed waits that the
   204:     // predicate should be rechecked as there can be spurious wakeups.
   205:     // no worries, when we wakeup the lock has been acquired.
   206: 
   207:     while(!PRIOQ->empty())
   208:     {
   209:         future_evt    evt = PRIOQ->top();
   210: 
   211:         // quit request
   212:         if(!evt.task) return false;
   213: 
   214:         future_evt  now;        // "now" has no task, just a dummy.
   215:         get_now(&now.when);
   216: 
   217:         // if(evt < now)        // would prefer <=, eh.
   218:         // < is arse backwards because I don't know how to use the stl
   219:         if(now < evt)        // would prefer <=, eh.
   220:         {
   221:             // fprintf(stderr,"firing of (%li, %li) at (%li, %li)!\n",
   222:             //    evt.when.tv_sec, evt.when.tv_nsec,
   223:             //    now.when.tv_sec, now.when.tv_nsec);
   224:             evt.task->fire();
   225:             PRIOQ->pop();
   226:         }
   227:         else    // we have an event in future, so sleep for that long
   228:         {
   229:             // remember that condition waits are exit points...
   230:             // so I don't need to test - check that.
   231:             // fprintf(stderr,"sleeping from %li, %li until %li, %li\n",
   232:             //    now.when.tv_sec, now.when.tv_nsec,
   233:             //    evt.when.tv_sec, evt.when.tv_nsec);
   234:             (void)sleep_cond.timedwait(&lock, &evt.when);
   235: 
   236:             // if using posix abstime timed wait we may get EINVAL here for
   237:             // abstimes in the past. must handle this.
   238:             //JS: It's handled now, waiting for a time in the past is OK
   239: 
   240:             // fprintf(stderr,"pthread_cond_timedwait woke up! (%i)\n", res);
   241:         }
   242:     }
   243: 
   244:     // if we got here then the queue is empty, so sleep indefinitely.
   245:     // note that we don't really need the mainloop testcancel because the
   246:     // condition wait functions are cancellation points.
   247:     // fprintf(stderr,"no sleep task, sleeping indefinitely\n");
   248:     sleep_cond.wait(&lock);
   249:     // fprintf(stderr,"pthread_cond_wait woke up! (%i)\n", res);
   250: 
   251:     // lock released here
   252:     return true;                    // keep going
   253: }
   254: 
   255: 
   256: // in seconds from some ref pt
   257: // N.B. declared in base class!
   258: void
   259: timer_queue::get_time(double& t)
   260: {
   261:     timespec    now;
   262:     get_now(&now);        // just calls gettimeofday (usec) and converts
   263:                         // to timespec (sec, nsec). could skip that
   264:                         // and call directly, avoiding conversion
   265:     t = now.tv_sec + (now.tv_nsec/(double)BIL);
   266: }
   267: 
   268: }}
   269: 
End cpp section to demux/demux_posix_timer_queue.cpp[1]
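The listing gets earliest-first ordering by reversing operator< on future_evt. An alternative, sketched here under the assumption that a separate comparator is acceptable, is to pass the ordering to std::priority_queue as its third template argument and keep the "is it due" test as its own function. timed_evt, later, is_due and make_evt are hypothetical names for illustration.

```cpp
#include <cassert>
#include <queue>
#include <time.h>
#include <vector>

struct timed_evt {      // hypothetical stand-in for future_evt
  timespec when;
  int id;
};

// "a fires later than b": with this as the queue's ordering, top() is
// the earliest event (std::priority_queue keeps the "largest" on top)
struct later {
  bool operator()(const timed_evt& a, const timed_evt& b) const {
    if (a.when.tv_sec != b.when.tv_sec) return a.when.tv_sec > b.when.tv_sec;
    return a.when.tv_nsec > b.when.tv_nsec;
  }
};

typedef std::priority_queue<timed_evt, std::vector<timed_evt>, later> evt_queue;

// true when evt is due at or before now (the <= the listing wishes for)
bool is_due(const timed_evt& evt, const timespec& now) {
  if (evt.when.tv_sec != now.tv_sec) return evt.when.tv_sec < now.tv_sec;
  return evt.when.tv_nsec <= now.tv_nsec;
}

timed_evt make_evt(long sec, long nsec, int id) {
  assert(nsec >= 0 && nsec < 1000000000L);
  timed_evt e;
  e.when.tv_sec = sec;
  e.when.tv_nsec = nsec;
  e.id = id;
  return e;
}
```

Keeping the comparator out of the event type means the event's own comparison operators, if it ever needs any, can have their natural direction.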
Start cpp section to demux/demux_win_timer_queue.hpp[1 /1 ]
     1: #line 1161 "./lpsrc/flx_demux.pak"
     2: #ifndef __WIN_TIMER_QUEUE__
     3: #define __WIN_TIMER_QUEUE__
     4: 
     5: #include "flx_demux_config.hpp"
     6: #include <Windows.h>
     7: 
     8: #include "demux_timer_queue.hpp"
     9: 
    10: namespace flx { namespace demux {
    11: 
    12: class DEMUX_EXTERN win_timer_queue : public timer_queue
    13: {
    14:   HANDLE    timer_queue;
    15: 
    16:   static VOID CALLBACK timer_callback(PVOID, BOOLEAN);
    17: public:
    18:   win_timer_queue();
    19:   ~win_timer_queue();
    20: 
    21:   virtual void add_sleep_request(sleep_task* st, double delta);
    22:   virtual void add_abs_sleep_request(sleep_task* st, double when);
    23: 
    24: };
    25: 
    26: }}
    27: 
    28: #endif // __WIN_TIMER_QUEUE__
    29: 
End cpp section to demux/demux_win_timer_queue.hpp[1]
Start cpp section to demux/demux_win_timer_queue.cpp[1 /1 ]
     1: #line 1191 "./lpsrc/flx_demux.pak"
     2: #include "flx_demux_config.hpp"
     3: #include <Windows.h>
     4: #include <assert.h>
     5: 
     6: // simply wrapped windows timer queue. requires windows 5.00 (windows
     7: // 2000), which is quite high, because I couldn't get the waitable timers to work.
     8: // must be careful with this stuff lest it create millions of threads.
     9: #include "demux_win_timer_queue.hpp"
    10: 
    11: #include <stdio.h>
    12: #include <string.h>  // memcpy
    13: namespace flx { namespace demux {
    14: 
    15: #define MIL 1000000    // 1 metric million
    16: 
    17: typedef struct
    18: {
    19:   sleep_task*  st;        // so we can make it fire
    20:   HANDLE    timer;      // we need to delete the timer, so we keep it
    21:   HANDLE    timer_queue;  // AND its queue (no back ptrs, I guess)
    22: } timer_cookie;
    23: 
    24: win_timer_queue::win_timer_queue()
    25: {
    26:   // fprintf(stderr,"win_timer_queue ctor\n");
    27: 
    28:   timer_queue = CreateTimerQueue();
    29:   if(!timer_queue)
    30:   {
    31:     fprintf(stderr, "CreateTimerQueue failed: %i\n", GetLastError());
    32:     throw -1;
    33:   }
    34:   // fprintf(stderr, "created timer queue: %p\n", timer_queue);
    35: }
    36: 
    37: 
    38: win_timer_queue::~win_timer_queue()
    39: {
    40:   // INVALID_HANDLE_VALUE indicates that DeleteTimerQueueEx should wait for
    41:   // all callback functions to complete before returning. One would hope that
    42:   // calling this causes all the timers to go off before their time (what
    43:   // else would the "actually fired" callback flag be for?). The alternative
    44:   // of waiting for some ever distant timer to go off would be too stupid
    45:   // for words. As usual, the msdn glosses over the important details like
    46:   // this one. Anyway, it's easy to test out... No, that flag's always true
    47:   // for timers, and this wait option doesn't work - maybe with other types
    48:   // flags for CreateTimerQueueTimer?
    49:   if(!DeleteTimerQueueEx(timer_queue, INVALID_HANDLE_VALUE))
    50:   {
    51:     fprintf(stderr, "DeleteTimerQueueEx failed: %i\n", GetLastError());
    52:     // whatcha gonna do about it?
    53:   }
    54:   // fprintf(stderr, "finished - did it wait?\n");
    55: }
    56: 
    57: // note: may not need time to be in sleep_task. could pass time here.
    58: // thread safe
    59: void
    60: win_timer_queue::add_sleep_request(sleep_task* st, double delta)
    61: {
    62:   // fprintf(stderr,"add_sleep_request: %lf to %p\n", delta, timer_queue);
    63: 
    64:   timer_cookie*  tc = new timer_cookie;
    65: 
    66:   // copy in the sleep_task and the timer queue
    67:   tc->st = st;
    68:   tc->timer_queue = timer_queue;
    69: 
    70:   // the timer thread may not be the best solution as nothing is stopping
    71:   // anyone from performing long operations with this structure, however
    72:   // in all likelihood, it'll just be felix adding threads back to its queue.
    73:   if(!CreateTimerQueueTimer(
    74:     &tc->timer,          // resulting timer in timer_cookie
    75:     timer_queue,
    76:     //NULL,            // add to default timer queue
    77:     timer_callback,        // should get called in delta seconds
    78:     tc,             // timer cookie is user data
    79:     (DWORD)(delta*1000),    // millisecond timer
    80:     0,              // zero period => signal once
    81:     WT_EXECUTEINTIMERTHREAD))  // NB: for short tasks (will this do?)
    82:   {
    83:     fprintf(stderr, "CreateTimerQueueTimer failed: %i\n", GetLastError());
    84:     delete tc;          // at least try not to leak
    85:     return;
    86:   }
    87: }
    88: 
    89: // this is a c callback - all the c++ code should probably be wrapped
    90: // in a try/catch. timer_or_wait_fired is always true for timers.
    91: VOID CALLBACK
    92: win_timer_queue::timer_callback(PVOID udat, BOOLEAN timer_or_wait_fired)
    93: {
    94:   timer_cookie*  tc = (timer_cookie*)udat;
    95: 
    96:   // fprintf(stderr, "timer queue callback fired: %p, %i\n",
    97:   //  tc, timer_or_wait_fired);
    98: 
    99:   if(!tc)
   100:   {
   101:     // Nothing that we can do in this situation.
   102:     fprintf(stderr, "WHOA - NULL queue cookie! (fired: %i)\n",
   103:       timer_or_wait_fired);
   104:     return;            // outta here
   105:   }
   106: 
   107:   // NULL means delete the thing now, INVALID_HANDLE_VALUE means wait until
   108:   // callback finishes. We're in the callback, so we can't do that (=deadlock
   109:   // of the timer thread, which isn't good). We're all adults here, the timer
   110:   // has expired, we know what we're doing, so lets just delete it.
   111:   tc->st->fire();
   112: 
   113:   // on my box this returns ERROR_IO_PENDING, on others it doesn't
   114:   // msdn says this should be ok, but I'm not so sure.
   115:   if(!DeleteTimerQueueTimer(tc->timer_queue, tc->timer, NULL))
   116:   {
   117:     int  err = GetLastError();
   118: 
   119:     if( ERROR_IO_PENDING != err)
   120:     {
   121:       fprintf(stderr, "DeleteTimerQueueTimer of %p failed: %i\n",
   122:         tc->timer, err);
   123:     }
   124:     else
   125:     {
   126:       // I'm not so sure, see if it leaks.
   127:       fprintf(stderr, "DeleteTimerQueueTimer = ERROR_IO_PENDING\n");
   128:       fprintf(stderr, "Apparently this is ok...\n");
   129:     }
   130:   }
   131:   delete tc;
   132: 
   133:   // fprintf(stderr, "leaving timer callback\n");
   134: }
   135: 
   136: // in seconds from some ref pt (UTC for this fn)
   137: // N.B. declared in base class!
   138: void
   139: timer_queue::get_time(double& t)
   140: {
   141:   SYSTEMTIME  sysnow;
   142:   GetSystemTime(&sysnow);
   143:   // now convert to seconds
   144:   // via FILETIME?
   145: 
   146:   // kinda sucks, but is the msdn recommended way of doing calculations
   147:   // on dates.
   148:   FILETIME  fnow;
   149:   if(!SystemTimeToFileTime(&sysnow, &fnow))
   150:   {
   151:     fprintf(stderr, "SystemTimeToFileTime failed: %i\n", GetLastError());
   152:     t = 0;
   153:     return;
   154:   }
   155: 
   156:   ULARGE_INTEGER now;  // so we can do some maths
   157: 
   158:   assert(sizeof(now) == sizeof(fnow));
   159:   memcpy(&now, &fnow, sizeof(now));
   160: 
   161:   // and now we have a big integer containing an offset from jan 1, 1601 (UTC)
   162:   // in 100 nanosecond intervals
   163:   t = now.QuadPart/(MIL*10.0);  // /10 to microseconds, /MIL to seconds
   164: }
   165: 
   166: void
   167: win_timer_queue::add_abs_sleep_request(sleep_task* st, double when)
   168: {
   169:   // win timer queue works with relative offsets, so convert this absolute
   170:   double  now;
   171:   get_time(now);
   172:   double  delta = when-now;
   173:   if(delta < 0.0) delta = 0.0;
   174:   add_sleep_request(st, delta);
   175: }
   176: 
   177: }}
End cpp section to demux/demux_win_timer_queue.cpp[1]
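The FILETIME arithmetic in get_time can be checked without Windows headers. This sketch uses plain 64 bit integers in place of ULARGE_INTEGER; the function names are hypothetical, and the Unix epoch conversion is an extra that the listing does not perform.

```cpp
#include <cassert>
#include <stdint.h>

// FILETIME counts 100 ns ticks since 1601-01-01 00:00 UTC.
static const uint64_t TICKS_PER_SEC = 10000000ULL;        // 10^7
static const uint64_t EPOCH_DIFF_SECS = 11644473600ULL;   // 1601 -> 1970

// join the two 32 bit halves of a FILETIME (dwHighDateTime, dwLowDateTime)
uint64_t filetime_ticks(uint32_t high, uint32_t low) {
  return ((uint64_t)high << 32) | low;
}

// ticks -> seconds: divide by 10^7. whole and fractional seconds are
// converted separately so the whole part stays exact
double ticks_to_seconds(uint64_t ticks) {
  return (double)(ticks / TICKS_PER_SEC)
       + (double)(ticks % TICKS_PER_SEC) / (double)TICKS_PER_SEC;
}

// seconds since the Unix epoch; assumes ticks is not before 1970
double ticks_to_unix_seconds(uint64_t ticks) {
  assert(ticks >= EPOCH_DIFF_SECS * TICKS_PER_SEC);
  return ticks_to_seconds(ticks - EPOCH_DIFF_SECS * TICKS_PER_SEC);
}
```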
Start cpp section to demux/demux_demuxer.cpp[1 /1 ]
     1: #line 1369 "./lpsrc/flx_demux.pak"
     2: #include "demux_demuxer.hpp"
     3: 
     4: // nothing here atm ..
     5: 
End cpp section to demux/demux_demuxer.cpp[1]
Start cpp section to demux/demux_epoll_demuxer.cpp[1 /1 ]
     1: #line 1375 "./lpsrc/flx_demux.pak"
     2: // epoll interface. does epoll support ordinary files in addition to sockets?
     3: // EPOLLET to make epoll edgetriggered. I guess the default is level triggered.
     4: 
     5: // epoll events are not one shot, in fact they're quite sticky so socket
     6: // filters must be removed manually to guarantee a one-to-one wakeup
     7: // to add_wakeup ratio. note that the oneshot flag is not a solution.
     8: 
     9: // cool! EPOLLONESHOT
    10: // BUGGER! doesn't seem to exist! and doing this doesn't make it so!
    11: // #ifndef EPOLLONESHOT
    12: // #define EPOLLONESHOT (1<<30)
    13: // #endif
    14: 
    15: #include "demux_epoll_demuxer.hpp"
    16: 
    17: #include <sys/epoll.h>  // for epoll_*
    18: #include <stdio.h>    // for perror
    19: #include <unistd.h>   // for close
    20: #include <errno.h>    // EEXIST, errno
    21: 
    22: namespace flx { namespace demux {
    23: 
    24: epoll_demuxer::epoll_demuxer()
    25:   : epoll_fd(-1)
    26: {
    27:   // EPOLLONESHOT is shit, don't use it. Enabling it just means that your
    28:   // wakeups are suppressed and you have to use EPOLL_CTL_MOD instead
    29:   // of EPOLL_CTL_ADD. If it isn't defined then so much the better.
    30: //#ifdef EPOLLONESHOT
    31: //  fprintf(stderr,"WARNING: EPOLLONESHOT AVAILABLE (%x)!!!\n", EPOLLONESHOT);
    32: //#endif
    33: 
    34:   // god knows what the maximum size will be, I'll just say 1 for now
    35:   epoll_fd = epoll_create(1);
    36:   if(-1 == epoll_fd)
    37:   {
    38:     perror("epoll_create");
    39:     throw -1;
    40:   }
    41: }
    42: 
    43: epoll_demuxer::~epoll_demuxer()
    44: {
    45:   async_quit(); // get waiting thread to exit.
    46: 
    47:   if(-1 != epoll_fd)
    48:   {
    49:     if(close(epoll_fd) != 0)
    50:       perror("epoll close");
    51:   }
    52: }
    53: 
    54: int
    55: epoll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    56: {
    57:   int s = sv->s;
    58: 
    59:   struct epoll_event  evt;
    60:   // fprintf(stderr,"add_socket_wakeup: %i (sv=%p, flags=%x)\n",
    61:   //  s, sv, flags);
    62: 
    63:   // EPOLLONESHOT would save a system call to remove epoll evts, which
    64:   // aren't intrinsically one-shot, but it only disables the socket
    65:   // rather than removing it (see below), so it isn't used. get_evts
    66:   // removes the socket by hand, using the fd kept in the user cookie.
    67:   evt.events = 0;
    68: 
    69:   if(flags & PDEMUX_READ) evt.events |= EPOLLIN;
    70:   if(flags & PDEMUX_WRITE) evt.events |= EPOLLOUT;
    71: 
    72:   // fprintf(stderr, "flags %x -> evt.events %x\n", flags, evt.events);
    73: 
    74:   // We do the remove manually because oneshot in epoll doesn't
    75:   // remove the socket, but rather, disables it.
    76: //#ifdef EPOLLONESHOT
    77: //  evt.events |= EPOLLONESHOT;         // yes!
    78: //#endif
    79:   // I think EPOLLHUP comes when the connection closes (on read?)
    80: // poll's (plain old poll) equivalents to this are ignored for input
    81: // same here?
    82:   // I get EPOLLHUPs for bad async connects whether I ask for them or not.
    83:   evt.events |= (EPOLLHUP | EPOLLERR);    // I think I want this
    84: 
    85:   evt.data.ptr = sv;              // our user data
    86: 
    87:   if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s, &evt) == -1)
    88:   {
    89:     // EPOLL_CTL_MOD cannot help us do bidirection io on one socket,
    90:     // the mod will overwrite the old user cookie and direction.
    91:     // It seems that only kqueues, select and iocps allow that.
    92:     // Will need a wakeup that can do both, oneshot that indicates
    93:     // the available direction.
    94: // when using oneshot, we're supposed to use EPOLL_CTL_MOD
    95: #if 0
    96:     int err = errno;
    97: 
    98:     if(EEXIST == err)
    99:     {
   100:       // ok - let's try EPOLL_CTL_MOD
   101:       fprintf(stderr, "RETRYING WITH EPOLL_CTL_MOD\n");
   102:       if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, s, &evt) != -1)
   103:         return 0; // good!
   104:     }
   105: #endif
   106:     perror("epoll_ctl (add)");
   107: 
   108:     return -1;
   109:   }
   110:   return 0;
   111: }
   112: 
   113: // epoll doesn't differentiate on events. I bet I could
   114: // just not pass that event...
   115: void
   116: epoll_demuxer::remove_wakeup(int s)
   117: {
   118:   // EPOLL_CTL_DEL uses no information from the event, but kernels earlier
   119:   // than 2.6.9 still require a non-NULL address, so pass a dummy.
   120:   struct epoll_event evt;
   121:   // evt.events = (read) ? EPOLLIN : EPOLLOUT;
   122: 
   123:   // fprintf(stderr,"removing socket wakeup %i\n", s);
   124: 
   125:   if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, s, &evt) == -1)
   126:   {
   127:     //const char* str = (read) ? "epoll_ctl (remove read)"
   128:     //  : "epoll_ctl (remove write)";
   129:     // perror(str);
   130:     perror("epoll_ctl (remove)");
   131:   }
   132: }
   133: 
   134: void
   135: epoll_demuxer::get_evts(bool poll)
   136: {
   137:   struct epoll_event  evt;
   138: 
   139:   switch(epoll_wait(epoll_fd, &evt, 1, (poll) ? 0 : -1))
   140:   {
   141:     case -1:    // error
   142:       perror("epoll_wait");
   143:       // fall through
   144:     case 0:     // no events (happens with timeout)
   145:       return;
   146:   }
   147: 
   148:   socket_wakeup* sv = (socket_wakeup*)evt.data.ptr;
   149: 
   150:   // not seeing timeouts as they're filtered by the switching.
   151:   // assuming that sv is good
   152:   // fprintf(stderr,"wakeup (sv=%p, sv->s=%i evt.events=%x)!\n",
   153:   //  sv, sv->s, evt.events);
   154: 
   155:   // accumulate bit field of what we got
   156:   sv->wakeup_flags = 0;
   157: 
   158:   bool  wake = false;
   159: 
   160:   // it might be possible to get both a read & write event, so the
   161:   // two flags are tested independently (no else).
   162:   if(evt.events & EPOLLIN)                // I think this is how you do it
   163:   {
   164:     // fprintf(stderr,"EPOLLIN for %p\n", sv);
   165:     sv->wakeup_flags |= PDEMUX_READ;
   166:     wake = true;
   167:   }
   168: 
   169:   if(evt.events & EPOLLOUT)
   170:   {
   171:     //fprintf(stderr,"EPOLLOUT for %p\n", sv);
   172:     sv->wakeup_flags |= PDEMUX_WRITE;
   173:     wake = true;
   174:   }
   175: 
   176:   // Is this for shutdown or closing of the other end?
   177:   // I get it for failed async connects. I don't know if other events cause
   178:   // it. In any case, I don't know whether it should be for read or write,
   179:   // so I just don't say. In any case, it should wake to get error.
   180:   // I seem to get both EPOLLHUP and EPOLLERR on bad async connect
   181:   // see poll demuxer notes on POLLHUP for further possibly useful info.
   182:   if(evt.events & EPOLLHUP)
   183:   {
   184:     fprintf(stderr, "EPOLLHUP for %p->%i\n", sv, sv->s);
   185:     wake = true;
   186:   }
   187: 
   188:   if(evt.events & EPOLLERR)
   189:   {
   190: // How do I retrieve the error?
   191:     // There's no ambiguity - there's only ever one fd in a given epoll.
   192:     // If oneshot's present then don't need to do anything
   193: // not sure what to do here. if we've enabled/got oneshot the socket
   194: // should already have been removed
   195:     fprintf(stderr,"epoll error, waking: %i (errno?)\n", sv->s);
   196:     // similar story to EPOLLHUP
   197:     wake = true;
   198:   }
   199: 
   200:   if((evt.events & ~(EPOLLERR|EPOLLIN|EPOLLOUT|EPOLLHUP)))
   201:   {
   202:     fprintf(stderr,"unknown events in epoll_demuxer %x\n", evt.events);
   203:   }
   204: 
   205:   // we got something. tell the people.
   206:   // not dependent solely on wakeup_flags - errors need to wake too.
   207:   if(wake)
   208:   {
   209:     // we got something. better call wakeup, must remove to guarantee
   210:     // 1-1 wakeups with add_sockets
   211:     // fprintf(stderr, "no one-shot... remove %i\n", sv->s);
   212:     remove_wakeup(sv->s);
   213:     // fprintf(stderr, "calling wakeup (flags=%x)\n", sv->wakeup_flags);
   214:     sv->wakeup(*this);
   215:   }
   216: }
   217: }}
   218: 
End cpp section to demux/demux_epoll_demuxer.cpp[1]
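The "remove by hand to get one wakeup per add" pattern used by get_evts can be sketched on its own. This is a Linux-only illustration with hypothetical function names; unlike the listing it stores the fd in data.fd rather than a socket_wakeup* in data.ptr.

```cpp
#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

// Register fd for read readiness (plus error/hangup) on epfd.
int add_read_wakeup(int epfd, int fd) {
  assert(epfd >= 0);
  struct epoll_event evt = {};
  evt.events = EPOLLIN | EPOLLERR | EPOLLHUP;
  evt.data.fd = fd;               // the listing keeps a pointer here instead
  return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &evt);
}

// Wait for one event, then remove its fd so that each add yields at most
// one wakeup. Returns the fd that fired, or -1 on timeout or error.
int wait_one_and_remove(int epfd, int timeout_ms) {
  struct epoll_event evt;
  int n = epoll_wait(epfd, &evt, 1, timeout_ms);
  if (n <= 0) return -1;
  int fd = evt.data.fd;
  // non-NULL event pointer: kernels before 2.6.9 reject NULL for DEL
  struct epoll_event ignored = {};
  if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ignored) == -1) return -1;
  return fd;
}
```

Because epoll is level triggered by default, an fd that is still readable would be reported again on every wait if it were not removed.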
Start cpp section to demux/demux_evtport_demuxer.cpp[1 /1 ]
     1: #line 1594 "./lpsrc/flx_demux.pak"
     2: // Evtports can get timer wakeups with PORT_SOURCE_TIMER.
     3: // Can also pick up aio notifications with PORT_SOURCE_AIO.
     4: 
     5: // looks like this stuff is only in solaris10, and not SunOS 5.8. Damn.
     6: 
     7: #include "demux_evtport_demuxer.hpp"
     8: 
     9: #include <port.h>
    10: #include <poll.h> // POLLIN/POLLOUT
    11: #include <stdio.h>  // printf
    12: #include <unistd.h> // close
    13: #include <assert.h>
    14: 
    15: namespace flx { namespace demux {
    16: 
    17: // header files for this stuff?
    18: // can use port_send for user defined events, to wake up reap loop
    19: // truss to see what's happening
    20: 
    21: evtport_demuxer::evtport_demuxer()
    22: {
    23:   if((evtport = port_create()) < 0)
    24:   {
    25:     perror("port_create");
    26:     throw -1;
    27:   }
    28: }
    29: 
    30: evtport_demuxer::~evtport_demuxer()
    31: {
    32:   async_quit(); // gets waiting thread to quit, returning afterwards
    33: 
    34:   if(-1 != evtport)
    35:   {
    36:     if(close(evtport) != 0)
    37:       perror("evtport close");
    38:   }
    39: }
    40: 
    41: int
    42: evtport_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    43: {
    44:   if(flags & ~(PDEMUX_READ|PDEMUX_WRITE)) // I can't understand you, lady!
    45:     return -1;
    46: 
    47:   int events = 0; // events are flags so we can accumulate them
    48:   int s = sv->s;
    49: 
    50:   if(flags & PDEMUX_READ) events |= POLLIN;
    51:   if(flags & PDEMUX_WRITE) events |= POLLOUT;
    52: 
    53:   // POLLHUP might make sense only for reads... add conditionally?
    54:   events |= (POLLHUP | POLLERR);
    55: 
    56:   // fprintf(stderr,"add_socket_wakeup: %i, sv: %p (%x)\n", s, sv, flags);
    57: 
    58:   // register for event we are interested in...
    59:   // works for files, sockets, timers...
    60:   // sockets are file descriptors in unix, so src is PORT_SOURCE_FD
    61:   if(port_associate(evtport, PORT_SOURCE_FD, (uintptr_t)s, events, sv) == -1)
    62:   {
    63:     perror("add socket_wakeup/port_associate");
    64:     return -1;
    65:   }
    66: 
    67:   return 0;
    68: }
    69: 
    70: // note that these two functions are exactly the same
    71: // we have to remove after a read or write else we can get multiple
    72: // wakeups - usually with a dud user cookie. the fact that there is
    73: // no differentiation between POLLIN & POLLOUT could be a problem for
    74: // mixed read/write things (rare). note that evt_ports let me associate
    75: // the same thing twice. I don't know if this means you have to dissociate
    76: // (disassociate) twice.
    77: void
    78: evtport_demuxer::remove_wakeup(int s)
    79: {
    80:   if(port_dissociate(evtport, PORT_SOURCE_FD, s) == -1)
    81:     perror("reading port_dissociate");
    82: }
    83: 
    84: #define POLLPR(ev) if(e->portev_events & ev) fprintf(stderr,#ev", ")
    85: 
    86: static void
    87: print_port_evt(port_event_t* e)
    88: {
    89:   const char* srcstr[PORT_SOURCE_ALERT-PORT_SOURCE_AIO+1]
    90:     = { "AIO", "TIMER", "USER", "FD", "ALERT"}; // indexed by src-PORT_SOURCE_AIO
    91:   fprintf(stderr,"e: %p\n\t", e);
    92:   //fprintf(stderr,"portev_events: %x\n\t", e->portev_events);
    93:   fprintf(stderr,"portev_events: ");
    94: 
    95:   // I got these constants from the poll.h file
    96:   POLLPR(POLLIN); POLLPR(POLLOUT); POLLPR(POLLPRI);
    97:   POLLPR(POLLRDNORM); POLLPR(POLLRDBAND); POLLPR(POLLWRBAND);
    98: 
    99:   // in poll these are in a different field. port_event_t doesn't
   100:   // have that field, so lets try here.
   101:   POLLPR(POLLERR); POLLPR(POLLHUP); POLLPR(POLLNVAL); POLLPR(POLLREMOVE);
   102: 
   103:   fprintf(stderr," (%x)\n\t", e->portev_events);
   104: 
   105:   int src = e->portev_source;
   106:   if(PORT_SOURCE_AIO <= src && src <= PORT_SOURCE_ALERT)
   107:   {
   108:     fprintf(stderr,"portev_source: PORT_SOURCE_%s (%x)\n\t",
   109:       srcstr[src-PORT_SOURCE_AIO], src);
   110:   }
   111:   else
   112:   {
   113:     fprintf(stderr,"portev_source: %x\n\t", e->portev_source);
   114:   }
   115: 
   116:   fprintf(stderr,"portev_pad: %x\n\t", e->portev_pad);
   117:   // often our socket
   118:   fprintf(stderr,"portev_object: %x\n\t", e->portev_object);
   119:   fprintf(stderr,"portev_user: %p\n", e->portev_user);
   120: }
   121: 
   122: void
   123: evtport_demuxer::get_evts(bool poll)
   124: {
   125:   // Block until a single event appears on the port. Event will not fire
   126:   // again, so we get max 1 wakeup per event added.
   127: 
   128:   port_event_t  evt;
   129:   timespec    timeout, *tp = NULL;
   130: 
   131:   if(poll)    // effect a poll
   132:   {
   133:     timeout.tv_sec = 0;
   134:     timeout.tv_nsec = 0;
   135:     tp = &timeout;
   136:   }
   137: 
   138:   // wait for a single event: blocks forever (tp == NULL) unless polling
   139:   if(port_get(evtport, &evt, tp) == -1)
   140:   {
   141:     perror("port_get");
   142:     return;
   143:   }
   144: 
   145:   // fprintf(stderr,"PORT_GET RETURNED: "); print_port_evt(&evt);
   146: 
   147:   // get wakeup obj tucked away in the user cookie.
   148:   socket_wakeup*  sv = (socket_wakeup*)evt.portev_user;
   149:   int       s = evt.portev_object;
   150: 
   151:   assert(sv != NULL);
   152:   if(evt.portev_source != PORT_SOURCE_FD)
   153:   {
   154:     // when polling I often end up in here - we get an unknown evt
   155:     // source and a POLLNVAL event plus lots of other unknown flags.
   156:     // there's interesting looking stuff in the user field and so on,
   157:     // but it's nothing of mine and also undocumented
   158:     // fprintf(stderr,"got non PORT_SOURCE_FD (s=%i, sv=%p, src=%i)\n",
   159:     //  s, sv, evt.portev_source);
   160:     // fprintf(stderr, "skipping out...\n");
   161:     return;
   162:   }
   163: 
   164: 
   165:   // let's see what we've got for the wakeup
   166:   sv->wakeup_flags = 0;
   167: 
   168:   if(evt.portev_events & POLLERR)
   169:   {
   170:     fprintf(stderr,"ERRORS on s = %i, sv = %p\n", s, sv);
   171:     //evt.portev_events &= ~POLLERR;
   172:     //return;
   173:   }
   174: 
   175:   // for bidirectional wakeups, we should be able to get both
   176:   // POLLIN and POLLOUT at the same time, but I've not yet
   177:   // seen it happen, they're coming in one at a time for me.
   178: 
   179: 
   180:   if(evt.portev_events & POLLIN)
   181:   {
   182:     // fprintf(stderr,"GOT POLLIN FOR %p\n", sv);
   183:     sv->wakeup_flags |= PDEMUX_READ;
   184:   }
   185: 
   186:   if(evt.portev_events & POLLOUT)
   187:   {
   188:     // fprintf(stderr,"GOT POLLOUT FOR %p\n", sv);
   189:     sv->wakeup_flags |= PDEMUX_WRITE;
   190:   }
   191: 
   192:   // POLLERR was already reported above. POLLHUP, though requested, lands here.
   193:   if(evt.portev_events & ~(POLLIN | POLLOUT | POLLERR))
   194:   {
   195:     fprintf(stderr,"UNSOLICITED events in evtport_demuxer (%x)\n",
   196:       evt.portev_events);
   197:   }
   198: 
   199:   assert(sv->wakeup_flags != 0);    // we should've gotten SOMETHING.
   200: 
   201:   if(sv->wakeup_flags)
   202:     sv->wakeup(*this);
   203: }
   204: }}
   205: 
End cpp section to demux/demux_evtport_demuxer.cpp[1]
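The evtport demuxer above translates demuxer flags into poll-style event bits on the way in, translates them back on wakeup, and uses a zero timeout to turn a blocking wait into a poll. The same three steps can be sketched with plain POSIX `poll(2)`, which is more widely available than event ports. `SKETCH_READ`, `SKETCH_WRITE` and the three function names are hypothetical stand-ins for illustration; the real `PDEMUX_READ`/`PDEMUX_WRITE` values are defined in headers not shown here.

```cpp
#include <poll.h>    // poll, POLLIN, POLLOUT
#include <unistd.h>  // pipe, write, close (handy when trying the sketch)
#include <cassert>

// Hypothetical stand-ins for PDEMUX_READ / PDEMUX_WRITE.
enum { SKETCH_READ = 1, SKETCH_WRITE = 2 };

// Demuxer flags -> poll event bits. Returns -1 for flags we don't understand.
int to_poll_events(int flags)
{
  if(flags & ~(SKETCH_READ | SKETCH_WRITE)) return -1;
  int events = 0;   // events are flags, so we can accumulate them
  if(flags & SKETCH_READ)  events |= POLLIN;
  if(flags & SKETCH_WRITE) events |= POLLOUT;
  return events;
}

// poll result bits -> demuxer wakeup flags.
int from_poll_events(int revents)
{
  int flags = 0;
  if(revents & POLLIN)  flags |= SKETCH_READ;
  if(revents & POLLOUT) flags |= SKETCH_WRITE;
  return flags;
}

// Wait on one descriptor. poll_only selects a zero timeout (return at once);
// otherwise block until something happens. Returns the wakeup flags,
// 0 if nothing was ready, or -1 on error or bad flags.
int wait_ready(int fd, int flags, bool poll_only)
{
  int events = to_poll_events(flags);
  if(events < 0) return -1;

  struct pollfd p;
  p.fd = fd;
  p.events = (short)events;
  p.revents = 0;

  int n = ::poll(&p, 1, poll_only ? 0 : -1);
  if(n <= 0) return n;
  return from_poll_events(p.revents);
}
```

With a fresh pipe, polling the read end reports nothing ready until a byte is written to the write end. Note that this sketch, like the listing, folds a wakeup carrying only error or hangup bits into a zero flag value.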
Start cpp section to demux/demux_iocp_demuxer.cpp[1 /1 ]
     1: #line 1800 "./lpsrc/flx_demux.pak"
     2: #include "demux_iocp_demuxer.hpp"
     3: #include "demux_quitter.hpp" // for clean threaded iocp takedown
     4: 
     5: #include <stdio.h>      // for printf debugging
     6: #include <stddef.h>     // offsetof
     7: #include <assert.h>
     8: // shoving the win_queue in here for now
     9: 
    10: namespace flx { namespace demux {
    11: 
    12: // this could really do with auto objs. steal the strat stuff?
    13: 
    14: // add windows error processing macros. It's a bore otherwise.
    15: 
    16: // WaitForSingleObject on a kill event in the thread for thread cancel
    17: // kill_event = CreateEvent(NULL, TRUE, FALSE, NULL); (what's that)
    18: // SetEvent(kill_event) to invoke (?): SetEvent sets the event to the
    19: // signalled state. Return value is success flag. GetLastError.
    20: 
    21: // do auto SOCKET wrapper, check closesocket return code.
    22: 
    23: // a completion port is a queue into which the os puts notifications of
    24: // completed overlapped io requests. once the operation completes, a
    25: // notification is sent to a worker thread that can process the result.
    26: // a socket may be associated with a completion port at any point after
    27: // creation.
    28: 
    29: 
    30: // I don't see how to nicely stop a thread, I may have to have my own protocol
    31: // to ask it to exit.
    32: 
    33: // PostQueuedCompletionStatus can be used by threads to wake up a worker
    34: // thread. Could be handy replacement for timeout. "useful for notifying
    35: // worker threads of external events"
    36: 
    37: // working through this: http://msdn.microsoft.com/msdnmag/issues/1000/Winsock/
    38: // example of worker thread here
    39: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/msmq/msmq_using_reading_msg_98j7.asp
    40: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/fileio/fs/i_o_completion_ports.asp
    41: // no, no, use this one
    42: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/msmq/msmq_using_reading_msg_98j7.asp
    43: // oh, wait they're the same
    44: // FormatMessage
    45: 
    46: winsock_initer::winsock_initer()
    47: {
    48:   WSADATA wsaData;
    49: 
    50:   // strange, rt_faio_01 static doesn't work under vs without this.
    51:   // hum.
    52:   // fprintf(stderr, "WINSOCK INIT!!!\n");
    53:   // apparently 2.2's the way to go
    54:   int res= WSAStartup(MAKEWORD(2, 2), &wsaData);
    55:   if(res!= 0)
    56:   {
    57:     //JS: WSAGetLastError CANNOT be called, since WSAStartup failed
    58:     fprintf(stderr,"couldn't find usable winsock dll: %i\n", res);
    59:     throw res;
    60:   }
    61: }
    62: 
    63: winsock_initer::~winsock_initer()
    64: {
    65:   if(WSACleanup() != 0)
    66:   {
    67:     fprintf(stderr,"WSACleanup failed %i\n", WSAGetLastError());
    68:   }
    69: }
    70: 
    71: // iocp_wakeup base class for users of iocp_demuxer
    72: //static
    73: iocp_wakeup*
    74: iocp_wakeup::from_overlapped(LPOVERLAPPED olp)
    75: {
    76:   // calculate the address of this from overlapped member
    77:   // suffer an obligatory offsetof warning from broken gccs.
    78:   return (iocp_wakeup*)((char*)olp-offsetof(iocp_wakeup, ol));
    79: }
    80: 
    81: void
    82: iocp_wakeup::clear_overlapped()
    83: {
    84:   ZeroMemory(&ol, sizeof(ol));  // much better than memset, right?
    85: }
    86: 
    87: 
    88: iocp_demuxer::iocp_demuxer()
    89:   : iocp(NULL)
    90: {
    91:   // Create the completion port
    92:   // not sure what first 3 args do, but by specifying INVALID_HANDLE_VALUE
    93:   // for the first I think I can ignore the rest (apart from the last, numthreads)
    94:   // I still have to create the threads, but only NumberOfConcurrentThreads
    95:   // will wake up from GetQueuedCompletionStatus at a time. This looks to be
    96:   // slightly elastic...
    97: // NT 3.51 doesn't let you pass null filehandle, you've got to have a dummy
    98: // socket. keep that in mind. see InitializeIOCP in IOCPServer.cpp example
    99: // taken from codeproject. GetSystemInfo to find out num CPUs
   100:   fprintf(stderr,"CreateIoCompletionPort with ONE WORKER THREAD\n");
   101:   iocp = CreateIoCompletionPort(
   102:     INVALID_HANDLE_VALUE,
   103:     NULL,
   104:     (ULONG_PTR)0,
   105:     1       // 1 thread (zero means one for each CPU)
   106:   );
   107: 
   108:   if(NULL == iocp)
   109:   {
   110:     DWORD err = GetLastError();
   111:     fprintf(stderr,"failed to create completion port: %li\n", err);
   112:     throw -1;
   113:   }
   114: }
   115: 
   116: iocp_demuxer::~iocp_demuxer()
   117: {
   118:   fprintf(stderr, "~iocp (%p) NOW WITH ASYNC QUITTER!\n", iocp);
   119:   try
   120:   {
   121:     demux_quitter q;
   122:     q.quit(this);
   123:   }
   124:   catch(...)
   125:   {
   126:     fprintf(stderr, "~iocp_demuxer async quit threw exception!\n");
   127:     // now what do we do?
   128:   }
   129: 
   130: 
   131:   if(NULL != iocp && !CloseHandle(iocp))
   132:   {
   133:     DWORD err = GetLastError();
   134:     fprintf(stderr,"failed cleanup iocp: %li\n", err);
   135:   }
   136: }
   137: 
   138: int
   139: iocp_demuxer::associate_with_iocp(HANDLE obj, ULONG_PTR udat)
   140: {
   141:   // fprintf(stderr, "associating with iocp=%p: %p, udat: %lx\n",
   142:   //  iocp, obj, udat);
   143: 
   144:   // Any overlapped operations performed on the object will use the
   145:   // completion port for notification. The 3rd param can be used to pass
   146:   // per object context information. we'll just pass that back.
   147:   if(CreateIoCompletionPort(obj, iocp, udat, 0) == NULL) {
   148:     // adding the same obj twice without an intervening get completion
   149:     // status wakup gets an error 87, ERROR_INVALID_PARAMETER
   150:     fprintf(stderr,"CreateIoCompletionPort failed to register object: %li\n",
   151:       GetLastError());
   152:     return -1;
   153:   }
   154: 
   155:   return 0;
   156: }
   157: 
   158: void
   159: iocp_demuxer::get_evts(bool poll) {
   160:   // with multiple threads, this will actually wake up the last to
   161:   // block (lifo)
   162: 
   163:   // get context, call worker_thread
   164:   // need to be able to tell which thing completed, can have extra data
   165:   // following some kind of struct
   166:   // get this pointer
   167: 
   168:   // I guess to avoid swapping of thread context. By calling this on a given
   169:   // completion port this thread is associated with it until exit or respec
   170:   DWORD     nbytes;   // number of bytes in io transaction
   171:   ULONG_PTR     udat;   // user data - not using this atm
   172:   LPOVERLAPPED  olp;    // we get iocp_wakeup from this.
   173: 
   174: // If a socket handle associated with a completion port is closed,
   175: // GetQueuedCompletionStatus returns ERROR_SUCCESS, with *lpOverlapped
   176: // non-NULL and lpNumberOfBytes equal zero.
   177: 
   178:   int err = NO_ERROR;
   179: 
   180:   // No timeout. What does false mean? Eh. Could need a timeout to bring
   181:   // the thread down.
   182:   if(!GetQueuedCompletionStatus(iocp, &nbytes, &udat, &olp,
   183:     (poll) ? 0: INFINITE))
   184:   {
   185:     // That's strange - I sometimes get my ConnectEx errors popping
   186:     // out here (ERROR_SEM_TIMEOUT=121, ERROR_CONNECTION_REFUSED=1225)
   187:     // it looks like my args (overlapped, etc) are still filled out, so
   188:     // I can still awake the sleeper
   189:     err = GetLastError();   // doco says this & not WSAGetLastError.
   190: 
   191:     // let's see: yep - there's my overlapped
   192:     // fprintf(stderr,"!iocp::wait: nbytes=%li, udat=%lx, io=%p, err=%i\n",
   193:     //  nbytes, udat, olp, err);
   194: 
   195:     if(WAIT_TIMEOUT == err)
   196:     {
   197:       // we get this a lot now that we can poll the iocp, so no output
   198:       // interestingly, nbytes = 1. what could that mean?
   199:       return;         // no wakeup
   200:     }
   201:     else if(ERROR_OPERATION_ABORTED == err)
   202:     {
   203:       // that's real bad manners. Or I could just ignore it. Anyway,
   204:       // any overlapped received is stale.
   205:       fprintf(stderr, "WHOA!!! - disassociate before killing handle\n");
   206:       return;         // no wakeup
   207:     }
   208:     else
   209:     {
   210:       fprintf(stderr,"GetQueuedCompletionStatus returned false: %i\n",
   211:         err);
   212:       // return here? relying on olp being NULL, to stop us dereffing
   213:     }
   214: 
   215:     // I'm going to assume that there's a good wakeup, and fall through
   216:     // We need to wakeup on some errors (like ERROR_CONNECTION_REFUSED)
   217:     // FALL THROUGH
   218:   }
   219: 
   220: // An IOCP is a very general event mechanism. It tells you not only about
   221: // the completion of reads & writes, but also of pretty much any asynchronous
   222: // event's completion. It doesn't quite fit in with my select style interfaces.
   223: // I've got general overlapped things completing here. I don't want them to
   224: // know about demuxers & so forth so I'll have to know about them.
   225: 
   226:   //fprintf(stderr,"HOLEY! Woke up!\n");
   227:   //fprintf(stderr,"nbytes=%li, udat=%lx, olp=%p, err=%i\n",
   228:   //  nbytes, udat, olp, err);
   229: 
   230:   // a poll that simply timed out has already returned above, so by this
   231:   // point we expect an overlapped pointer
   232:   assert( olp );
   233: 
   234:   // tell someone that some overlapped op finished
   235:   iocp_wakeup*  wakeup = iocp_wakeup::from_overlapped(olp);
   236: 
   237:   // passing olp may be redundant, seeing as it's contained in iocp_wakeup
   238:   wakeup->iocp_op_finished(nbytes, udat, olp, err);
   239: }
   240: 
   241: 
   242: 
   243: // simple utility fn, shouldn't be here. creates listener on any interface.
   244: // this could benefit from a SOCKET class. on failure returns INVALID_SOCKET
   245: // CURRENTLY EATS ERROR, SO DON'T BOTHER CHECKING
   246: SOCKET
   247: create_listener_socket(int* io_port, int backlog)
   248: {
   249:   fprintf(stderr,"creating_listener_socket\n");
   250:   SOCKET        listener;
   251: 
   252:   // could use WSASocket, but these seem to be turning out overlapped anyway
   253:   // at least after tangling with overlapped functions.
   254:   // socket returns INVALID_SOCKET on failure.
   255:   listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
   256: 
   257:   if (INVALID_SOCKET == listener)
   258:   {
   259:     fprintf(stderr,"listener create failed: %i\n", WSAGetLastError());
   260:     return INVALID_SOCKET;
   261:   }
   262: 
   263:   SOCKADDR_IN   addr;
   264: 
   265:   // msdn code examples don't zero the sockaddr. That makes me nervous.
   266:   ZeroMemory(&addr, sizeof(addr));
   267:   addr.sin_family = AF_INET;
   268:   addr.sin_addr.s_addr = htonl(INADDR_ANY);
   269:   addr.sin_port = htons(*io_port);
   270: 
   271:   // bind our name to the socket
   272:   int         res;
   273:   res = bind(listener, (LPSOCKADDR)&addr, sizeof(addr));
   274: 
   275:   if (SOCKET_ERROR == res)
   276:   {
   277:     fprintf(stderr,"bind() failed %i\n", WSAGetLastError());
   278:     if(closesocket(listener) == SOCKET_ERROR)
   279:     {
   280:       fprintf(stderr,"closesocket failed on listener: %i\n",
   281:         WSAGetLastError());
   282:     }
   283:     return INVALID_SOCKET;
   284:   }
   285: 
   286:   // if user wanted port chosen tell them what it turned out to be
   287:   if(0 == *io_port)
   288:   {
   289:     int namelen = sizeof(addr);  // in/out: must hold the buffer size on entry
   290: 
   291:     if (getsockname(listener, (struct sockaddr *)&addr, &namelen)
   292:       == SOCKET_ERROR)
   293:     {
   294:       fprintf(stderr, "getsockname failed (%i)\n", WSAGetLastError());
   295: 
   296:       if(closesocket(listener) == SOCKET_ERROR)
   297:       {
   298:         fprintf(stderr,"closesocket failed on listener: %i\n",
   299:           WSAGetLastError());
   300:       }
   301:       return INVALID_SOCKET;
   302:     }
   303: 
   304:     *io_port = ntohs(addr.sin_port);
   305:   }
   306: 
   307:   // Set the socket to listen
   308:   res = listen(listener, backlog);
   309:   if (SOCKET_ERROR == res)
   310:   {
   311:     fprintf(stderr,"listen() failed %i\n", WSAGetLastError());
   312: 
   313:     if(closesocket(listener) == SOCKET_ERROR)
   314:     {
   315:       fprintf(stderr,"closesocket failed on listener: %i\n",
   316:         WSAGetLastError());
   317:     }
   318: 
   319:     return INVALID_SOCKET;
   320:   }
   321: 
   322:   return listener;
   323: }
   324: 
   325: // currently the following aren't used. Look forward to warnings
   326: // about them.
   327: 
   328: // the posix version of this made the socket nonblocking.
   329: // I don't seem to have to do that when using iocp. if you
   330: // want to create a nonblocking socket (or overlapped) pass
   331: // WSA_FLAG_OVERLAPPED to WSASocket. I've never had to
   332: // actually do this. How do you make accept do this? (supposing
   333: // you wanted to) WSAAccept doesn't have a flag for it (however
   334: // it does let you do conditional accepting).
   335: // There doesn't seem to be a sockopt
   336: // returns INVALID_SOCKET on failure. eats the err.
   337: SOCKET
   338: nice_accept(SOCKET listener)
   339: {
   340:   struct sockaddr_in  remoteaddr;
   341:   int         addrlen = sizeof(remoteaddr);
   342:   SOCKET        s;
   343: 
   344:   // accept returns INVALID_SOCKET when it fails
   345:   s = accept(listener, (struct sockaddr*)&remoteaddr, &addrlen);
   346: 
   347:   if(INVALID_SOCKET == s)
   348:   {
   349:     fprintf(stderr,"nice_accept failed (%i)\n", WSAGetLastError());
   350:   }
   351: 
   352:   // the posix version makes the socket nonblocking here
   353:   // we're not bothering
   354: 
   355:   return s;
   356: }
   357: 
   358: // returns SOCKET_ERROR on failure, with err in WSAGetLastError()
   359: static int
   360: connect_sock(SOCKET s, const char* addr, int port)
   361: {
   362:   struct sockaddr_in  sock_addr;
   363: 
   364:   memset(&sock_addr, 0, sizeof(sock_addr));
   365:   sock_addr.sin_family = AF_INET;
   366:   sock_addr.sin_addr.s_addr = inet_addr(addr);
   367:   sock_addr.sin_port = htons(port);
   368: 
   369:   return connect(s, (struct sockaddr *)&sock_addr, sizeof(sock_addr));
   370: }
   371: 
   372: // returns INVALID_SOCKET on failure, eats last error with WSAGetLastError
   373: // unlike the posix version, this does not make the socket nonblocking.
   374: SOCKET
   375: nice_connect(const char* addr, int port)
   376: {
   377:   SOCKET      s;
   378: 
   379:   if((s = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET
   380:     && connect_sock(s, addr, port) != SOCKET_ERROR)
   381:   {
   382:     return s;   /* success! */
   383:   }
   384: 
   385:   /* something happened (not as good as catch 22) */
   386:   fprintf(stderr,"nice_connect failed (%i)\n", WSAGetLastError());
   387: 
   388:   if(INVALID_SOCKET != s && closesocket(s) == SOCKET_ERROR)
   389:     fprintf(stderr,"nice close failed (%i)\n", WSAGetLastError());
   390: 
   391:   return INVALID_SOCKET;
   392: }
   393: 
   394: // returns -1 on error with the error code in WSAGetLastError. 0 otherwise.
   395: // kind of crap.
   396: int
   397: set_tcp_nodelay(int s, int disable)
   398: {
   399:   BOOL  disable_nagle = (disable) ? TRUE : FALSE;
   400: 
   401:   int res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
   402:       (const char*)&disable_nagle, sizeof(disable_nagle));
   403: 
   404:   return (res == SOCKET_ERROR) ? -1 : 0;
   405: }
   406: 
   407: }}
   408: 
End cpp section to demux/demux_iocp_demuxer.cpp[1]
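`iocp_wakeup::from_overlapped` above recovers the enclosing object from a pointer to its embedded `OVERLAPPED` member by subtracting the member's offset. The sketch below shows the same pointer arithmetic on non-Windows types. `fake_overlapped` and `wakeup_record` are hypothetical stand-ins, and the record is kept standard-layout (no virtual functions) so that `offsetof` is well defined, which is presumably not true of the real class given the warning mentioned in the listing.

```cpp
#include <cstddef>   // offsetof
#include <cassert>

// Hypothetical stand-in for the Windows OVERLAPPED struct.
struct fake_overlapped {
  unsigned long internal;
  void*         event;
};

// Hypothetical stand-in for iocp_wakeup: user state plus an embedded
// overlapped block that is handed to the OS.
struct wakeup_record {
  int             id;
  fake_overlapped ol;

  // Given a pointer to the ol member, step back by its offset within the
  // record to reach the start of the enclosing object.
  static wakeup_record* from_overlapped(fake_overlapped* olp)
  {
    return reinterpret_cast<wakeup_record*>(
      reinterpret_cast<char*>(olp) - offsetof(wakeup_record, ol));
  }
};
```

The completion port hands back only the overlapped pointer, so this is how the per-operation state rides along without a separate lookup table. The pointer passed in must really point at the `ol` member of a live record; anything else is undefined behaviour.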
Start cpp section to demux/demux_overlapped.cpp[1 /1 ]
     1: #line 2209 "./lpsrc/flx_demux.pak"
     2: #include "demux_overlapped.hpp"
     3: #include <stdio.h>      // fprintf
     4: #include <assert.h>
     5: 
     6: // cygwin's copy of mswsock.h leaves something to be desired...
     7: #ifndef WSAID_CONNECTEX
     8: typedef
     9: BOOL
    10: (PASCAL FAR * LPFN_CONNECTEX) (
    11:     IN SOCKET s,
    12:     IN const struct sockaddr FAR *name,
    13:     IN int namelen,
    14:     IN PVOID lpSendBuffer OPTIONAL,
    15:     IN DWORD dwSendDataLength,
    16:     OUT LPDWORD lpdwBytesSent,
    17:     IN LPOVERLAPPED lpOverlapped
    18:     );
    19: 
    20: #define WSAID_CONNECTEX \
    21:     {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
    22: #endif
    23: 
    24: namespace flx { namespace demux {
    25: 
    26: // windows includes files here? vs will be fussy.
    27: 
    28: // AcceptEx
    29: 
    30: // return async finished flag (error flags - can be transmitted via class)
    31: // AcceptEx is the way to get accept connections via the IOCP
    32: bool
    33: acceptex_control_block::start_overlapped()
    34: {
    35:   clear_overlapped();
    36: 
    37: // I've seen two examples get the pointer to AcceptEx, just in case it
    38: // isn't implemented...
    39:   // fprintf(stderr,"AcceptExing: listen backlog => can succeed immediately\n");
    40: 
    41:   // this is only set when acceptex receives data and returns immediately.
    42:   // can't hurt to set it.
    43:   DWORD nbytes = 0;
    44:   BOOL  success;
    45: 
    46:   // note that in order to get the wakeup packet, the listener must
    47:   // already be associated with the iocp. for future async io, the acceptor
    48:   // must be associated too.
    49:   success = AcceptEx(listener, acceptor,
    50:     accept_buf,       // required - near/far address
    51:     0,            // receive data size - don't yet want this
    52:     ACCEPTEX_ADDR_SIZE,   // must be nonzero
    53:     ACCEPTEX_ADDR_SIZE,   // must be nonzero
    54:     &nbytes,        // only set if fn completes. should be 0
    55:     &ol);         // oblig. gets us back to the this ptr
    56: 
    57:   // if there is a backlog of connections, AcceptEx can return immediately
    58:   if(success)
    59:   {
    60:     // must clear the wait
    61:     fprintf(stderr,"WHOA! AcceptEx RETURNED SUCCESS IMMEDIATELY!\n");
    62:     fprintf(stderr, "This could be bad, as wait should call op_finish\n");
    63:     fprintf(stderr, "We also lose the udat cookie (set to NULL)\n");
    64:     // handle the successful wakeup
    65:     // complete_async_op(demux, drv, nbytes, NO_ERROR);
    66:     // I hope they don't want the udat pointer, because I
    67:     // just made it up (0=NULL). Not using it anyway.
    68:     iocp_op_finished(nbytes, 0, &ol, NO_ERROR);
    69:     return false;   // means no completion packet coming (correct?)
    70:   }
    71:   else
    72:   {
    73:     int err = WSAGetLastError();
    74:     // can also return WSAECONNRESET, which isn't so bad
    75:     if(ERROR_IO_PENDING == err)
    76:     {
    77:       // fprintf(stderr,"AcceptEx returned ERROR_IO_PENDING - that's normal\n");
    78:       // This is the normal situation, fall through, leaving thread
    79:       // to sleep on wakeup call.
    80:     }
    81:     else
    82:     {
    83:       fprintf(stderr,"AcceptEx failed: %i\n", err);
    84:       fprintf(stderr,"returning true should wake thread to detect failure.\n");
    85:       return true;    // have self woken
    86:     }
    87:   }
    88:   return false;       // async not finished
    89: 
    90: }
    91: 
    92: // ConnectEx
    93: #if 0
    94: // apparently we're supposed to do this now to make the acceptee inherit
    95: // the listener's state. it is currently in the default state
    96: //err = setsockopt( sAcceptSocket,
    97: //  SOL_SOCKET,
    98: //  SO_UPDATE_ACCEPT_CONTEXT,
    99: //  (char *)&sListenSocket,
   100: //  sizeof(sListenSocket) );
   101: #endif
   102: 
   103: // what a pain in the arse (zzz)
   104: // This doesn't exist in win2000, so it'll need to be synchronous there.
   105: static int
   106: GetConnectExAddr(SOCKET s, LPFN_CONNECTEX* conn_fn)
   107: {
   108:   *conn_fn = NULL;
   109:   GUID      GuidConnectEx = WSAID_CONNECTEX;
   110:   DWORD     dwBytes;
   111:   int       err;
   112: 
   113:   err = WSAIoctl(s,   // why do I need this?
   114:     SIO_GET_EXTENSION_FUNCTION_POINTER,
   115:     &GuidConnectEx,
   116:     sizeof(GuidConnectEx),
   117:     conn_fn,
   118:     sizeof(*conn_fn),
   119:     &dwBytes,
   120:     NULL, NULL);    // no overlapped, no completion fun ptr
   121: //  fprintf(stderr,"Get addr dwbytes: %li\n", dwBytes);
   122:   return err;
   123: }
   124: 
   125: // this is the weirdest. To use ConnectEx, the socket must be already bound.
   126: // By trial and error, I found that it had to be bound to INADDR_ANY: 0.
   127: // So strange. Apparently I don't have to do it again if I want to reuse.
   128: static int
   129: bind_socket(SOCKET s)
   130: {
   131:   SOCKADDR_IN   addr;
   132: 
   133:   ZeroMemory(&addr, sizeof(addr));
   134:   addr.sin_family = AF_INET;
   135:   addr.sin_addr.s_addr = htonl(INADDR_ANY);
   136:   addr.sin_port = htons(0);
   137: 
   138:   return bind(s, (LPSOCKADDR)&addr, sizeof(addr));
   139: }
   140: 
   141: bool
   142: connectex_control_block::start_overlapped()
   143: {
   144:   clear_overlapped();
   145: 
   146:   // why not get this directly from the ConnectEx?
   147:   socket_err = ERROR_IO_PENDING;
   148: 
   149: 
   150:   DWORD bytes_sent = 0;   // we're not sending data on connect (yet)
   151:   BOOL  success;
   152: 
   153:   LPFN_CONNECTEX  pfConnectEx;
   154: 
   155:   // unfortunate, will fix up later.
   156:   // fprintf(stderr,"Getting ConnectEx address\n");
   157: 
   158:   // Turns out that ConnectEx isn't defined anywhere; I have to load its
   159:   // addr via WSAIoctl
   160:   // this is a bad way. make the driver cache it. why on earth is this
   161:   // call per-socket? does it really need to be that way?
   162:   if(GetConnectExAddr(s, &pfConnectEx) == SOCKET_ERROR)
   163:   {
   164:     fprintf(stderr,"GetConnectExAddr failed: %i\n", WSAGetLastError());
   165:     return true;
   166:   }
   167: 
   168:   // fprintf(stderr,"about to connectex to %s:%i, %i\n", addy, p, s);
   169: 
   170:   // this is so strange - I have to bind the socket locally first
   171:   // (INADDR_ANY, port 0). if I don't, ConnectEx fails with WSAEINVAL.
   172:   // in any case, I won't need to do this again if I reuse this socket.
   173:   if(bind_socket(s) == SOCKET_ERROR)
   174:     fprintf(stderr,"ConnectEx bind failed: %i\n", WSAGetLastError());
   175: 
   176:   // I hope ConnectEx doesn't want this to hang around, because it's
   177:   // going to drop off the stack after this.
   178:   SOCKADDR_IN   addr;
   179: 
   180:   // some examples don't zero the addr. That makes me nervous.
   181:   ZeroMemory(&addr, sizeof(addr));
   182:   addr.sin_family = AF_INET;
   183:   addr.sin_addr.s_addr = inet_addr(addy);
   184:   addr.sin_port = htons(p);
   185: 
   186:   // in order to receive the wakeup packet, s must already be associated
   187:   // with the iocp. this is best done at socket creation time. for these
   188:   // sockets it's probably best to also bind them at the same time.
   189:   // that requires "purposed" sockets (CreateConnectSocket?).
   190:   // p.s. the default (waio_base) wakeup is doing fine for now.
   191: 
   192:   success = (*pfConnectEx)(s, // socket
   193:     (LPSOCKADDR)&addr,    // connect address
   194:     sizeof(addr),     // size thereof
   195:     NULL,         // not sending any data yet, but we could
   196:     0,            // ditto
   197:     NULL,         // should be zero until this changes
   198:     &ol);         // oblig. gets us back to the this ptr
   199: 
   200: // there's a caveat about the type of socket s becomes after ConnectEx.
   201: // It's in some kind of default state and cannot be used with shutdown
   202: // until it is changed with setsockopt (SO_UPDATE_CONNECT_CONTEXT).
   203:   if(success)
   204:   {
   205:     fprintf(stderr,"WHOA! ConnectEx RETURNED SUCCESS IMMEDIATELY!\n");
   206:     fprintf(stderr, "BAD: calls op_finish and loses udat cookie\n");
   207:     // handle the successful wakeup. (udat=0, olp=&ol)
   208:     iocp_op_finished(bytes_sent, 0, &ol, NO_ERROR);
   209:     return false;   // means no completion packet coming (correct?)
   210:   }
   211:   else
   212:   {
   213:     int err = WSAGetLastError();
   214: 
   215:     if(ERROR_IO_PENDING == err)
   216:     {
   217:       // fprintf(stderr,"ConnectEx pending...\n");
   218:       // This is the normal situation, fall through, leaving thread
   219:       // to sleep on wakeup call.
   220:     }
   221:     else
   222:     {
   223:       // maybe store the error here. that could work for all
   224:       // windows wakeups
   225:       fprintf(stderr,"ConnectEx failed: %i\n", err);
   226:       return true;    // have self woken
   227:     }
   228:   }
   229:   return false;       // not finished
   230: }
   231: 
   232: // TransmitFile
   233: 
   234: bool
   235: transmitfile_control_block::start_overlapped()
   236: {
   237:   clear_overlapped();
   238: 
   239:   // the first zero (bytes to write) => transmit entire file
   240:   // the second zero means use the default chunk size
   241:   // the NULL is for mem buffers to bookend the file with. nothing yet.
   242:   // the final argument is for various flags, including a way of doing
   243:   // DisconnectEx style socket reuse (more widely compatible?)
   244: 
   245:   // in order to receive the wakeup, s must already be associated with the
   246:   // iocp. this is best done at socket creation time.
   247:   if(TransmitFile(s, file, 0, 0, &ol, NULL, flags))
   248:   {
   249:     fprintf(stderr,"Transmit file succeeded immediately! waking...\n");
   250:     return true;
   251:   }
   252:   else
   253:   {
   254:     DWORD err = WSAGetLastError();
   255: 
   256:     // will need to actually signal something
   257:     // fprintf(stderr,"signal TransmitFile failure!\n");
   258:     if(ERROR_IO_PENDING != err && WSA_IO_PENDING != err)
   259:       fprintf(stderr,"genuine error from TransmitFile: %li\n", err);
   260:   }
   261:   return false;
   262: }
   263: 
   264: 
   265: // SOCKET io using WSASend and WSARecv
   266: 
   267: // windows style control blocks
   268: wsasocketio_control_block::wsasocketio_control_block(SOCKET src, sel_param* pb,
   269:   bool inread)
   270:   : s(src), ppb(pb), reading(inread)
   271: {
   272: }
   273: 
   274: bool
   275: wsasocketio_control_block::start_overlapped()
   276: {
   277:   clear_overlapped();
   278: 
   279:   error = 0;
   280: 
   281:   // num bytes received IF recv completes immediately.
   282:   DWORD imm_bytes;
   283:   int   recv_res;
   284: 
   285:   // set up the single wbuf, bearing in mind we may be part way.
   286:   wbufs[0].len = ppb->buffer_size - ppb->bytes_written;
   287:   wbufs[0].buf = ppb->buffer + ppb->bytes_written;
   288: 
   289: // fprintf(stderr, "sockio: %p->finished = %i, reading = %i\n",
   290: //  ppb, ppb->finished(), reading);
   291: 
   292:   // Ideally, we would like to be able to use MSG_WAITALL, which would
   293:   // let us only get a completion packet when either all the data was
   294:   // available or the connection had been closed or shutdown.
   295:   // Unfortunately this is not possible for non-blocking sockets, so
   296:   // we have to take whatever we get and then call WSARecv again.
   297: 
   298:   //#define MSG_WAITALL 0   // not defined in cygwin - apparently this
   299:   //DWORD flags = MSG_WAITALL;
   300: 
   301:   // ah, unfortunately MSG_WAITALL is not supported for non blocking sockets
   302:   // we'll just have to do it ourselves
   303:   DWORD flags = MSG_PARTIAL;
   304: 
   305:   // completion routines! (unused)
   306:   if(reading)
   307:     recv_res = WSARecv(s, wbufs, NUM_WBUFS, &imm_bytes, &flags, &ol, NULL);
   308:   else
   309:     recv_res = WSASend(s, wbufs, NUM_WBUFS, &imm_bytes, flags, &ol, NULL);
   310: 
   311:   // don't know if I need to check non winsock errs
   312: 
   313:   switch(recv_res)
   314:   {
   315:     case 0:
   316:     {
   317:       // flags are updated to indicate what? if there was a callback, it
   318:       // would be scheduled to be called when this thread is in the
   319:       // waitable state, whatever that means.
   320:       //fprintf(stderr,
   321:       //  "WSA%s completed immediately!!! nbytes: %li, flags: %lx\n",
   322:       //    (reading) ? "Recv" : "Send", imm_bytes, flags);
   323: 
   324:       // looks like we get the completion packet even if we do finish
   325:       // immediately so let the iocp wake us. note that this method
   326:       // of manually calling iocp_op_finished is not so great as we
   327:       // don't know the (as yet unused) udat cookie and so set it to 0.
   328:       // fprintf(stderr, "calling finished manually (ppb=%p)\n", ppb);
   329:       // iocp_op_finished(imm_bytes, 0, &ol, NO_ERROR);
   330: 
   331:       // false because iocp_op_finished will wake us. I guess false from
   332:       // these guys means that a completion packet is in the mail and
   333:       // true means that it isn't (in the mail).
   334:       return false;
   335:     }
   336:     break;
   337:     case SOCKET_ERROR:
   338:     {
   339:       DWORD err = WSAGetLastError();
   340: 
   341:       // normal mode - wait for completion
   342:       // fyi, xp pro seems to mostly give us ERROR_IO_PENDING
   343:       if(ERROR_IO_PENDING == err || WSA_IO_PENDING == err)
   344:       {
   345:         // fprintf(stderr,"WSA%s pending completion (%li)\n",
   346:         //  (reading) ? "Recv" : "Send", err);
   347:         return false;
   348:       }
   349: 
   350:       fprintf(stderr,"WSARecv/Send returned SOCKET_ERR: %li\n", err);
   351:       return true;    // assume it's bad and we won't get a wakeup
   352:     }
   353:     break;
   354:     default:
   355:     {
   356:       fprintf(stderr,"WSARecv/Send returned other error: %i, GetLastError: %li\n",
   357:         recv_res, GetLastError());
   358:       return true;        // wake up
   359:     }
   360:     break;
   361:   }
   362: 
   363:   return false;
   364: }
   365: 
   366: // NB: called by iocp::wait, so be aware of which thread is doing what, lest
   367: // you be surprised by this being called asynchronously.
   368: void
   369: wsasocketio_control_block::iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   370:   LPOVERLAPPED olp, int err)
   371: {
   372:   error = err;        // copy back for others to look at.
   373:                 // perhaps move back to iocpwakeup
   374: // fprintf(stderr, "wsasocketio::finished: ppb=%p, nbytes=%li, err=%i, read=%i\n",
   375: //  ppb, nbytes, err, reading);
   376: 
   377:   if(err)
   378:   {
   379:     fprintf(stderr, "wsasocketio, got error: %i\n", err);
   380:   }
   381: 
   382:   // fprintf(stderr,"wsa_socketio wakeup, nb: %li, err: %i\n", nbytes, err );
   383: assert( !ppb->finished() );
   384:   // keep track of bytes received.
   385:   ppb->bytes_written += nbytes;
   386: 
   387:   if(0 == nbytes)
   388:   {
   389:     fprintf(stderr, "wsaiosocket got zero bytes: err=%i, read=%i\n",
   390:       err, reading);
   391:   }
   392: 
   393:   // if we're not finished, we have to reinstall our request
   394:   // zero bytes indicates shutdown/closure, right?
   395:   // might be using this for WSASend. Instead of broken pipes, on win32
   396:   // we get WSAECONNRESET (pretty sure) on write. On read?
   397: // not sure about this - I don't think we have to check nbytes == 0
   398:   if(0 == nbytes || ppb->finished())
   399:   {
   400:     return;
   401:   }
   402:   else
   403:   {
   404:     // go back around again
   405:     fprintf(stderr,"didn\'t get everything (%li of %li bytes)\n",
   406:       ppb->bytes_written, ppb->buffer_size);
   407:     if(start_overlapped())
   408:     {
   409:       fprintf(stderr, "UM, socket restart finished immediately\n");
   410:       fprintf(stderr, "causes new wakeup? or should I loop around?\n");
   411:     }
   412:   }
   413: }
   414: 
   415: 
   416: // file/pipe io using ReadFile and WriteFile
   417: 
   418: winfileio_control_block::winfileio_control_block(HANDLE f, void* buf, int len,
   419:   bool inread)
   420:   : reading(inread), file(f)
   421: {
   422:   // pb is not so useful here. we only want to
   423:   // know num bytes written/processed.
   424:   pb.buffer = (char*)buf;
   425:   pb.buffer_size = len;
   426:   pb.bytes_written = 0;
   427: }
   428: 
   429: // if file is opened with FILE_FLAG_OVERLAPPED, we can do "immutable file ptr"
   430: // ops & set the desired offset within the overlapped. can also stick an
   431: // event to signal in there.
   432: bool
   433: winfileio_control_block::start_overlapped()
   434: {
   435:   // fprintf(stderr,"winfileio_cb::start_overlapped: reading=%i\n", reading);
   436:   clear_overlapped();
   437: 
   438:   // DWORD  imm_bytes;
   439:   BOOL  success;
   440: 
   441:   // don't need bytes read, written when we have an OVERLAPPED
   442:   if(reading)
   443:     // success = ReadFile(file, pb.buffer, pb.buffer_size, &imm_bytes, &ol);
   444:     success = ReadFile(file, pb.buffer, pb.buffer_size, NULL, &ol);
   445:   else
   446:     //success = WriteFile(file, pb.buffer, pb.buffer_size, &imm_bytes, &ol);
   447:     success = WriteFile(file, pb.buffer, pb.buffer_size, NULL, &ol);
   448: 
   449:   // fprintf(stderr,"immbytes = %li\n", imm_bytes);
   450: 
   451:   if(!success)
   452:   {
   453:     int err = GetLastError();
   454: 
   455:     // I'm getting this for unfinished
   456:     if(ERROR_IO_PENDING == err)
   457:     {
   458:        return false;  // sleep on
   459:     }
   460:     else
   461:     {
   462:        fprintf(stderr,"%sFile failed! (%i)\n", (reading) ? "Read" : "Write",
   463:          err);
   464:        return true;      // ask for wakeup
   465:     }
   466: 
   467:     // fprintf(stderr,"%sFile failed! (%li)\n", (reading) ? "Read" : "Write",
   468:     //  GetLastError());
   469:     // fprintf(stderr,"do I still get completion packet???\n");
   470:     // assume not
   471:   }
   472: 
   473:   return false;       // sleep on
   474: }
   475: 
   476: }}
   477: 
End cpp section to demux/demux_overlapped.cpp[1]
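The completion handler in wsasocketio_control_block accumulates bytes and re-issues the request until the buffer is full or a zero-byte completion arrives. The following is a minimal, portable sketch of that loop only. It assumes nothing about Winsock: param_sketch and fake_source are hypothetical stand-ins (for sel_param and for a socket that completes reads partially), and the loop is synchronous where the real code is driven by completion packets.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical stand-in for sel_param: only the fields the listing uses.
struct param_sketch {
  char* buffer;
  long  buffer_size;
  long  bytes_written;
  bool finished() const { return bytes_written == buffer_size; }
};

// Hypothetical data source that hands out at most `chunk` bytes per
// request, imitating a socket that completes reads partially.
struct fake_source {
  std::string data;
  std::size_t pos;
  long        chunk;

  long read(char* dst, long want) {
    long avail = static_cast<long>(data.size() - pos);
    long n = std::min(std::min(want, chunk), avail);
    std::memcpy(dst, data.data() + pos, static_cast<std::size_t>(n));
    pos += static_cast<std::size_t>(n);
    return n;   // 0 means nothing more will arrive
  }
};

// Re-issue the request until the buffer is full or a zero-byte result
// signals closure. Returns the number of requests that were issued.
int fill_buffer(fake_source& src, param_sketch& pb) {
  int requests = 0;
  while (!pb.finished()) {
    long n = src.read(pb.buffer + pb.bytes_written,
                      pb.buffer_size - pb.bytes_written);
    ++requests;
    if (n == 0) break;            // shutdown/closure: stop re-arming
    pb.bytes_written += n;        // keep track of bytes received
    assert(pb.bytes_written <= pb.buffer_size);
  }
  return requests;
}
```

With a 10 byte buffer and a source that yields 4 bytes at a time, three requests are needed (4 + 4 + 2). If the source runs dry first, the loop stops on the zero-byte result and bytes_written records how much actually arrived.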
Start cpp section to demux/demux_overlapped.hpp[1 /1 ]
     1: #line 2687 "./lpsrc/flx_demux.pak"
     2: #ifndef __DEMUX_OVERLAPPED__
     3: #define __DEMUX_OVERLAPPED__
     4: #include "flx_demux_config.hpp"
     5: 
     6: // visual studio is quite sensitive about how you do these includes.
     7: #include <WinSock2.h>              // Winsock2 (WSABufs, etc) must come before Windows.h
     8: #include "demux_iocp_demuxer.hpp"  // this header file includes Windows.h
     9: #include <MSWSock.h>  // AcceptEx, TF_REUSE_SOCKET, etc
    10: 
    11: namespace flx { namespace demux {
    12: 
    13: // rename these to control block something or other
    14: // get rid of default constructors - faio can worry about that.
    15: 
    16: // WARNING: in some "immediate completion" cases I have to call
    17: // the finished function myself - in these cases I set the udat to 0, making
    18: // it not very reliable. Either make sure the user understands immediate
    19: // finish (and does it themselves) or keep a copy of udat.
    20: 
    21: // listener socket must be already associated with an IOCP
    22: // in doing an AcceptEx, it might succeed immediately - do you still
    23: // get the IOCP wakeup?
    24: class DEMUX_EXTERN acceptex_control_block : public iocp_wakeup {
    25:   enum { ACCEPTEX_ADDR_SIZE = sizeof(SOCKADDR_IN) + 16 };
    26: 
    27: public:
    28:   SOCKET  listener, acceptor;
    29:   // there are two of these!
    30:   char  accept_buf[2*ACCEPTEX_ADDR_SIZE];
    31: 
    32:   virtual bool start_overlapped();
    33: 
    34:   acceptex_control_block()
    35:     : listener(INVALID_SOCKET), acceptor(INVALID_SOCKET) {}
    36: };
    37: 
    38: class DEMUX_EXTERN connectex_control_block : public iocp_wakeup
    39: {
    40: public:
    41:   // move further back?
    42:   int socket_err;         // outgoing
    43: 
    44:   // can have buffer to be sent on connection
    45:   SOCKET    s;          // previously unbound socket
    46:   const char* addy;       // ipv4 address
    47:   int     p;          // port number
    48: 
    49:   // socket_err undefined
    50:   connectex_control_block() : s(INVALID_SOCKET), addy(0), p(0) {}
    51: 
    52:   // see posix version of this, try to keep them in sync. give socket_err
    53:   // initial definition that works with this?
    54:   bool finished() { return ERROR_IO_PENDING != socket_err; }
    55: 
    56:   virtual bool start_overlapped();
    57: };
    58: 
    59: // TransmitFile here (requires file handle)
    60: class DEMUX_EXTERN transmitfile_control_block : public iocp_wakeup {
    61:   SOCKET  s;
    62:   HANDLE  file;
    63:   DWORD flags;                // for possible socket reuse.
    64: public:
    65: 
    66:   transmitfile_control_block(SOCKET dst)      // for reuse of socket
    67:     : s(dst), file(NULL), flags(TF_DISCONNECT | TF_REUSE_SOCKET) {}
    68: 
    69:   transmitfile_control_block(SOCKET dst, HANDLE src)  // actual transmitfile
    70:     : s(dst), file(src), flags(0) {}
    71: 
    72:   virtual bool start_overlapped();
    73: };
    74: 
    75: 
    76: // handles both WSASend & WSARecv
    77: class DEMUX_EXTERN wsasocketio_control_block : public iocp_wakeup {
    78: protected:
    79:   enum { NUM_WBUFS = 1 }; // just one for now, but can do scattered send/recvs
    80:   WSABUF    wbufs[NUM_WBUFS];
    81: public:
    82:   SOCKET    s;
    83:   sel_param*  ppb;      // on input what you want, on output what you got
    84:   int     error;
    85:   bool    reading;  // else use WSASend
    86: 
    87:   // watch the memory interfaces here, going back and forth between threads.
    88:   wsasocketio_control_block(SOCKET src, sel_param* pb, bool read);
    89: 
    90:   virtual bool start_overlapped();
    91: 
    92:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    93:     LPOVERLAPPED olp, int err);
    94: };
    95: 
    96: // looks a bit like wsasocketio_control_block (bad name, sends too)
    97: class DEMUX_EXTERN winfileio_control_block : public iocp_wakeup {
    98:   bool    reading;
    99: public:
   100: // probably should be a pointer (?)
   101:   sel_param pb;
   102:   HANDLE    file; // I like to modify this from the outside
   103: 
   104:   // offset?
   105:   winfileio_control_block(HANDLE f, void* buf, int len, bool read);
   106: 
   107:   virtual bool start_overlapped();
   108: 
   109:   // NB: no iocp_op_finished callback. defined by users of the class.
   110: };
   111: 
   112: }}
   113: 
   114: #endif
End cpp section to demux/demux_overlapped.hpp[1]
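The comments in connectex_control_block note that socket_err is left undefined by the constructor even though finished() reads it. One possible way to close that gap, sketched here under assumptions, is to start the error field in a "pending" state. The names connect_block_sketch, OP_PENDING_SKETCH and complete() are hypothetical and not part of the demux interface; the sentinel stands in for ERROR_IO_PENDING on Windows or EINPROGRESS on posix.

```cpp
#include <cassert>

// Hypothetical sentinel standing in for ERROR_IO_PENDING (Windows) or
// EINPROGRESS (posix). The real value comes from the platform headers.
enum { OP_PENDING_SKETCH = -1 };

// Hypothetical, platform-free model of a connect control block.
struct connect_block_sketch {
  int socket_err;

  // Start in the "pending" state so finished() never reads an
  // uninitialised value.
  connect_block_sketch() : socket_err(OP_PENDING_SKETCH) {}

  bool finished() const { return OP_PENDING_SKETCH != socket_err; }

  // Called by whoever learns the outcome; 0 means success.
  void complete(int err) { socket_err = err; }
};
```

This mirrors what the posix accept_control_block does when it sets socket_err to EINPROGRESS in start(), only moved into the constructor.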
Start cpp section to demux/demux_kqueue_demuxer.cpp[1 /1 ]
     1: #line 2802 "./lpsrc/flx_demux.pak"
     2: // kqueue demuxer for bsd/os x
     3: // N.B. calling close on a file descriptor will remove any kevents that
     4: // reference that descriptor. that would explain remove complaining from
     5: // time to time.
     6: // try EV_EOF to pick up eofs, useful for async file io.
     7: 
     8: #include "demux_kqueue_demuxer.hpp"
     9: 
    10: #include <stdio.h>      // perror
    11: #include <unistd.h>     // close
    12: 
    13: #include <sys/types.h>    // from the kqueue manpage
    14: #include <sys/event.h>    // kernel events
    15: #include <sys/time.h>   // timespec (kevent timeout)
    16: 
    17: // #include <sys/syscall.h> // syscall(SYS_close,kq) close workaround
    18: 
    19: namespace flx { namespace demux {
    20: kqueue_demuxer::kqueue_demuxer()
    21:   : kq(-1)
    22: {
    23:   // Not that you care, but this event queue is not inherited by
    24:   // forked children.
    25:   kq = kqueue();
    26:   if(-1 == kq)
    27:   {
    28:     perror("kqueue");
    29:     throw -1;
    30:   }
    31: }
    32: 
    33: kqueue_demuxer::~kqueue_demuxer()
    34: {
    35:   // calling close on a kq while a thread is waiting in kevent causes close
    36:   // to block! this happens on 10.4. Hard to say on 10.3 as close simply
    37:   // fails there. we need to wake the waiting thread, so we'll use that
    38:   // handy self pipe waker. Luckily, kqueues are responsive to new fds,
    39:   // otherwise we'd need the self pipe waker to be there from the start
    40:   // p.s. it's also bad form to destruct a demuxer while a thread waits
    41:   // on it. top marks to kqueues for making this obvious, passing fail
    42:   // to me for not applying the same to the other async demuxers.
    43:   async_quit();
    44: 
    45:   //if(syscall(SYS_close, kq) == -1)
    46:   // I don't seem to be able to close a kq. can't fstat it either
    47:   if(-1 != kq && close(kq) == -1)
    48:     perror("kqueue close");
    49: }
    50: 
    51: 
    52: // Events of interest to us ERead, EWrite.
    53: // ERead has fflags: NOTE_LOWAT, NOTE_EOF. ident is a descriptor (any?)
    54: 
    55: // if you're using the kqueue_demuxer to do a single bidirectional wakeup,
    56: // be aware that it currently breaks the "one shot" rule, that is you
    57: // may get an unexpected wakeup the next time you call wait.
    58: int
    59: kqueue_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    60: {
    61:   // we only know these flags
    62:   if((flags & ~(PDEMUX_READ | PDEMUX_WRITE))) return -1;
    63: 
    64: // could set wakeup_flags here of what's been installed!
    65: 
    66:   // FUCK - can only do one at a time with kqueues
    67:   // For those doing both, if one fails, you're in a bit of trouble.
    68:   if(flags & PDEMUX_READ)
    69:   {
    70:     if(add_kqueue_filter(sv, EVFILT_READ) == -1) return -1;
    71:   }
    72: 
    73:   if(flags & PDEMUX_WRITE)
    74:   {
    75:     if(add_kqueue_filter(sv, EVFILT_WRITE) == -1) return -1;
    76:   }
    77: 
    78:   return 0;
    79: }
    80: 
    81: int
    82: kqueue_demuxer::add_kqueue_filter(socket_wakeup* sv, short filter)
    83: {
    84:   int       s = sv->s;
    85:   struct kevent evt;
    86: 
    87:   // this works just like select if the s is a listening socket
    88:   // *except* works with all types of fds, including pipes, files & fifos
    89:   // can set low water mark for reads with NOTE_LOWAT in fflags and size
    90:   // in data. on return data contains number of bytes available to read
    91:   // on return sets EV_EOF in flags if read dir socket has shutdown and
    92:   // returns a possible socket err in fflags
    93:   // should that be EV_ENABLE | EV_ADD. fflags zero cos I don't know what
    94:   // to put there. pass pb in udata
    95: 
    96:   // adding EV_ONESHOT to save me removing on wakeup (a syscall).
    97:   // I now require that the evt be removed before the wakeup fn is called.
    98: 
    99:   EV_SET(&evt, s, filter, EV_ADD | EV_ONESHOT, 0, 0, sv);
   100:   // trying to detect when have reached eof with async file io using kq
   101:   //EV_SET(&evt, s, EVFILT_READ, EV_ADD | EV_ONESHOT, NOTE_LOWAT, 16*1024, sv);
   102: 
   103:   // add event
   104:   if(kevent(kq, &evt, 1, NULL, 0, NULL) < 0)
   105:   {
   106:     perror("kevent add_kqueue_filter");
   107:     return -1;
   108:   }
   109:   return 0;
   110: }
   111: 
   112: // useful, but unused atm, thanks to ONESHOT.
   113: // this function skirts the portability boundaries of the demux interface
   114: // for kqueues each event monitor is identified (for sockets) by a pair,
   115: // (s, filter), or for us, (s, {in|out}). This means that we can add
   116: // individual wakeups for reading and writing but not both at once.
   117: // I think epoll can do both, and so can select (and N/A to IOCPs).
   118: // This "both at once" thing can't easily be one shot. There's a good
   119: // case for this behaviour to be defined "undefined". Not many people
   120: // using this part - could be caveat emptor...
   121: int
   122: kqueue_demuxer::remove_kqueue_filter(int s, short filter)
   123: {
   124:   struct kevent evt;
   125: 
   126:   EV_SET(&evt, s, filter, EV_DELETE, 0, 0, NULL);
   127:   if(kevent(kq, &evt, 1, NULL, 0, NULL) < 0)
   128:   {
   129:     perror("kevent remove_socket_wakeup");
   130:     return -1;
   131:   }
   132: 
   133:   return 0;
   134: }
   135: 
   136: int
   137: kqueue_demuxer::remove_socket_wakeup(int s, int flags)
   138: {
   139:   int r1 = 0, r2 = 0;
   140: 
   141:   if(flags & PDEMUX_READ) r1 = remove_kqueue_filter(s, EVFILT_READ);
   142:   if(flags & PDEMUX_WRITE) r2 = remove_kqueue_filter(s, EVFILT_WRITE);
   143: 
   144:   // If you want to know which one failed, you're out of luck.
   145:   if(r1 || r2) return -1;
   146: 
   147:   return 0;
   148: }
   149: 
   150: // from "advanced macos programming", on reading shutdown causes
   151: // the EV_EOF flag to be set in the flags field and returns errno
   152: // in the fflags field. There may still be pending data to read
   153: // when EV_EOF is set. The data field says how many bytes available.
   154: // for writing data says how much you can write. EV_EOF is set
   155: // when the reader "disconnects". Says nothing about errno/fflags
   156: // in this case.
   157: /*
   158:     fprintf(stderr,"readevt on %i, EOF = %s\n",
   159:       s, (ev.flags & EV_EOF) ? "TRUE" : "FALSE");
   160:  */
   161: 
   162: // do that thing where you get the events. can I get them one at a time?
   163: // I bet I can.
   164: void
   165: kqueue_demuxer::get_evts(bool poll)
   166: {
   167:   // event seems to remain unless we remove it
   168:   struct kevent ev;
   169:   int       nevts;
   170: 
   171:   struct timespec timeout, *tptr = NULL;
   172: 
   173:   if(poll)
   174:   {
   175:     timeout.tv_sec = 0;   // effectuate a poll
   176:     timeout.tv_nsec = 0;
   177:     tptr = &timeout;
   178:   }
   179: 
   180:   // timeout.tv_sec = 1;    // timeout every second
   181:   // timeout.tv_nsec = 0; // 10^9 nanoseconds per second
   182: 
   183:   nevts = kevent(kq, NULL, 0, &ev, 1, tptr);  // wait or poll
   184: 
   185:   if(nevts <= 0)
   186:   {
   187:     // error, else timeout & return to allow cancel
   188:     if(nevts < 0)
   189:       perror("kevent event fetch");
   190: 
   191:     return;
   192:   }
   193: 
   194:   // fprintf(stderr,"kqueue wakeup!\n");
   195: 
   196:   socket_wakeup*  sv = (socket_wakeup*)ev.udata;
   197: 
   198:   // The filters are not bit fields, hence they must come in serially.
   199:   // this means you're never going to get both read and write on
   200:   // a kqueue_demuxer wake up. No worries.
   201:   if(ev.filter == EVFILT_READ)
   202:   {
   203:   // this capability is lost for the moment, as we have no way
   204:   // of explaining it to felix. the event stuff isn't so good right now
   205: /*
   206:     // can chunk up on accepts. nice one kqueue
   207:     if(NULL == sv)      // => listener
   208:     {
   209:       int backlog = (int)ev.data;
   210:       // fprintf(stderr,"kq listen backlog: %i\n", backlog);
   211:       for(int i = 0; i < backlog; i++) handle_connection();
   212:     }
   213:     else
   214: */
   215:     // If a socket wakeup were a control block, you'd set the err here.
   216:     if(0 && ev.flags & EV_EOF)
   217:     {
   218:       // errno in fflags!
   219:       fprintf(stderr,
   220:         "got EV_EOF on read, %i bytes remain in buffer, errno=%i\n",
   221:         (int)ev.data, ev.fflags);
   222:     }
   223:     // fprintf(stderr,"EVFILT_READ: got %i bytes coming\n", (int)ev.data);
   224:     // remove_reading_fd(s);      // now using EV_ONESHOT
   225: // remove other outstanding here...?
   226:     sv->wakeup_flags = PDEMUX_READ;   // Tell 'em what they've won.
   227:     sv->wakeup(*this);
   228:   }
   229:   else if(ev.filter == EVFILT_WRITE)
   230:   {
   231:     // fprintf(stderr,"EVFILT_WRITE: can write (?) %i bytes\n",
   232:     //  (int)ev.data);
   233: 
   234:     // using oneshot mode now.
   235:     // remove_writing_fd(s);
   236: 
   237:     if(ev.flags & EV_EOF)
   238:     {
   239:       // errno in fflags? data should be zero bytes, right?
   240:       // can't write anything
   241:       fprintf(stderr,
   242:         "got EV_EOF on write, data bytes =%i (0?), errno/fflags?=%i\n",
   243:         (int)ev.data, ev.fflags);
   244:     }
   245: // remove other outstanding here?
   246:     sv->wakeup_flags = PDEMUX_WRITE;
   247:     sv->wakeup(*this);
   248:   }
   249:   else
   250:   {
   251:     fprintf(stderr,"unsolicited event from kqueue (%i)...\n", ev.filter);
   252:     // no wakeup
   253:   }
   254: }
   255: }}
   256: 
End cpp section to demux/demux_kqueue_demuxer.cpp[1]
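The kqueue_demuxer destructor relies on a self pipe waker to get a thread out of a blocking wait before the queue is closed. The sketch below illustrates the self pipe idea on its own. It deliberately uses poll() instead of kevent() so that it runs on any POSIX system, and self_pipe_sketch is a hypothetical helper, not the library's self_piper or demux_quitter class.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper illustrating the self pipe trick.
struct self_pipe_sketch {
  int fds[2];   // fds[0] = read end, fds[1] = write end

  self_pipe_sketch() {
    fds[0] = fds[1] = -1;
    if (pipe(fds) == -1) throw -1;
  }

  ~self_pipe_sketch() {
    if (fds[0] != -1) close(fds[0]);
    if (fds[1] != -1) close(fds[1]);
  }

  // Make the read end readable, which wakes any thread waiting on it.
  bool wake() {
    char b = 1;
    return write(fds[1], &b, 1) == 1;
  }

  // Wait up to timeout_ms for a wake-up. Returns true, after consuming
  // one wake-up byte, if woken; false on timeout or error.
  bool wait(int timeout_ms) {
    struct pollfd p;
    p.fd = fds[0];
    p.events = POLLIN;
    p.revents = 0;
    int n = poll(&p, 1, timeout_ms);
    if (n <= 0) return false;
    if (!(p.revents & POLLIN)) return false;
    char b;
    return read(fds[0], &b, 1) == 1;
  }
};
```

In a demuxer the read end would be registered alongside the sockets of interest, so a call to wake() from another thread makes the event wait return. As the changelog notes, waking alone is not enough for a clean takedown: the quitting thread also has to wait for the woken thread to finish before destructing anything.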
Start cpp section to demux/demux_pfileio.cpp[1 /1 ]
     1: #line 3059 "./lpsrc/flx_demux.pak"
     2: #include <stdio.h>    // printf
     3: #include <errno.h>    // errno
     4: #include "demux_pfileio.hpp"
     5: 
     6: // blocking reads & writes that use a worker fifo. users overload
     7: // finished flag to implement wakeup
     8: 
     9: // if we could group the requests, we could do a scattered read
    10: // or we could do single reads if the requests were of a similar
    11: // nature, i.e. the whole file, of popular files.
    12: 
    13: // for pwrite/pread, I'm supposed to include the following three (osx man page)
    14: // they don't appear to be necessary, but let's play it safe
    15: #include <sys/types.h>
    16: #include <sys/uio.h>
    17: #include <unistd.h>
    18: 
    19: namespace flx { namespace demux {
    20: // fileio_request stuff follows
    21: 
    22: // read or write in a blocking fashion. I like the idea of using pread
    23: // which doesn't change the file pointer. this could allow reuse of the same
    24: // file descriptor & block caching
    25: 
    26: fileio_request::~fileio_request(){}
    27: fileio_request::fileio_request(){}
    28: 
    29: fileio_request::fileio_request(int f, char* buf, long len, long off, bool rd)
    30:   : offset(off), fd(f), read_flag(rd), err(0)
    31: {
    32:   pb.buffer = buf;
    33:   pb.buffer_size = len;
    34:   pb.bytes_written = 0;
    35: }
    36: 
    37: // synchronously process read/write
    38: void
    39: fileio_request::doit()
    40: {
    41:   // fprintf(stderr,"faio about to try to %s %i bytes from fd=%i\n",
    42:   //  (read_flag) ? "read" : "write", pb.buffer_size, fd);
    43: 
    44: // switching off (explicit) seeks for now because I'm not using them
    45: // in the flx code & I'm not passing around enough info (just the fd)
    46:   ssize_t res;
    47: 
    48:   if(read_flag)
    49:   {
    50:     // res = pread(fd, pb.buffer, pb.buffer_size, offset);
    51:     res = read(fd, pb.buffer, pb.buffer_size);
    52:   }
    53:   else
    54:   {
    55:     // res = pwrite(fd, pb.buffer, pb.buffer_size, offset);
    56:     res = write(fd, pb.buffer, pb.buffer_size);
    57:   }
    58: 
    59:   // zero return value indicates end of file. that should just work.
    60:   if(-1 == res)
    61:   {
    62:     err = errno;    // grab errno
    63:     fprintf(stderr,"faio error: %i\n", err);
    64:   }
    65:   else
    66:   {
    67:     // fprintf(stderr,"faio %s %i bytes\n", (read_flag) ? "read" : "write", res);
    68:     pb.bytes_written = res;
    69:   }
    70: }
    71: }}
    72: 
End cpp section to demux/demux_pfileio.cpp[1]
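The comments in fileio_request::doit() prefer pread because it does not change the file pointer, which would allow several requests to share one descriptor. A small sketch of that property follows. Both helper functions are hypothetical, and the temporary file path is only an assumption for illustration.

```cpp
#include <cassert>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

// Reads `len` bytes at offset `off` with pread and reports whether the
// descriptor's own file offset was left untouched. Hypothetical helper.
bool pread_keeps_offset(int fd, char* buf, size_t len, off_t off,
                        ssize_t* got) {
  off_t before = lseek(fd, 0, SEEK_CUR);
  *got = pread(fd, buf, len, off);
  off_t after = lseek(fd, 0, SEEK_CUR);
  return before != (off_t)-1 && before == after;
}

// Creates an already-unlinked temporary file containing "0123456789".
// Returns the descriptor, or -1 on failure. Hypothetical helper.
int make_temp_file() {
  char path[] = "/tmp/pread_sketch_XXXXXX";
  int fd = mkstemp(path);
  if (fd == -1) return -1;
  unlink(path);   // the file lives on until fd is closed
  if (write(fd, "0123456789", 10) != 10) {
    close(fd);
    return -1;
  }
  return fd;
}
```

After the initial write the descriptor's offset sits at 10. A pread of 4 bytes at offset 3 fetches "3456" and leaves the offset at 10, whereas read() would have started from, and then moved, the shared offset.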
Start cpp section to demux/demux_posix_demuxer.cpp[1 /1 ]
     1: #line 3132 "./lpsrc/flx_demux.pak"
     2: #include "demux_posix_demuxer.hpp"
     3: #include "demux_sockety.hpp"
     4: #include "demux_quitter.hpp" // fns for waking and quitting a demuxer
     5: 
     6: #include <stdio.h>        // "printf"
     7: #include <assert.h>       // assert
     8: #include <string.h>       // strerror
     9: #include <unistd.h>       // close
    10: 
    11: #include <sys/types.h>      // send/recv
    12: #include <sys/socket.h>
    13: 
    14: //#include <sys/errno.h>
    15: #include <errno.h>        // GUSI & solaris prefer this
    16: 
    17: namespace flx { namespace demux {
    18: 
    19: posix_demuxer::~posix_demuxer()
    20: {
    21: }
    22: 
    23: bool
    24: posix_demuxer::socket_recv(int s, sel_param* pb)
    25: {
    26:   // why do I have the zero buffer size?
    27:   assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size);
    28:   ssize_t     nbytes;
    29: 
    30:   // if this were read then this fn would work with non-sockets
    31:   nbytes = recv(s, pb->buffer + pb->bytes_written,
    32:         pb->buffer_size - pb->bytes_written, 0);
    33: 
    34:   if(nbytes <= 0)
    35:   {
    36:     if(nbytes == 0)
    37:     {
    38:       return true;        // connection closed
    39:     }
    40:     else
    41:     {
    42:       perror("recv");       // can get reset connection here
    43:       return true;        // so say closed, yeah?
    44:     }
    45:   }
    46:   else
    47:   {
    48:     // got some data
    49:     pb->bytes_written += nbytes;
    50:   }
    51:   return false;           // connection didn't close
    52: }
    53: 
    54: bool
    55: posix_demuxer::socket_send(int s, sel_param* pb)
    56: {
    57:   // kqueue (and some of the other ones) can let you know how much
    58:   // to write... imagine that!
    59: 
    60:   // why do I have the zero buffer size?
    61:   assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size);
    62: 
    63:   ssize_t     nbytes;
    64: 
    65:   nbytes = send(s, pb->buffer + pb->bytes_written,
    66:     pb->buffer_size - pb->bytes_written, 0);
    67: 
    68:   // similar story here, with send vs write?
    69: 
    70:   // what's the story with zero? Is that allowed or does it signal
    71:   // that the connection closed?
    72:   if(-1 == nbytes)
    73:   {
    74:     perror("send");
    75:     return true;          // I guess the connection closed
    76:   }
    77:   else
    78:   {
    79:     // sent some data
    80:     pb->bytes_written += nbytes;
    81:   }
    82:   return false;           // connection didn't close
    83: }
    84: 
    85: // doesn't throw
    86: void
    87: posix_demuxer::async_quit()
    88: {
    89:   try {
    90:     // NEW and IMPROVED!!! demux quitter which sets demux quit flag
    91:     // via self pipe trick then waits for self pipe/quitting callback
    92:     // to finish. no fear of quitter being destructed early!
    93: 
    94:     // this will wake the thread (there should only be one atm), however
    95:     // that's just another race between kevent and close. really need to
    96:     // extend self piper to set a quit flag. we have to currently assume
    97:     // that there is only one event collecting thread.
    98:     // fprintf(stderr, "taking down kqueue with quitter (QUITTER!)\n");
    99:     demux_quitter quitter;
   100:     quitter.quit(this);
   101:   } catch(...) {
   102:     fprintf(stderr, "error waking demuxer with self pipe quitter\n");
   103:   }
   104: }
   105: 
   106: #if 0
   107:   //nbytes = recv(s, pb->buffer + pb->bytes_written,
   108:   //      pb->buffer_size - pb->bytes_written, 0);
   109: 
   110:   // select and kqueue know when non socket fds have data.
   111:   // recv only works with sockets, but read works with both files
   112:   // and sockets and who knows what else. is there any disadvantage
   113:   // to using read instead? apart from losing flags arg?
   114:   // does read get the same 0 bytes = close behaviour
   115:   nbytes = read(s, pb->buffer + pb->bytes_written,
   116:         pb->buffer_size - pb->bytes_written);
   117: #endif
   118: 
   119: // handy posix control blocks for accept, connect.
   120: 
   121: int
   122: accept_control_block::start(posix_demuxer& demux)
   123: {
   124:   // add listener to demuxer as reading socket - see man 2 accept
   125:   // returns 0 on success, -1 on failure. not sure how to communicate
   126:   // the error.
   127: // could try the accept now, to see if it succeeds instantly...
   128: // observe wakeup rules (formulate them first)
   129:   accepted = -1;
   130:   // socket_err = 0;
   131:   // not quite true, but I want it to be clear if this ever becomes possible
   132:   // to do immediately
   133:   socket_err = EINPROGRESS;
   134:   return demux.add_socket_wakeup(this, PDEMUX_READ);
   135: }
   136: 
   137: // on wakeup, the socket is in accepted and the error in socket_err
   138: void
   139: accept_control_block::wakeup(posix_demuxer& demux)
   140: {
   141:   // fprintf(stderr,"accept_control_block woke up\n");
   142: 
   143:   // we can now accept without blocking
   144:   // s is the listener, ambiguously named in parent socket_wakeup class
   145:   accepted = nice_accept(s, &socket_err);
   146: 
   147:   if(accepted == -1)
   148:   {
   149:     fprintf(stderr, "nice_accept failed, err (%i)\n", socket_err);
   150:   }
   151: }
   152: 
   153: // returns 0 if a wakeup was scheduled, -1 if no wakeup is coming: either
   154: // the connect failed (error in socket_err) or it finished immediately.
   155: int
   156: connect_control_block::start(posix_demuxer& demux)
   157: {
   158:   // fprintf(stderr,"async connect start\n");
   159: 
   160:   int finished;
   161: 
   162:   // returns either finished and err, or not finished
   163:   // and (no err || EINPROGRESS)
   164:   s = async_connect(addy, p, &finished, &socket_err);
   165: 
   166:   // fprintf(stderr,"async_connect returned s: %i, finished: %i, err=%i\n",
   167:   //  s, finished, socket_err);
   168: 
   169:   if(-1 == s)   // failed!
   170:   {
   171:     fprintf(stderr,"async_connect failed (%i)\n", socket_err);
   172:     return -1;  // error in socket_err, no wakeup
   173:   }
   174: 
   175:   if(finished)
   176:   {
   177:     // this actually happens on solaris when connecting to localhost!
   178:     fprintf(stderr,"async_connect finished immediately\n");
   179:     fprintf(stderr, "No wakeup coming...\n");
   180:     // this does not indicate an error, but that there is no wakeup
   181:     // coming. this could be done by a wakeup, all that happens is
   182:     // getsockopt is called to check the socket's error state.
   183:     return -1;
   184:   }
   185: 
   186:   // fprintf(stderr,"connect_request didn't finish immediately, sleeping\n");
   187: 
   188:   // add to demuxer as writing socket - see man 2 connect
   189:   // how do they get the error?
   190:   return demux.add_socket_wakeup(this, PDEMUX_WRITE);
   191: }
   192: 
   193: void
   194: connect_control_block::wakeup(posix_demuxer& demux)
   195: {
   196:   // fprintf(stderr,"connect woke up\n");
   197:   // this is how we check the success of async connects
   198:   // if get_socket_err fails, we're treating its errno as the socket's...
   199:   if(get_socket_error(s, &socket_err) == -1)
   200:     fprintf(stderr, "eep - get_socket_err failed!\n");
   201: 
   202:   // failed, throw away socket
   203:   if(0 != socket_err)
   204:   {
   205:     fprintf(stderr,"async connect error: %s (%i), closing\n",
   206:       strerror(socket_err), socket_err);
   207:     // we created the connect socket, so we close it too.
   208:     if(close(s) != 0)
   209:       perror("async socket close");
   210: 
   211:     s = -1;   // the result
   212:   }
   213: 
   214:   // resulting connected socket in s
   215: }
   216: }}
   217: 
End cpp section to demux/demux_posix_demuxer.cpp[1]
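The connect control block above starts a nonblocking connect, waits for the socket to become writable, then reads the socket's error state to learn whether the connect succeeded. A minimal standalone sketch of that sequence follows. It uses plain POSIX calls rather than the demux classes. The names pending_socket_error and demo_async_connect are hypothetical, as are the loopback listener and the 5 second timeout, which exist only to make the example self-contained.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: fetch the socket's pending error via SO_ERROR.
// Returns 0 if the connect succeeded, otherwise an errno value.
static int pending_socket_error(int s)
{
  int err = 0;
  socklen_t len = sizeof(err);
  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &err, &len) == -1)
    return errno;   // treat getsockopt's own failure as the socket's error
  return err;
}

// Hypothetical demo: connect to a loopback listener without blocking.
// Returns 0 on success, otherwise an errno value.
int demo_async_connect()
{
  int listener = socket(AF_INET, SOCK_STREAM, 0);
  if (listener == -1) return errno;

  sockaddr_in addr = sockaddr_in();
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;                       // let the kernel pick a port
  socklen_t alen = sizeof(addr);
  if (::bind(listener, (sockaddr*)&addr, sizeof(addr)) == -1 ||
      listen(listener, 1) == -1 ||
      getsockname(listener, (sockaddr*)&addr, &alen) == -1)
  {
    int e = errno;
    close(listener);
    return e;
  }

  int s = socket(AF_INET, SOCK_STREAM, 0);
  if (s == -1) { int e = errno; close(listener); return e; }
  assert(s < FD_SETSIZE);                  // the usual weakness of select
  fcntl(s, F_SETFL, fcntl(s, F_GETFL, 0) | O_NONBLOCK);

  int err = 0;
  if (connect(s, (sockaddr*)&addr, sizeof(addr)) == -1)
  {
    if (errno == EINPROGRESS)
    {
      // not finished yet: wait until writable, then check the error state
      fd_set wset;
      FD_ZERO(&wset);
      FD_SET(s, &wset);
      timeval tv = {5, 0};
      if (select(s + 1, NULL, &wset, NULL, &tv) == 1)
        err = pending_socket_error(s);
      else
        err = ETIMEDOUT;
    }
    else err = errno;                      // failed immediately
  }
  // a return of 0 from connect means it finished immediately, err stays 0

  close(s);
  close(listener);
  return err;
}
```

Calling demo_async_connect() should return 0 on a machine with a working loopback interface. Both the "finished immediately" and the "in progress" paths end in the same result, which is why the original code only needs getsockopt in the wakeup.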
Start cpp section to demux/demux_select_demuxer.cpp[1 /1 ]
     1: #line 3350 "./lpsrc/flx_demux.pak"
     2: // P.S. for current impl don't need the pthreads. WHOO!!!
     3: 
     4: // A very light wrapper around select, that allows the addition
     5: // of new sockets and returns status in a queue.
     6: // on the powerbook with 10.3, FD_SETSIZE is 1024, that means
     7: // max 1024 sockets. That's kind of lame. See IO completion ports
     8: // on NT for a better solution.
     9: 
    10: // See ACE_Handle_Set_Iterator for an optimised select bit field examination
    11: // algorithm (p149 C++ network programming, volume1)
    12: 
    13: // see epoll, kqueue & IOCPs
    14: 
    15: // is select level triggered? I think it is.
    16: // strangely, I'm never seeing anything from the exception set.
    17: 
    18: #include "demux_select_demuxer.hpp"
    19: 
    20: #include <assert.h>
    21: #include <string.h>       // memset
    22: 
    23: #include <stdio.h>        // printf debug
    24: #include <stdlib.h>
    25: #include "demux_sockety.hpp"  // get_socket_error
    26: #include <memory>
    27: 
    28: namespace flx { namespace demux {
    29: 
    30: select_demuxer::select_demuxer()
    31: {
    32:   // clear these guys. after the thread starts, access to them will have
    33:   // to be via the lock
    34:   FD_ZERO(&master_read_set);
    35:   FD_ZERO(&master_write_set);
    36:   FD_ZERO(&master_except_set);
    37:   fdmax = 0;        // corresponds to stdin, which we're not using
    38: 
    39:   // clear this possibly quite large list
    40:   //memset(svs, 0, sizeof(svs));
    41:   //JS: memset must not be used except for raw data or chars
    42:   std::uninitialized_fill_n(svs,FD_SETSIZE,(socket_wakeup*)0);
    43: }
    44: 
    45: // one select, must not block indefinitely, so choose a timeslice
    46: // or find a way to make it wake on command, like a dummy socket
    47: void
    48: select_demuxer::get_evts(bool poll)
    49: {
    50:   // to use select we must copy our arguments, as it changes them!
    51:   // this code has been broken up into pieces so that a thread safe
    52:   // version can call them separately (copy_sets, select, process_sets).
    53:   fd_set  read_set, write_set, except_set;
    54: 
    55:   copy_sets(read_set, write_set, except_set);
    56: 
    57:   if(select(read_set, write_set, except_set, poll))
    58:     process_sets(read_set, write_set, except_set);
    59: }
    60: 
    61: int
    62: select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    63: {
    64:   int s = sv->s;
    65: 
    66:   // fprintf(stderr, "adding select wakeup for %i, flags=%x\n", s, flags);
    67: 
    68:   if(s < 0 || s >= FD_SETSIZE) return -1; // weakness of select
    69: 
    70:   assert(svs[s] == NULL);         // sanity check: nothing there
    71: 
    72:   if(flags & PDEMUX_READ) FD_SET(s, &master_read_set);
    73: 
    74:   if(flags & PDEMUX_WRITE) FD_SET(s, &master_write_set);
    75: 
    76:   // does this mean we could add a non-reading, non-writing socket
    77:   // and wait for errors on it?
    78:   FD_SET(s, &master_except_set);
    79: 
    80:   svs[s] = sv;              // record wakeup. ours now.
    81: 
    82: 
    83:   if(s > fdmax) fdmax = s;        // update highwater mark
    84: 
    85:   return 0;
    86: }
    87: 
    88: // removes for both reading AND writing.
    89: void
    90: select_demuxer::remove_fd(int s)
    91: {
    92:   // fprintf(stderr, "removing select fd: %i\n", s);
    93: 
    94:   assert(s >= 0 && s < FD_SETSIZE);
    95:   assert(svs[s] != NULL);         // there should be something there
    96: 
    97:   // clear them all regardless.
    98:   FD_CLR(s, &master_read_set);
    99:   FD_CLR(s, &master_write_set);
   100:   FD_CLR(s, &master_except_set);
   101: 
   102:   svs[s] = NULL;
   103: }
   104: 
   105: // virtual functions to be overridden for thread safe descendant
   106: void
   107: select_demuxer::copy_sets(fd_set& rset, fd_set& wset, fd_set& exset)
   108: {
   109:   rset = master_read_set;
   110:   wset = master_write_set;
   111:   exset = master_except_set;
   112: }
   113: 
   114: bool
   115: select_demuxer::select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll)
   116: {
   117:   // this is depending on my fake socket to wakeup. perhaps use the timer
   118:   // for now.
   119:   struct timeval  tv, *tp = NULL;
   120: 
   121:   if(poll)
   122:   {
   123:     tv.tv_sec = 0;
   124:     tv.tv_usec = 0;
   125:     tp = &tv;
   126:   }
   127: 
   128:   // the return value here actually has significance
   129:   // sometimes I have to try again, or weed out bad fds.
   130:   //if(select(fdmax+1, &read_set, &write_set, &except_set, &tv) == -1)
   131: // nah! wait forever. none of these things shutdown properly yet.
   132: // it'll force the async new wakeup responsiveness
   133:   switch(::select(fdmax+1, &rset, &wset, &exset, tp))
   134:   {
   135:     case 0:
   136:       return false;   // timed out, don't process sets
   137:     break;
   138:     case -1:
   139:     // not the ideal reaction. I think this is where I weed out
   140:     // the bad socket(s). would need error set.
   141: 
   142:     // closing a socket without removing can get us here. that's pretty
   143:     // nasty, because our data would be stale. Try not to do that. I
   144:     // wonder if the except set would tell us when the socket was
   145:     // closed on us? Damn, you have to clear it, else you keep getting
   146:     // the same error.
   147:       perror("select");
   148:       // fall through and examine except set
   149:     break;
   150:   }
   151:   return true;    // call process_sets
   152: }
   153: 
   154: void
   155: select_demuxer::process_sets(fd_set& rset, fd_set& wset, fd_set& exset)
   156: {
   157:   // since we're about to traverse the socket sets anyway, we should
   158:   // note the highest fd seen, and make that the highwater mark.
   159:   // that way we wouldn't be guaranteed monotonically degrading performance.
   160: 
   161:   // might be worth keeping a low water mark as well.
   162:   // I guess this is why select sucks. On osx we can only watch
   163:   // about 1024 sockets. That sucks too. could allocate larger sets
   164:   // with malloc... see c++ network programming book.
   165: 
   166:   // like kqueues, this code could theoretically handle separate wakeups
   167:   // for read and write, should I do it? not right now.
   168:   int new_fdmax = 0;
   169: 
   170:   for(int i = 0; i <= fdmax; i++)
   171:   {
   172:     int   flags = 0;
   173: 
   174:     if(FD_ISSET(i, &rset)) flags |= PDEMUX_READ;
   175: 
   176:     if(FD_ISSET(i, &wset)) flags |= PDEMUX_WRITE;
   177: 
   178:     // sorta suggests that I ought to call the wakeup and pass
   179:     // an error flag on to it.
   180:     if(FD_ISSET(i, &exset))
   181:     {
   182:       // don't remove bad sockets - it's an error to close the socket
   183:       // or deallocate the wakeup without telling the source. when
   184:       // we get socket errors, we'd better hope that there's reading
   185:       // or writing to be done.
   186:       // under cygwin, closing down a socket (read, write or both)
   187:       // causes select to wake up with an exception bit. out of cygwin
   188:       // we only wake up. In both cases, the read bit is set so
   189:       // just handling the stuff seems to work. not sure about write.
   190:       // posix_demuxer::socket_recv thinks the connection's closed, but
   191:       // it all seems to work out. Yours, Confused.
   192: 
   193:       fprintf(stderr, "select error on socket %i, flags=%x\n",
   194:         i, flags);
   195: 
   196:       int err = 0;
   197:       // heh heh, this isn't great to call on the pipe that is used
   198:       // in the self pipe trick. I don't know why it's getting an
   199:       // err anyway.
   200:       if(get_socket_error(i, &err) == -1)
   201:         fprintf(stderr, "get_socket_error failed!?!\n");
   202: 
   203:       fprintf(stderr, "socket err = %i, %s\n", err, strerror(err));
   204:       // don't remove! see below
   205:       // remove_fd(i);
   206:     }
   207: 
   208:     //
   209:     if(flags)
   210:     {
   211:       socket_wakeup*  sv = svs[i];
   212:       // remove before wakeup so wakeup can add itself back,
   213:       // if necessary.
   214:       remove_fd(i);
   215: 
   216:       sv->wakeup_flags = flags;
   217:       sv->wakeup(*this);
   218:     }
   219: 
   220:     // to lower high-watermark, keep track of highest seen.
   221:     if(svs[i]) new_fdmax = i;
   222:   }
   223: 
   224:   // fprintf(stderr, "new_fdmax=%i, fdmax=%i\n", new_fdmax, fdmax);
   225: 
   226:   fdmax = new_fdmax;      // copy it back
   227: }
   228: 
   229: }} // flx, demux
   230: 
End cpp section to demux/demux_select_demuxer.cpp[1]
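The select demuxer keeps master fd sets and hands select a copy on every call, because select overwrites its arguments with the ready subset. A minimal sketch of that copy-then-select step, assuming a plain pipe as the watched descriptor; poll_readable and demo_select_copy are hypothetical names, not part of demux.

```cpp
#include <cassert>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: zero-timeout poll of one fd for readability.
// master is copied first, since select() modifies the set it is given.
bool poll_readable(const fd_set& master, int fdmax, int fd)
{
  assert(fd >= 0 && fd < FD_SETSIZE);
  fd_set rset = master;                 // work on a copy
  timeval tv = {0, 0};                  // poll, don't block
  int n = select(fdmax + 1, &rset, NULL, NULL, &tv);
  return n > 0 && FD_ISSET(fd, &rset);
}

// Returns a bit mask: 1 = empty pipe reported not readable,
// 2 = pipe reported readable after a write, 4 = master set left intact.
int demo_select_copy()
{
  int fds[2];
  if (pipe(fds) == -1) return -1;

  fd_set master;
  FD_ZERO(&master);
  FD_SET(fds[0], &master);

  int result = 0;
  if (!poll_readable(master, fds[0], fds[0])) result |= 1;

  char b = 1;
  if (write(fds[1], &b, 1) == 1 && poll_readable(master, fds[0], fds[0]))
    result |= 2;

  // the first poll cleared the bit in its copy only, never in master
  if (FD_ISSET(fds[0], &master)) result |= 4;

  close(fds[0]);
  close(fds[1]);
  return result;
}
```

demo_select_copy() returns 7 when all three checks hold. Without the copy, the first poll would clear the fd from the set and the second poll would never see it.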
Start cpp section to demux/demux_ts_select_demuxer.cpp[1 /1 ]
     1: #line 3581 "./lpsrc/flx_demux.pak"
     2: #include "demux_ts_select_demuxer.hpp"
     3: 
     4: // #include <stdio.h>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: ts_select_demuxer::ts_select_demuxer()
     9: {
    10:   // fprintf(stderr, "creating pipe for self-pipe trick\n");
    11:   // install self pipe trick
    12:   sp.install(&demux);
    13: }
    14: 
    15: ts_select_demuxer::~ts_select_demuxer()
    16: {
    17:   async_quit(); // wake thread, ask to leave.
    18: }
    19: 
    20: void
    21: ts_select_demuxer::get_evts(bool poll)
    22: {
    23:   fd_set  rset, wset, exset;
    24: 
    25:   // copy args under lock
    26:   {
    27:     flx::pthread::flx_mutex_locker_t locker(ham_fist);
    28:     demux.copy_sets(rset, wset, exset);
    29:   }
    30: 
    31:   // process arg set under lock. note that the select demuxer is passed
    32:   // to wakeups, so no recursive lock is needed. also, because any
    33:   // readditions caused by the callback are done to the naive demuxer,
    34:   // no selfpipe writes are required either. the only thing to remember
    35:   // is that the wakeup recipient should not be surprised to see a demuxer
    36:   // in its callback different to the one it originally added to.
    37:   if(demux.select(rset, wset, exset, poll))
    38:   {
    39:     flx::pthread::flx_mutex_locker_t locker(ham_fist);
    40:     demux.process_sets(rset, wset, exset);
    41:   }
    42: }
    43: 
    44: // thread safe overridden functions follow. all acquire the lock
    45: // then call the unthreadsafe version of the function. nice!
    46: int
    47: ts_select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    48: {
    49:   // fprintf(stderr, "ts_select::add: %p->%i (%s), %x\n",
    50:   //  sv, s, (s == self_pipe_fds[0]) ? "self pipe" : "socket", flags);
    51:   flx::pthread::flx_mutex_locker_t locker(ham_fist);
    52: 
    53:   int res = demux.add_socket_wakeup(sv, flags);
    54:   // I wouldn't touch the sv after this.
    55: 
    56:   // we have a new socket, so wake the event waiting thread
    57:   // for great responsiveness.
    58:   if(-1 != res) sp.wake();
    59: 
    60:   return res;
    61: 
    62: // we need to wake a blocking get_evts call here else we'll have bad
    63: // performance or even a lockup. the question is do we need to do it under
    64: // the tutelage of the lock?
    65: }
    66: 
    67: }}
    68: 
End cpp section to demux/demux_ts_select_demuxer.cpp[1]
Start cpp section to demux/demux_sockety.hpp[1 /1 ]
     1: #line 3650 "./lpsrc/flx_demux.pak"
     2: #ifndef __DEMUX_SOCKETY__
     3: #define __DEMUX_SOCKETY__
     4: #include <flx_demux_config.hpp>
     5: namespace flx { namespace demux {
     6: 
     7: // Shouldn't this all be DEMUX_EXTERN? eh, doesn't happen on win32
     8: // we'll probably live.
     9: int create_listener_socket(int* io_port, int q_len);
    10: int create_async_listener(int* io_port, int q_len);
    11: int nice_accept(int listener, int* err);
    12: int nice_connect(const char* addr, int port);
    13: int async_connect(const char* addr, int port, int* finished, int* err);
    14: 
    15: /* handy socket building blocks */
    16: 
    17: int connect_sock(int s, const char* addr, int port);
    18: 
    19: /* this could possibly do with NIC addr as well as port */
    20: int bind_sock(int s, int* io_port);
    21: 
    22: int make_nonblock(int s);
    23: int set_tcp_nodelay(int s, int disable_nagle);
    24: int get_socket_error(int s, int* socket_err);
    25: 
    26: }} // namespace demux, flx
    27: #endif
    28: 
End cpp section to demux/demux_sockety.hpp[1]
Start cpp section to demux/demux_quitter.hpp[1 /1 ]
     1: #line 3679 "./lpsrc/flx_demux.pak"
     2: 
     3: #ifndef __DEMUX_QUITTER__
     4: #define __DEMUX_QUITTER__
     5: 
     6: #include <flx_demux_config.hpp>
     7: #include "demux_demuxer.hpp"  // demuxers
     8: #include "pthread_mutex.hpp"  // mutexes
     9: #include "pthread_condv.hpp"  // condition var for same
    10: 
    11: #ifdef _WIN32
    12: #include "demux_wself_piper.hpp" // win32 self piper
    13: #else
    14: #include "demux_self_piper.hpp" // posix self piper
    15: #endif
    16: 
    17: namespace flx { namespace demux {
    18: 
    19: // a waitable boolean. gratuitously tied to demuxer by demux_quit_flag
    20: // todo: unhook.
    21: class DEMUX_EXTERN async_bool : public demux_quit_flag {
    22:   flx::pthread::flx_mutex_t cv_lock;       // to work with the condition var
    23:   flx::pthread::flx_condv_t finished_cond;
    24:   bool finished;   // might seem redundant, but that's how CVs work.
    25: public:
    26:   async_bool();
    27: 
    28:   void wait_until_true();
    29:   virtual void signal_true();
    30: };
    31: 
    32: // quits a demuxer
    33: class DEMUX_EXTERN demux_quitter : public demux_callback {
    34:   // self pipes for getting demuxer attention
    35: #ifdef _WIN32
    36:   wself_piper sp;
    37: #else
    38:   self_piper sp;
    39: #endif
    40:   async_bool finished;  // initially false
    41:   void callback(demuxer* demux); // called back by demuxer in event thread.
    42: public:
    43:   void quit(demuxer* demux); // blocks until event thread exits
    44: };
    45: 
    46: /*
    47: class DEMUX_EXTERN wdemux_quitter : demux_callback {
    48:   wself_piper sp;
    49:   async_bool finished; // initially false
    50: public:
    51:   void quit(iocp_demuxer* demux); // blocks until demuxer/event thread exits
    52: 
    53:   void callback(demuxer* d); // called back by demuxer
    54: };
    55: */
    56: 
    57: } }
    58: 
    59: #endif
    60: 
End cpp section to demux/demux_quitter.hpp[1]
Start cpp section to demux/demux_quitter.cpp[1 /1 ]
     1: #line 3740 "./lpsrc/flx_demux.pak"
     2: #include "demux_quitter.hpp"
     3: #include <stdio.h>
     4: 
     5: namespace flx { namespace demux {
     6: 
     7: async_bool::async_bool()
     8:   : finished(false)
     9: {
    10:   // nothing
    11: }
    12: 
    13: // called from the non-event-waiting thread.
    14: void
    15: async_bool::wait_until_true()
    16: {
    17:   flx::pthread::flx_mutex_locker_t    locker(cv_lock);
    18: 
    19:   // wait for the wakeup to say it's finished
    20:   while(!finished)
    21:   {
    22:     finished_cond.wait(&cv_lock);
    23:   }
    24: }
    25: 
    26: // call this last thing before event thread exit
    27: void
    28: async_bool::signal_true()
    29: {
    30:   flx::pthread::flx_mutex_locker_t locker(cv_lock); // no lost wakeup
    31:   finished = true;
    32:   finished_cond.signal(); // add nothing after this: we may be destructed.
    33: }
    34: 
    35: void
    36: demux_quitter::callback(demuxer* demux)
    37: {
    38:   //fprintf(stderr, "quitter callback\n");
    39:   demux->set_quit_flag(&finished);
    40: }
    41: 
    42: void
    43: demux_quitter::quit(demuxer* demux)
    44: {
    45:    // fprintf(stderr, "trying to quit demuxer...\n");
    46:    // install self piper, with our callback
    47:    sp.install(demux, this);
    48:    // wake demuxer, getting our callback called, which sets quit flag
    49:    sp.wake();
    50:    // wait for quit flag to be signalled by exiting event thread
    51:    finished.wait_until_true();
    52:    // event thread exited
    53: }
    54: 
    55: /*
    56: // blocks until demuxer/event thread exits
    57: void
    58: wdemux_quitter::quit(iocp_demuxer* demux)
    59: {
    60:   fprintf(stderr, "wdemux_quitter::quit\n");
    61: 
    62:   // install self piper in iocp demuxer, along with our callback.
    63:   sp.install(demux, this);
    64:   sp.wake(); // wake the demuxer
    65:   fprintf(stderr, "wdemux_quitter::quit waiting on finished flag\n");
    66:   finished.wait_until_true();
    67:   fprintf(stderr, "wdemux_quitter::quit exiting\n");
    68: }
    69: 
    70: void
    71: wdemux_quitter::callback(demuxer* d)
    72: {
    73:   fprintf(stderr, "wdemux_quitter got called back by demuxer (%p)!\n", d);
    74:   // install quit flag in demuxer
    75:   d->set_quit_flag(&finished);
    76: }
    77: */
    78: 
    79: } }
    80: 
End cpp section to demux/demux_quitter.cpp[1]
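async_bool is a boolean guarded by a mutex and a condition variable: the waiter loops on the flag, and the signaller sets the flag and signals. A minimal sketch of the same pattern with the C++11 standard library follows. It does not use the flx::pthread wrappers, and waitable_bool and demo_waitable_bool are hypothetical names. The sketch sets the flag while holding the mutex, so the waiter cannot test the flag and then miss the notification.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for async_bool built on std primitives.
class waitable_bool {
  std::mutex m;
  std::condition_variable cv;
  bool value;
public:
  waitable_bool() : value(false) {}

  // blocks until signal_true has been called
  void wait_until_true()
  {
    std::unique_lock<std::mutex> lock(m);
    while (!value) cv.wait(lock);   // loop guards against spurious wakeups
  }

  // sets the flag and wakes the waiter, both under the lock
  void signal_true()
  {
    std::lock_guard<std::mutex> lock(m);
    value = true;
    cv.notify_one();
  }
};

// Returns true once the worker's signal has been observed.
bool demo_waitable_bool()
{
  waitable_bool done;
  std::thread worker([&done] { done.signal_true(); });
  done.wait_until_true();           // returns only after signal_true ran
  worker.join();                    // done outlives the worker here
  return true;
}
```

The flag looks redundant next to the condition variable, but it is what lets the wait return correctly when the signal arrives before the wait starts.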
Start cpp section to demux/demux_self_piper.hpp[1 /1 ]
     1: #line 3821 "./lpsrc/flx_demux.pak"
     2: 
     3: #ifndef __DEMUX_SELF_PIPER__
     4: #define __DEMUX_SELF_PIPER__
     5: 
     6: #include <flx_demux_config.hpp>
     7: #include "demux_posix_demuxer.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: // there's no standard posix_socketio_wakeup, could be handy. could also
    12: // perhaps use it here? this is a pipe, not a socket, and I'm not sure recv or
    13: // send work on it. besides, we want to read an unlimited amount of redundant data.
    14: class DEMUX_EXTERN selfpipe_wakeup : public socket_wakeup {
    15: public:
    16:   demux_callback* cb; // optional callback
    17: 
    18:   virtual void wakeup(posix_demuxer& demux);
    19: };
    20: 
    21: class DEMUX_EXTERN auto_fd {
    22: public:
    23:     int fd;
    24: 
    25:     auto_fd();
    26:     ~auto_fd();
    27: };
    28: 
    29: // make portable here? make part of the wakeup obj?
    30: class DEMUX_EXTERN pipe_pair {
    31:   // self pipe trick!!! fd[0] = read end, fd[1] = write end.
    32:   auto_fd         fds[2];
    33: public:
    34:   pipe_pair();
    35:   // void read_byte(); // done for us by wakeup obj.
    36:   void write_byte();
    37:   int get_read_end();
    38: };
    39: 
    40: // wakes a POSIX demuxer, for when you want some kind of attention
    41: // todo: make portable
    42: class DEMUX_EXTERN self_piper {
    43:     pipe_pair       pp;
    44:     selfpipe_wakeup spw;
    45: public:
    46:     void install(demuxer* demux, demux_callback* cb = 0);
    47:     void wake();
    48: };
    49: 
    50: }} // namespace demux, flx
    51: 
    52: #endif
End cpp section to demux/demux_self_piper.hpp[1]
Start cpp section to demux/demux_self_piper.cpp[1 /1 ]
     1: #line 3874 "./lpsrc/flx_demux.pak"
     2: 
     3: #include "demux_self_piper.hpp"
     4: #include <stdio.h>              // printf, perror
     5: #include <unistd.h>             // pipe for self-pipe trick.
     6: #include <assert.h>
     7: 
     8: namespace flx { namespace demux {
     9: 
    10: auto_fd::auto_fd()
    11: {
    12:     fd = -1;        // invalid
    13: }
    14: 
    15: auto_fd::~auto_fd()
    16: {
    17:     if(-1 == fd) return;
    18: 
    19:     if(close(fd) == -1)
    20:         perror("auto fd close");
    21: }
    22: 
    23: void
    24: self_piper::install(demuxer* d, demux_callback* cb)
    25: {
    26:     //fprintf(stderr, "installing self piper in %p with cb=%p\n", d, cb);
    27:     posix_demuxer* demux = static_cast<posix_demuxer*>(d);
    28:     spw.s = pp.get_read_end();
    29:     spw.cb = cb;
    30: 
    31:     int res = demux->add_socket_wakeup(&spw, PDEMUX_READ);
    32:     assert(-1 != res);
    33: }
    34: 
    35: // wake the demuxer referenced in install
    36: void
    37: self_piper::wake()
    38: {
    39:     // fprintf(stderr, "self_piper::wake\n");
    40:     pp.write_byte();
    41: }
    42: 
    43: // currently setup to not see the self piper
    44: /*
    45: void
    46: self_piper::inspect(int s)
    47: {
    48:     // lets wake the demuxer by writing to our end of the pipe
    49:     // lets also not add redundant select wakes for our own selfpipe, as
    50:     // we know it's adding whilst not in select. Is that clear?
    51:     if(s != fds[0].fd) wake();
    52: }
    53: */
    54: 
    55: void
    56: selfpipe_wakeup::wakeup(posix_demuxer& demux)
    57: {
    58:     // fprintf(stderr, "selfpipe wakeup: read the pending byte and re-arm\n");
    59:     // not using the pipe pair because it doesn't know that it's part of
    60:     // one. not to worry.
    61:     ssize_t         nbytes;
    62:     char            b;
    63: 
    64:     // if this were read then this fn would work with non-sockets
    65:     // EH? It IS read.
    66:     nbytes = read(s, &b, 1);
    67: 
    68:     if(nbytes == -1) perror("read");
    69: 
    70:     // fprintf(stderr, "GOT: %li, %x\n", nbytes, b);
    71:     assert(nbytes == 1 && b == 1);
    72: 
    73:     // callback!
    74:     if(cb) cb->callback(&demux);
    75: 
    76:     // add self back! this happens even when we're quitting, but that
    77:     // doesn't seem to matter.
    78:     // fprintf(stderr, "selfpiper rearming\n");
    79:     int res = demux.add_socket_wakeup(this, PDEMUX_READ);
    80:     assert(-1 != res);
    81: }
    82: 
    83: pipe_pair::pipe_pair()
    84: {
    85:   // fprintf(stderr, "creating pipe for self-pipe trick\n");
    86: 
    87:   int         self_pipe_fds[2];
    88:   if(pipe(self_pipe_fds) == -1)
    89:   {
    90:       perror("pipe_pair::pipe_pair");
    91:       throw -1;
    92:   }
    93: 
    94:   // fprintf(stderr, "self pipe fds: read: %i, write: %i\n",
    95:   //  self_pipe_fds[0], self_pipe_fds[1]);
    96: 
    97:   fds[0].fd = self_pipe_fds[0];
    98:   fds[1].fd = self_pipe_fds[1];
    99: }
   100: 
   101: // never gets called.
   102: /*
   103: void
   104: pipe_pair::read_byte()
   105: {
   106:   ssize_t         nbytes;
   107:   char            b;
   108: 
   109:   // if this were read then this fn would work with non-sockets
   110:   // EH? It IS read.
   111:   nbytes = read(fds[0].fd, &b, 1);  // read end of the pipe
   112: 
   113:   if(nbytes == -1) perror("read");
   114: 
   115:   // fprintf(stderr, "GOT: %li, %x\n", nbytes, b);
   116:   assert(nbytes == 1 && b == 1);
   117: }
   118: */
   119: 
   120: void
   121: pipe_pair::write_byte()
   122: {
   123:     char    b = 1;
   124:     ssize_t nbytes;
   125:     // is this blocking? I guess it has to be...
   126:     nbytes = write(fds[1].fd, &b, 1);       // wake up, jeff!
   127: 
   128:     // fprintf(stderr, "self_piper::wake write returned: %i\n", nbytes);
   129: 
   130:     if(-1 == nbytes) perror("pipe_pair::write_byte");
   131:     assert(1 == nbytes);
   132: }
   133: 
   134: int
   135: pipe_pair::get_read_end()
   136: {
   137:   return fds[0].fd;
   138: }
   139: 
   140: } }
   141: 
End cpp section to demux/demux_self_piper.cpp[1]
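The self-pipe trick used by self_piper wakes a thread blocked in select by writing one byte to a pipe whose read end is in the watched set. A minimal sketch of the trick on its own, assuming a single waiter thread and no demuxer; demo_self_pipe is a hypothetical name.

```cpp
#include <cassert>
#include <sys/select.h>
#include <thread>
#include <unistd.h>

// Returns the byte the waiter read (1 on success), or -1 on failure.
int demo_self_pipe()
{
  int fds[2];                       // fds[0] = read end, fds[1] = write end
  if (pipe(fds) == -1) return -1;
  assert(fds[0] < FD_SETSIZE);

  char got = 0;
  std::thread waiter([&fds, &got] {
    fd_set rset;
    FD_ZERO(&rset);
    FD_SET(fds[0], &rset);
    // no timeout: only a byte on the pipe can wake this select
    if (select(fds[0] + 1, &rset, NULL, NULL, NULL) == 1)
    {
      char b = 0;
      if (read(fds[0], &b, 1) == 1) got = b;   // drain the wakeup byte
    }
  });

  char b = 1;
  ssize_t n = write(fds[1], &b, 1); // wake the waiter
  waiter.join();

  close(fds[0]);
  close(fds[1]);
  return (n == 1) ? got : -1;
}
```

The write can happen before or after the waiter reaches select. Either way the byte stays in the pipe until it is read, so the wakeup is not lost.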
Start cpp section to demux/demux_wself_piper.hpp[1 /1 ]
     1: #line 4016 "./lpsrc/flx_demux.pak"
     2: #ifndef __DEMUX_WSELF_PIPER__
     3: #define __DEMUX_WSELF_PIPER__
     4: 
     5: #include <flx_demux_config.hpp>
     6: #include "demux_overlapped.hpp"  // I use readfile control block
     7: #include "demux_wself_piper.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: class DEMUX_EXTERN auto_handle {
    12: public:
    13:   HANDLE h;
    14: 
    15:   auto_handle();
    16:   ~auto_handle();
    17: };
    18: 
    19: // win32 self pipe trick (what a pain!)
    20: class DEMUX_EXTERN wpipe_pair {
    21:   enum { READ_END, WRITE_END };
    22:   // HANDLE pipe[2];  // 0 = read end, 1 = write end
    23:   auto_handle pipe[2];
    24: public:
    25:   wpipe_pair();
    26: 
    27:   void write_byte();
    28:   HANDLE get_read_end() { return pipe[READ_END].h; }
    29: };
    30: 
    31: // use a winfileio_control_block to install a nonblocking ReadFile on the
    32: // read end of the pipe pair. when we get a byte we can execute whatever
    33: // the user wanted. win32 seems to be naturally responsive to new
    34: // sockets/handles, so demux quit will probably be the only operation
    35: // needed.
    36: class DEMUX_EXTERN wself_piper_wakeup : public winfileio_control_block
    37: {
    38:   char the_byte;
    39: 
    40: public:
    41:   // possibly null, if not, called on iocp_op_finished
    42:   demux_callback* cb;
    43: 
    44:   // the demuxer. doesn't actually get passed by iocp_op_finished
    45:   iocp_demuxer* d;
    46: 
    47: 
    48:   wself_piper_wakeup();
    49: 
    50:   // detect when single byte read has finished and exec callback,
    51:   // re-arming.
    52:   virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
    53:     LPOVERLAPPED olp, int err);
    54: 
    55:   void arm();
    56: };
    57: 
    58: // at the very least, the read end must be nonblocking and associated
    59: // with the iocp.
    60: class DEMUX_EXTERN wself_piper {
    61:   wpipe_pair pp;
    62:   wself_piper_wakeup spw;
    63: public:
    64:   void install(demuxer* demux, demux_callback* cb = 0);
    65:   void wake(); // wakes demuxer which calls callback
    66: };
    67: 
    68: } } // demux, flx
    69: 
    70: #endif
    71: 
End cpp section to demux/demux_wself_piper.hpp[1]
Start cpp section to demux/demux_wself_piper.cpp[1 /1 ]
     1: #line 4088 "./lpsrc/flx_demux.pak"
     2: 
     3: #include "demux_wself_piper.hpp"
     4: #include <stdio.h>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: auto_handle::auto_handle()
     9: {
    10:   h = INVALID_HANDLE_VALUE;
    11: }
    12: 
    13: auto_handle::~auto_handle()
    14: {
    15:   if(INVALID_HANDLE_VALUE == h) return; // done
    16: 
    17:   if(!CloseHandle(h))
    18:     fprintf(stderr, "auto CloseHandle failed: %lu\n", GetLastError());
    19: }
    20: 
    21: wpipe_pair::wpipe_pair()
    22: {
    23:   // fprintf(stderr, "wpipe CTOR\n");
    24:   // made bufsize 1 as we only ever read and write 1 byte at a time
    25: 
    26:   // looks like I can't use anonymous pipes with iocp. will have to
    27:   // use a specially setup named pipe - one that allows repetitions...
    28:   // this doesn't sound good.
    29:   const char* pname = "\\\\.\\pipe\\flx_iocp_quitter";
    30: 
    31:   // don't actually need duplex, nor those buffers. 1 byte at a time suffices
    32:   // I probably don't need both ends to be marked as nonblocking either, should
    33:   // only need the read end nonblocking.
    34: 
    35:   // create pipe
    36:   pipe[READ_END].h = CreateNamedPipe(pname,
    37:    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
    38:    PIPE_TYPE_BYTE, 1, 256, 256, 0, NULL);
    39: 
    40:   if(INVALID_HANDLE_VALUE == pipe[READ_END].h)
    41:   {
    42:     fprintf(stderr, "couldn't create named pipe: %lu\n", GetLastError());
    43:     throw -1;
    44:   }
    45: 
    46:   // this is the part that I don't like - this pipe's name isn't unique
    47:   // and so theoretically another iocp quitter could join here. a race!
    48: 
    49:   // connect to it. note that overlapped isn't needed for write end as
    50:   // we want to block.
    51:   pipe[WRITE_END].h = CreateFile(pname, FILE_READ_DATA | FILE_WRITE_DATA,
    52:     FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,
    53:     FILE_ATTRIBUTE_NORMAL, NULL);
    54: 
    55:   if(INVALID_HANDLE_VALUE == pipe[WRITE_END].h)
    56:   {
    57:     fprintf(stderr, "failed to open named pipe: %lu\n", GetLastError());
    58:     throw -1;
    59:   }
    60: 
    61:   // anonymous pipes can't be made nonblocking/iocpable on windows!
    62:   // What a shame!
    63: /*
    64:   if(!CreatePipe(&pipe[READ_END], &pipe[WRITE_END], NULL, 1))
    65:   {
    66:     fprintf(stderr, "wpipe_pair CreatePipe failed: %i\n", GetLastError());
    67:     throw -1;
    68:   }
    69: */
    70: }
    71: 
    72: void
    73: wpipe_pair::write_byte()
    74: {
    75:   // I think I want a blocking write here.
    76:   char  b = 1;
    77:   DWORD bytes_written;
    78:   // last arg is overlapped pointer, unused, we want to block.
    79:   if(!WriteFile(pipe[WRITE_END].h, &b, 1, &bytes_written, NULL))
    80:     fprintf(stderr, "wpipe_pair failed to write byte: %lu\n",
    81:       GetLastError());
    82: }
    83: 
    84: void
    85: wself_piper::install(demuxer* d, demux_callback* cb)
    86: {
    87:   fprintf(stderr, "wself_piper::install(%p, %p)\n", d, cb);
    88:   iocp_demuxer* demux = static_cast<iocp_demuxer*>(d);
    89: 
    90:   // make read end non blocking and associate with iocp
    91:   HANDLE read_end = pp.get_read_end();
    92: 
    93: #if 0
    94:   // make the anonymous pipe non blocking. this function is for named pipes,
    95:   // but I've heard talk that it works for anon pipes too. Nope, doesn't work.
    96:   DWORD pipe_mode = PIPE_NOWAIT;
    97:   if(!SetNamedPipeHandleState(read_end, &pipe_mode, NULL, NULL))
    98:   {
    99:     fprintf(stderr, "SetNamedPipeHandleState failed: %i\n", GetLastError());
   100:     return; // not much to be done here.
   101:   }
   102: #endif
   103: 
   104:   if(0 != demux->associate_with_iocp(read_end, NULL))
   105:   {
   106:     fprintf(stderr, "failed to install self pipe in IOCP!!!\n");
   107:     return; // error code?
   108:   }
   109: 
   110:   // copy into the self pipe wakeup, for its later use.
   111:   spw.d = demux;
   112:   spw.cb = cb;
   113:   spw.file = read_end;
   114: 
   115:   fprintf(stderr, "initial self pipe arm\n");
   116:   spw.arm();
   117: }
   118: 
   119: // wakes demuxer
   120: void
   121: wself_piper::wake()
   122: {
   123:   fprintf(stderr, "wself_piper::wake - write a byte\n");
   124:   pp.write_byte();
   125: }
   126: 
   127: wself_piper_wakeup::wself_piper_wakeup()
   128:   // configure the control block for ReadFile on the read end pipe
   129:   // will set pipe handle later, read = true
   130:   : winfileio_control_block(INVALID_HANDLE_VALUE, NULL, 0, true),
   131:     cb(0), d(0)
   132: {
   133:   // I'll probably need to reset the byte address
   134:   fprintf(stderr, "SET UP THE PIPE HANDLE!\n");
   135: }
   136: 
   137: // at this point the byte has already been read. we want to re-arm for future
   138: // wakeups.
   139: void
   140: wself_piper_wakeup::iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   141:     LPOVERLAPPED olp, int err)
   142: {
   143:   fprintf(stderr, "wself_piper_wakeup::iocp_op_finished\n");
   144:   fprintf(stderr, "nbytes=%lu, err=%i\n", nbytes, err);
   145:   fprintf(stderr, "about to callback %p(%p)\n", cb, d);
   146: 
   147:   if(cb) cb->callback(d);
   148: 
   149:   arm();  // re-arm
   150: }
   151: 
   152: void
   153: wself_piper_wakeup::arm()
   154: {
   155:   // exec another nonblocking ReadFile on read end of pipe
   156:   fprintf(stderr, "wself_piper_wakeup::arm\n");
   157:   pb.buffer = &the_byte;
   158:   pb.buffer_size = 1;
   159:   pb.bytes_written = 0;
   160:   if(start_overlapped())
   161:     fprintf(stderr, "WARNING: wself_piper install completed immediately\n");
   162: }
   163: 
   164: } }
   165: 
End cpp section to demux/demux_wself_piper.cpp[1]
Start cpp section to demux/demux_poll_demuxer.hpp[1 /1 ]
     1: #line 4254 "./lpsrc/flx_demux.pak"
     2: #ifndef __POLL_DEMUXER__
     3: #define __POLL_DEMUXER__
     4: 
     5: #include <flx_demux_config.hpp>
     6: #include "demux_posix_demuxer.hpp"
     7: 
     8: // not re-entrant
     9: 
    10: namespace flx { namespace demux {
    11: 
    12: class DEMUX_EXTERN poll_demuxer : public posix_demuxer {
    13:   void*  fd_array;    // really vector<struct pollfd>*, see FDS in the .cpp
    14:   void*  sv_array;
    15: 
    16:   virtual void get_evts(bool poll);
    17: public:
    18:   poll_demuxer();
    19:   virtual ~poll_demuxer();
    20: 
    21:   virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
    22: 
    23:   void get_arrays(void** fds, void** svs);
    24:   int dopoll(void* infds, bool poll_flag);    // returns nevents
    25:   void process_evts(void* infds, void* svs, int nevts);
    26: };
    27: 
    28: } }
    29: #endif
    30: 
End cpp section to demux/demux_poll_demuxer.hpp[1]
Start cpp section to demux/demux_poll_demuxer.cpp[1 /1 ]
     1: #line 4285 "./lpsrc/flx_demux.pak"
     2: #include <stdio.h>      // my friend printf
     3: #include <poll.h>
     4: #include <assert.h>
     5: #include "demux_poll_demuxer.hpp"
     6: 
     7: #include <vector>
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: using namespace std;
    12: 
    13: typedef vector<socket_wakeup*> sockvec;
    14: typedef vector<struct pollfd> fdvec;
    15: 
    16: #define FDS ((fdvec*)fd_array)
    17: #define SOCS ((sockvec*)sv_array)
    18: 
    19: // be aware that under os x 10.3 (and other systems?), poll is a thin
    20: // user level layer on top of select and so of no real advantage.
    21: // the select emulation doesn't seem to give POLLHUP errs, either.
    22: // under 10.4 poll doesn't appear to be calling select, so I guess it's
    23: // all good.
    24: 
    25: poll_demuxer::poll_demuxer()
    26:   : fd_array(0), sv_array(0)
    27: {
    28:   // fprintf(stderr, "poll_demuxer ctor\n");
    29: }
    30: 
    31: poll_demuxer::~poll_demuxer()
    32: {
    33:   // fprintf(stderr, "poll_demuxer dtor\n");
    34:   if(SOCS) delete SOCS;
    35:   if(FDS) delete FDS;
    36: }
    37: 
    38: // breaking up for a thread safe impl
    39: void
    40: poll_demuxer::get_arrays(void** fds, void** svs)
    41: {
    42:   *fds = fd_array;
    43:   *svs = sv_array;
    44: 
    45:   fd_array = 0;
    46:   sv_array = 0;
    47: }
    48: 
    49: // returns nevents
    50: int
    51: poll_demuxer::dopoll(void* infds, bool poll_flag)
    52: {
    53:   fdvec*    fds_copy = (fdvec*)infds;
    54: 
    55:   if(!fds_copy)
    56:   {
    57:     if(!poll_flag) fprintf(stderr, "Warning ::poll(\\inf) on zero fds!\n");
    58:     return 0;
    59:   }
    60: 
    61:   struct pollfd*  fds = &(*fds_copy)[0];
    62:   // I sometimes end up with nothing to watch in poll mode
    63:   // this type doesn't seem to exist for the 10.3 emulated poll.
    64:   unsigned long nfds = (*fds_copy).size();
    65:   // nfds_t  nfds = (*fds_copy).size();
    66:   int      nevts;
    67: 
    68:   // fprintf(stderr, "calling ::poll with %p*%i fds\n", fds, nfds);
    69:   // -1 for timeout means wait indefinitely
    70:   nevts = ::poll(fds, nfds, (poll_flag) ? 0 : -1);
    71: 
    72:   if(-1 == nevts)
    73:   {
    74:     perror("poll_demuxer::dopoll");
    75:     return 0;
    76:   }
    77: 
    78:   return nevts;    // zero => timeout
    79: }
    80: 
    81: // processes events, calls callbacks, deletes fds & svs upon completion.
    82: // Seems a bit busy, no?
    83: void
    84: poll_demuxer::process_evts(void* infds, void* svs, int nevts)
    85: {
    86:   // Optimisation: when no events (due to timeout) and our fds
    87:   // are null (nothing changed in the meantime), just set the
    88:   // fd and svs members to the incoming ones and return
    89: 
    90:   // fprintf(stderr, "poll::process_evts: %i, %p\n", nevts, fd_array);
    91: 
    92:   if(0 == nevts && !fd_array)
    93:   {
    94:     // fprintf(stderr, "Optimising by resetting arrays!\n");
    95:     assert( !sv_array );  // keep in sync
    96:     fd_array = infds;
    97:     sv_array = svs;
    98:     return;
    99:   }
   100: 
   101:   fdvec*    fds_copy = (fdvec*)infds;
   102:   sockvec*  socs_copy = (sockvec*)svs;
   103: 
   104:   struct pollfd*  fds = &(*fds_copy)[0];
   105:   unsigned long nfds = (*fds_copy).size();
   106:   // nfds_t  nfds = (*fds_copy).size();
   107: 
   108:   // sanity check. read and write count as 1 each here; ::poll counts per fd
   109:   int    evts_encountered = 0;
   110: 
   111:   // examine all fds for signs of life. try early out with nevts?
   112:   // for(nfds_t i = 0; i < nfds; i++, fds++)
   113:   for(unsigned long i = 0; i < nfds; i++, fds++)
   114:   {
   115:     // fprintf(stderr, "fds[%i]->revents=%x\n", i, fds->revents);
   116: 
   117:     socket_wakeup* sv = (*socs_copy)[i];
   118: 
   119:     // accumulate bit field of what we got
   120:     // don't touch original bits, we might have to restore them
   121:     int  wakeup_flags = 0;
   122: 
   123:     bool    wake = false;
   124: 
   125:     // it's possible to get both a read & write event, so these are
   126:     // independent tests rather than an if/else
   127:     if(fds->revents & POLLIN)                // I think this is how you do it
   128:     {
   129:           // fprintf(stderr,"POLLIN for %p->%i\n", sv, sv->s);
   130:       wakeup_flags |= PDEMUX_READ;
   131:       wake = true;
   132:       evts_encountered++;
   133:     }
   134: 
   135:     if(fds->revents & POLLOUT)
   136:     {
   137:       // fprintf(stderr,"POLLOUT for %p->%i\n", sv, sv->s);
   138:       wakeup_flags |= PDEMUX_WRITE;
   139:       wake = true;
   140:       evts_encountered++;
   141:     }
   142: 
   143:     // check here for the unsolicited POLLERR, POLLHUP and POLLNVALs
   144:     if(fds->revents & POLLERR)
   145:     {
   146:       fprintf(stderr, "POLLERR for %p->%i\n", sv, sv->s);
   147:       wake = true;    // good to do?
   148:     }
   149: 
   150:     // device has been disconnected. this and POLLOUT are mutually exclusive.
   151:     // a stream can never be writeable again if a hangup has occurred.
   152:     // I've seen POLLHUPs come in for shutdown(s, 1). In this case you want
   153:     // the wakeup, at least if you were waiting to write. POLLHUPs also seem
   154:     // to be the message/wake up when reading from a connection that has
   155:     // closed: you get the remaining bytes, but via POLLHUP rather than POLLIN.
   156:     // perhaps not worth printing, seeing as this usage is quite common
   157:     if(fds->revents & POLLHUP)
   158:     {
   159:       fprintf(stderr, "POLLHUP for %p->%i\n", sv, sv->s);
   160:       assert((fds->revents & POLLOUT) == 0);
   161:       wake = true;    // good to do? probably.
   162:     }
   163: 
   164:     // Invalid fd. We shouldn't ever get that.
   165:     if(fds->revents & POLLNVAL)
   166:     {
   167:       fprintf(stderr, "POLLNVAL for %p->%i\n", sv, sv->s);
   168:       wake = true;    // good to do?
   169:     }
   170: 
   171:     if(wake)
   172:     {
   173:       // 1-1 wakeups with add_sockets
   174:       // be aware that callback may add back...
   175:       sv->wakeup_flags = wakeup_flags;
   176:       sv->wakeup(*this);
   177:     }
   178:     else
   179:     {
   180:       // reinstall for the next iteration. note that we keep a copy
   181:       // of the flags in sv->wakeup_flags, set on adding. that belongs
   182:       // to us so there should be no problem there.
   183:       //fprintf(stderr, "poll::readding: %i, %x\n",
   184:       //  sv->s, sv->wakeup_flags);
   185:       if(add_socket_wakeup(sv, sv->wakeup_flags) == -1)
   186:         fprintf(stderr, "poll re-add finished immediately!?!\n");
   187:     }
   188:   }
   189: 
   190:   // keep the bastards honest
   191:   if(evts_encountered != nevts)
   192:   {
   193:     fprintf(stderr, "poll seen/nevts mismatch: %i/%i\n",
   194:       evts_encountered, nevts);
   195:   }
   196: 
   197:   // delete all here.
   198:   delete fds_copy;
   199:   delete socs_copy;
   200: }
   201: 
   202: 
   203: // the parameter is named poll_flag so it doesn't clash with ::poll
   204: void
   205: poll_demuxer::get_evts(bool poll_flag)
   206: {
   207:   // fprintf(stderr, "poll_demuxer::get_evts\n");
   208:   void    *fds, *svs;
   209:   int      nevts;
   210: 
   211:   get_arrays(&fds, &svs);    // we now own them. must call process_evts
   212:                 // to give them back.
   213: 
   214:   nevts = this->dopoll(fds, poll_flag);
   215: 
   216:   // don't shortcut based on nevts being zero - pass it on to process_evts
   217:   // it recognises the optimisation opportunity and handles it,
   218:   // which has the advantage of benefiting the threadsafe version too.
   219:   process_evts(fds, svs, nevts);
   220: }
   221: 
   222: // precondition: not currently in get_evts (not reentrant)
   223: int
   224: poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
   225: {
   226:   // fprintf(stderr, "poll::add_socket_wakeup: %p->%i, %x\n",
   227:   //  sv, sv->s, flags);
   228: 
   229:   if(!FDS)
   230:   {
   231:     // fprintf(stderr, "creating fds and svns\n");
   232:     assert(SOCS == NULL);    // both should be null or non null, no mix
   233: 
   234:     fd_array = new fdvec;    // FDS
   235:     sv_array = new sockvec;    // SOCS
   236:   }
   237: 
   238:   // add to array
   239:   struct pollfd  fd;
   240: 
   241:   // note that we keep a copy of the flags, because we often have to
   242:   // copy the wakeups back that haven't had activity
   243:   sv->wakeup_flags = flags;
   244: 
   245:   fd.fd = sv->s;
   246:   fd.events = 0;
   247:   // set on output, but in the ambiguous case of a poll that returns due
   248:   // to timeout what would it be? If I can guarantee 0, then I can use the
   249:   // same piece of code to re-add the fds in the thread safe version.
   250:   fd.revents = 0;
   251: 
   252:   if(flags & PDEMUX_READ) fd.events |= POLLIN;
   253:   if(flags & PDEMUX_WRITE) fd.events |= POLLOUT;
   254: 
   255:   // don't bother setting  POLLERR or POLLHUP. They're output (revents) only.
   256:   // is the same true for epoll_demuxer? I'd say so...
   257: 
   258:   // fd.revents is set on output by ::poll
   259:   // fprintf(stderr, "turning all revents bits on to test 0 output\n");
   260:   // fd.revents = -1;
   261: 
   262:   assert(0 != fd.events);
   263: 
   264:   // add to array along with sv pointer
   265:   FDS->push_back(fd);
   266:   SOCS->push_back(sv);
   267: 
   268:   return 0;      // there'll be a wakeup
   269: }
   270: 
   271: } }
   272: 
End cpp section to demux/demux_poll_demuxer.cpp[1]
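
The flag translation that add_socket_wakeup and process_evts perform can be tried in isolation. What follows is a minimal sketch, not the demuxer itself: SKETCH_READ and SKETCH_WRITE are hypothetical stand-ins for PDEMUX_READ and PDEMUX_WRITE, and to_poll_events, from_poll_revents, poll_once and demo_pipe_readable are names invented for the illustration. It assumes a POSIX system with poll and pipe.

```cpp
#include <poll.h>
#include <unistd.h>
#include <cassert>

// Hypothetical stand-ins for the demuxer's PDEMUX_READ / PDEMUX_WRITE bits.
enum { SKETCH_READ = 1, SKETCH_WRITE = 2 };

// Requested wakeup flags -> poll "events" bits (input to ::poll).
short to_poll_events(int flags)
{
  short ev = 0;
  if(flags & SKETCH_READ)  ev |= POLLIN;
  if(flags & SKETCH_WRITE) ev |= POLLOUT;
  return ev;
}

// poll "revents" bits (output of ::poll) -> wakeup flags.
// POLLERR, POLLHUP and POLLNVAL are output only and map to no flag here.
int from_poll_revents(short revents)
{
  int flags = 0;
  if(revents & POLLIN)  flags |= SKETCH_READ;
  if(revents & POLLOUT) flags |= SKETCH_WRITE;
  return flags;
}

// Poll a single fd without blocking (timeout 0).
// Returns the wakeup flags seen, or -1 if ::poll itself failed.
int poll_once(int fd, int flags)
{
  struct pollfd p;
  p.fd = fd;
  p.events = to_poll_events(flags);
  p.revents = 0;            // ::poll overwrites this on return
  if(::poll(&p, 1, 0) < 0) return -1;
  return from_poll_revents(p.revents);
}

// Puts one byte in a pipe and reports what poll says about the read end.
int demo_pipe_readable()
{
  int fds[2];
  if(::pipe(fds) != 0) return -1;
  char b = 1;
  int got = -1;
  if(::write(fds[1], &b, 1) == 1)
    got = poll_once(fds[0], SKETCH_READ);
  ::close(fds[0]);
  ::close(fds[1]);
  return got;
}
```

Calling demo_pipe_readable() should report SKETCH_READ, since the read end holds a byte and the write end is still open. Keeping the two translations as separate functions mirrors the split in the listing, where events is filled in when a socket is added and revents is decoded only after ::poll returns.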
Start cpp section to demux/demux_ts_poll_demuxer.hpp[1 /1 ]
     1: #line 4558 "./lpsrc/flx_demux.pak"
     2: #ifndef __TS_POLL_DEMUXER__
     3: #define __TS_POLL_DEMUXER__
     4: 
     5: // thread safe version of poll_demuxer
     6: 
     7: #include "demux_poll_demuxer.hpp"
     8: #include "demux_self_piper.hpp"     // self pipe trick
     9: #include "pthread_mutex.hpp"
    10: 
    11: namespace flx { namespace demux {
    12: 
    13: class ts_poll_demuxer : public posix_demuxer {
    14:   // lock
    15:   flx::pthread::flx_mutex_t    ham_fist;
    16:   // protects this little fella here.
    17:   poll_demuxer    demux;
    18: 
    19:   self_piper      sp;
    20: protected:
    21:   virtual void    get_evts(bool poll);
    22: public:
    23:   ts_poll_demuxer();
    24:   ~ts_poll_demuxer();
    25: 
    26:   virtual int     add_socket_wakeup(socket_wakeup* sv, int flags);
    27: 
    28:   // oops! need to correctly get/set the quit flag
    29:   virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); }
    30:   virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); }
    31: };
    32: 
    33: }} // namespace demux, flx
    34: 
    35: #endif
    36: 
End cpp section to demux/demux_ts_poll_demuxer.hpp[1]
Start cpp section to demux/demux_ts_poll_demuxer.cpp[1 /1 ]
     1: #line 4595 "./lpsrc/flx_demux.pak"
     2: #include "demux_ts_poll_demuxer.hpp"
     3: #include <stdio.h>
     4: 
     5: namespace flx { namespace demux {
     6: 
     7: ts_poll_demuxer::ts_poll_demuxer()
     8: {
     9:   fprintf(stderr, "ts_poll_demuxer installing self-piper\n");
    10:   // install self pipe trick.
    11:   sp.install(&demux);
    12: }
    13: 
    14: ts_poll_demuxer::~ts_poll_demuxer()
    15: {
    16:   fprintf(stderr, "ts_polling asking thread to quit\n");
    17:   async_quit();  // get async waiting thread to stop waiting
    18:   fprintf(stderr, "ts_poll async quit finished\n");
    19: }
    20: 
    21: void
    22: ts_poll_demuxer::get_evts(bool poll)
    23: {
    24:   void  *fds, *svs;
    25: 
    26:   // copy args under lock
    27:   {
    28:     flx::pthread::flx_mutex_locker_t  locker(ham_fist);
    29:     demux.get_arrays(&fds, &svs);  // the arrays are now mine
    30:     // lock released
    31:   }
    32: 
    33:   // do the poll
    34:   int  nevts = demux.dopoll(fds, poll);
    35: 
    36:   // regardless of the number of events, I have to copy the pieces back
    37:   // under lock tutelage
    38:   {
    39:     flx::pthread::flx_mutex_locker_t  locker(ham_fist);
    40:     demux.process_evts(fds, svs, nevts);
    41:     // lock released
    42:   }
    43: }
    44: 
    45: // thread safe overridden functions follow. all acquire the lock
    46: // then call the unthreadsafe version of the function. nice!
    47: int
    48: ts_poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    49: {
    50:   // fprintf(stderr, "ts_poll::add_sock(%i, %x)\n", sv->s, flags);
    51:   flx::pthread::flx_mutex_locker_t  locker(ham_fist);
    52: 
    53:   int  res = demux.add_socket_wakeup(sv, flags);
    54:   // I wouldn't touch the sv after this.
    55: 
    56:   if(-1 != res) sp.wake();
    57: 
    58:   return res;
    59: }
    60: }}
End cpp section to demux/demux_ts_poll_demuxer.cpp[1]
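
The locking pattern in ts_poll_demuxer (take the arrays under the lock, poll with the lock released, hand the leftovers back under the lock, and use a self pipe to interrupt a sleeping poll when something is added) can be sketched on its own. The class below is an illustration under assumptions, not the flx code: ts_poll_sketch, add_read_fd, run_once and demo_wake_then_ready are hypothetical names, std::mutex stands in for flx_mutex_t, and wakeups are reduced to counting readable fds.

```cpp
#include <poll.h>
#include <unistd.h>
#include <mutex>
#include <vector>
#include <cassert>

// Hypothetical miniature of the "ts" wrapper idea; one-shot read wakeups only.
class ts_poll_sketch {
  std::mutex lock;
  std::vector<struct pollfd> pending;   // guarded by lock
  int wake_fds[2];                      // self pipe: [0] read end, [1] write end

  static struct pollfd want_read(int fd)
  {
    struct pollfd p;
    p.fd = fd; p.events = POLLIN; p.revents = 0;
    return p;
  }
public:
  ts_poll_sketch()
  {
    if(::pipe(wake_fds) != 0) wake_fds[0] = wake_fds[1] = -1;
    pending.push_back(want_read(wake_fds[0]));   // self pipe is always watched
  }
  ~ts_poll_sketch() { ::close(wake_fds[0]); ::close(wake_fds[1]); }

  // Any thread: queue an fd, then write a byte so a blocked poll returns.
  void add_read_fd(int fd)
  {
    {
      std::lock_guard<std::mutex> g(lock);
      pending.push_back(want_read(fd));
    }
    char b = 1;
    ssize_t n = ::write(wake_fds[1], &b, 1);
    (void)n;
  }

  // Poller thread: one round. Returns how many user fds were readable.
  int run_once(int timeout_ms)
  {
    std::vector<struct pollfd> mine;
    { std::lock_guard<std::mutex> g(lock); mine.swap(pending); } // now mine

    ::poll(mine.data(), mine.size(), timeout_ms);  // no lock held here

    int ready = 0;
    std::vector<struct pollfd> keep;
    for(struct pollfd p : mine)
    {
      if(p.fd == wake_fds[0])
      {
        char b;
        if(p.revents & POLLIN) { ssize_t n = ::read(p.fd, &b, 1); (void)n; }
        keep.push_back(want_read(p.fd));           // re-arm the self pipe
      }
      else if(p.revents & POLLIN) ready++;         // one-shot: dropped
      else keep.push_back(want_read(p.fd));        // no activity: re-add
    }
    { // give the untouched entries back, after anything added meanwhile
      std::lock_guard<std::mutex> g(lock);
      pending.insert(pending.end(), keep.begin(), keep.end());
    }
    return ready;
  }
};

// Single-threaded walk through three rounds; encodes the results as digits.
int demo_wake_then_ready()
{
  ts_poll_sketch d;
  int fds[2];
  if(::pipe(fds) != 0) return -1;
  d.add_read_fd(fds[0]);           // queues fds[0] and writes a wake byte
  int first = d.run_once(0);       // only the wake byte is pending
  char b = 1;
  ssize_t n = ::write(fds[1], &b, 1); (void)n;
  int second = d.run_once(0);      // fds[0] is now readable
  int third = d.run_once(0);       // fds[0] was one-shot, so it is gone
  ::close(fds[0]); ::close(fds[1]);
  return first * 100 + second * 10 + third;
}
```

The demo runs on one thread only to keep the sequence deterministic; the mutex matters once add_read_fd is called from a thread other than the one sitting in run_once. Swapping the vector out under the lock is what lets ::poll run unlocked, which is the same reason the listing splits get_evts into get_arrays, dopoll and process_evts.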