1.1. OS specific demux codes

Start cpp section to demux/demux_posix_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_posix_demux.ipk"
     2: #ifndef __FLX_DEMUX_POSIX_DEMUXER_H__
     3: #define __FLX_DEMUX_POSIX_DEMUXER_H__
     4: 
     5: // base classes for posix style demuxers
     6: 
     7: #include "demux_demuxer.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: class DEMUX_EXTERN posix_demuxer;            // fwd decl
    11: 
    12: // abstract base class: something a posix_demuxer calls back on an event
    13: class DEMUX_EXTERN posix_wakeup {
    14: public:
    15:   virtual ~posix_wakeup() {}   // virtual dtor, this is a polymorphic base
    16: 
    17:   // when called, the wakeup has finished and been removed.
    18:   virtual void wakeup(posix_demuxer& demux) = 0; // demux: the caller
    19: };
    20: 
    21: class DEMUX_EXTERN socket_wakeup : public posix_wakeup {
    22: public:
    23:   int   s;                // the non blocking socket (-1 until assigned)
    24:   int   wakeup_flags;     // set on wakeup, r/w or both (PDEMUX_* bits)
    25:   // both members get defined values so nothing reads an indeterminate int
    26:   socket_wakeup() : s(-1), wakeup_flags(0) {}
    27: };
    28: 
    29: class DEMUX_EXTERN posix_demuxer : public demuxer { // base for posix demuxers
    30: protected:
    31:   void async_quit(); // useful for requesting wait thread quit in
    32:                      // thread safe demuxer destructors. doesn't throw.
    33: 
    34: public:
    35:   virtual ~posix_demuxer();
    36: 
    37:   // posix style sockets. for reading and writing (but not both at once
    38:   // for the same socket_wakeup) you are guaranteed to receive only one
    39:   // wakeup per call to this function when you call wait.
    40:   // returns -1 if no wakeup is coming and zero if one is.
    41:   // For simultaneous reading and writing you may get two wakeups,
    42:   // that is, it may violate the "one shot" rule. Ignoring for now,
    43:   // as it's not a common use. This makes it undefined behaviour.
    44:   // sv is owned by the demuxer until sv->wakeup is called,
    45:   // so treat it with kid gloves, i.e. don't mess with it.
    46:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags) = 0;
    47: 
    48:   // to be called when we can read & write without blocking.
    49:   // return true if connection closed; pb->bytes_written is advanced by
    50:   // the number of bytes moved. sort of a strange place to have this...,
    51:   // more a socket wakeup thing, even if static
    52:   static bool   socket_recv(int s, sel_param* pb);
    53:   static bool   socket_send(int s, sel_param* pb);
    54: };
    55: 
    56: // some handy control blocks for common non-blocking socket operations
    57: // note that they "fortuitously" both have start methods. hmm.
    58: // a socket io one could be handy here.
    59: 
    60: // this one's restartable (makes sense for listener sockets)
    61: class DEMUX_EXTERN accept_control_block : public socket_wakeup {
    62: public:
    63:   int   accepted;   // accepted socket, -1 if none (out)
    64:   int   socket_err;   // the error, if accepted == -1, else 0 (out)
    65: 
    66:   accept_control_block() : accepted(-1), socket_err(0) {}
    67: 
    68:   virtual int start(posix_demuxer& demux);   // registers s for PDEMUX_READ
    69:   virtual void wakeup(posix_demuxer& demux); // accepts on s via nice_accept
    70: };
    71: 
    72: class DEMUX_EXTERN connect_control_block : public socket_wakeup {
    73: public:
    74:   int     socket_err;   // outgoing error (on start or wake)
    75:   // this should probably be a sockaddr type
    76:   const char* addy;     // addr (dotted quad) (in)
    77:   int     p;        // port (in)
    78:   // addy/p start null/zero rather than indeterminate; set both before start
    79:   connect_control_block() : socket_err(0), addy(0), p(0) {}
    80: 
    81:   virtual int start(posix_demuxer& demux);
    82:   virtual void wakeup(posix_demuxer& demux);
    83: 
    84:   // oops, can't check for s != -1 as it's always there.
    85:   // was always "finished" and so I started io, losing the first wakeup
    86:   // on epoll evtsrc. Is this right, or should it be != EINPROGRESS?
    87:   // keep in sync with iocp version. give socket_err initial definition
    88:   // that works with this?
    89:   bool finished() { return ( 0 == socket_err); }
    90: };
    91: 
    92: }} // namespace demux, flx
    93: #endif
    94: 
End cpp section to demux/demux_posix_demuxer.hpp[1]
Start cpp section to demux/demux_posix_demuxer.cpp[1 /1 ]
     1: #line 98 "./lpsrc/flx_posix_demux.ipk"
     2: #include "demux_posix_demuxer.hpp"
     3: #include "demux_sockety.hpp"
     4: #include "demux_quitter.hpp" // fns for waking and quitting a demuxer
     5: 
     6: #include <stdio.h>        // "printf"
     7: #include <assert.h>       // assert
     8: #include <string.h>       // strerror
     9: #include <unistd.h>       // close
    10: 
    11: #include <sys/types.h>      // send/recv
    12: #include <sys/socket.h>
    13: 
    14: //#include <sys/errno.h>
    15: #include <errno.h>        // GUSI & solaris prefer this
    16: 
    17: namespace flx { namespace demux {
    18: 
    19: posix_demuxer::~posix_demuxer()
    20: { // out-of-line definition of the virtual dtor; nothing to release here
    21: }
    22: 
    23: bool
    24: posix_demuxer::socket_recv(int s, sel_param* pb)
    25: {
    26:   // Receive into the unfilled tail of pb->buffer from socket s, advancing
    27:   // pb->bytes_written. Returns true if the connection closed or failed,
    28:   // false otherwise (including when no data was available after all).
    29:   // why do I have the zero buffer size?
    30:   assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size);
    31: 
    32:   // if this were read then this fn would work with non-sockets
    33:   ssize_t nbytes = recv(s, pb->buffer + pb->bytes_written,
    34:         pb->buffer_size - pb->bytes_written, 0);
    35: 
    36:   /*
    37:   fprintf(stderr,"posix_demuxer RECV: s=%d, pb=%p, buf+%d, req=%d, got %d\n",
    38:     s,pb, int(pb->bytes_written), int(pb->buffer_size - pb->bytes_written), int(nbytes)
    39:   );
    40:   */
    41: 
    42:   if(nbytes > 0)
    43:   {
    44:     pb->bytes_written += nbytes;    // got some data
    45:     return false;                   // connection didn't close
    46:   }
    47:   if(0 == nbytes) return true;      // connection closed
    48: 
    49:   // nbytes == -1. A readiness notification can be spurious and a signal
    50:   // can interrupt the call; neither means the connection has gone away,
    51:   // so report "still open" and let the caller wait for another wakeup.
    52:   if(EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno)
    53:     return false;
    54: 
    55:   perror("recv");                   // can get reset connection here
    56:   return true;                      // so say closed
    57: }
    58: 
    59: bool
    60: posix_demuxer::socket_send(int s, sel_param* pb)
    61: {
    62:   // kqueue (and some of the other ones) can let you know how much
    63:   // to write... imagine that!
    64:   // Send the unsent tail of pb->buffer on socket s, advancing
    65:   // pb->bytes_written. Returns true if the send failed, false otherwise.
    66:   // why do I have the zero buffer size?
    67:   assert(pb->buffer_size > pb->bytes_written || 0 == pb->buffer_size);
    68: 
    69:   ssize_t nbytes = send(s, pb->buffer + pb->bytes_written,
    70:     pb->buffer_size - pb->bytes_written, 0);
    71: 
    72:   /*
    73:   fprintf(stderr,"posix_demuxer SEND: s=%d, pb=%p buf+%d, req=%d, got %d\n",
    74:     s,pb, int(pb->bytes_written), int(pb->buffer_size - pb->bytes_written), int(nbytes)
    75:   );
    76:   */
    77:   // similar story here, with send vs write?
    78: 
    79:   if(-1 != nbytes)
    80:   {
    81:     pb->bytes_written += nbytes;    // sent some data (zero is not "closed")
    82:     return false;                   // connection didn't close
    83:   }
    84: 
    85:   // The socket can stop being writable between the wakeup and this call,
    86:   // and a signal can interrupt it; neither means the peer has gone, so
    87:   // report "still open" and let the caller wait for another wakeup.
    88:   if(EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno)
    89:     return false;
    90: 
    91:   perror("send");
    92:   return true;          // I guess the connection closed
    93: }
    94: 
    95: // get a posix demuxer to quit, that is, get the demuxer's event thread
    96: // to exit. doesn't return until this has happened. pretty sure that
    97: // calling this on a demuxer used synchronously would be a bad idea.
    98: // doesn't throw: anything thrown by the quitter is caught and logged.
    99: void
   100: posix_demuxer::async_quit()
   101: {
   102:   try {
   103:     // demux quitter which sets the demux quit flag via the self pipe
   104:     // trick, then waits for the self pipe/quitting callback
   105:     // to finish. no fear of quitter being destructed early!
   106:     // fprintf(stderr, "async_quit called on posix demuxer\n");
   107:     demux_quitter quitter;
   108:     quitter.quit(this);
   109:         // event thread has exited at this point
   110:   } catch(...) {        // swallow everything: this is called from dtors
   111:     fprintf(stderr, "error waking demuxer with self pipe quitter\n");
   112:   }
   113: }
   114: 
   115: #if 0
   116:   //nbytes = recv(s, pb->buffer + pb->bytes_written,
   117:   //      pb->buffer_size - pb->bytes_written, 0);
   118: 
   119:   // select and kqueue know when non socket fds have data.
   120:   // recv only works with sockets, but read works with both files
   121:   // and sockets and who knows what else. is there any disadvantage
   122:   // to using read instead? apart from losing flags arg?
   123:   // does read get the same 0 bytes = close behaviour
   124:   nbytes = read(s, pb->buffer + pb->bytes_written,
   125:         pb->buffer_size - pb->bytes_written);
   126: #endif
   127: 
   128: // handy posix control blocks for accept, connect.
   129: 
   130: int
   131: accept_control_block::start(posix_demuxer& demux)
   132: {
   133:   // Arm the listener: register s with the demuxer as a reading socket
   134:   // (see man 2 accept). Returns whatever add_socket_wakeup returns; the
   135:   // reason for a failure is not communicated.
   136:   // could try the accept now, to see if it succeeds instantly...
   137:   // observe wakeup rules (formulate them first)
   138:   // EINPROGRESS is not quite true, but it keeps things clear should an
   139:   // immediate accept ever become possible (socket_err would then be 0).
   140:   socket_err = EINPROGRESS;
   141:   accepted = -1;                // nothing accepted yet
   142:   const int armed = demux.add_socket_wakeup(this, PDEMUX_READ);
   143:   return armed;
   144: }
   145: 
   146: // one wakeup socket is in accepted and error in socket_err
   147: void
   148: accept_control_block::wakeup(posix_demuxer& demux)
   149: {
   150:   // fprintf(stderr,"accept_control_block woke up\n");
   151: 
   152:   // we can now accept without blocking
   153:   // s is the listener, ambiguously named in parent socket_wakeup class
   154:   accepted = nice_accept(s, &socket_err);
   155: 
   156:   if(accepted == -1)
   157:   {
   158:     fprintf(stderr, "nice_accept failed, err (%i)\n", socket_err);
   159:   }
   160: }
   161: 
   162: // returns 0 if a wakeup is coming, -1 if not. -1 covers both a failed
   163: // connect (socket_err set) and one that finished immediately.
   164: int
   165: connect_control_block::start(posix_demuxer& demux)
   166: {
   167:   // fprintf(stderr,"async connect start\n");
   168: 
   169:   int finished;
   170: 
   171:   // returns either finished and err, or not finished
   172:   // and (no err || EINPROGRESS)
   173:   s = async_connect(addy, p, &finished, &socket_err);
   174: 
   175:   // fprintf(stderr,"async_connect returned s: %i, finished: %i, err=%i\n",
   176:   //  s, finished, socket_err);
   177: 
   178:   if(-1 == s)   // failed!
   179:   {
   180:     fprintf(stderr,"async_connect failed (%i)\n", socket_err);
   181:     return -1;  // error in socket_err, no wakeup
   182:   }
   183: 
   184:   if(finished)
   185:   {
   186:     // this actually happens on solaris when connecting to localhost!
   187:     fprintf(stderr,"async_connect finished immediately, waking\n");
   188:     fprintf(stderr, "No wakeup coming...\n");
   189:     // this does not indicate an error, but that there is no wakeup
   190:     // coming. this could be done by a wakeup, all that happens is
   191:     // getsockopt is called to check the socket's error state.
   192:     return -1;
   193:   }
   194: 
   195:   // fprintf(stderr,"connect_request didn't finish immediately, sleeping\n");
   196: 
   197:   // add to demuxer as writing socket - see man 2 connect
   198:   // the outcome is collected later, in wakeup()
   199:     return demux.add_socket_wakeup(this, PDEMUX_WRITE);
   200: }
   201: 
   202: void
   203: connect_control_block::wakeup(posix_demuxer& demux)
   204: {
   205:   // fprintf(stderr,"connect woke up\n");
   206:   // An async connect reports its outcome through the socket's error state.
   207:   // if get_socket_err fails, we're treating its errno as the socket's...
   208:   const int probe = get_socket_error(s, &socket_err);
   209:   if(-1 == probe)
   210:     fprintf(stderr, "eep - get_socket_err failed!\n");
   211: 
   212:   // connected: the resulting socket stays in s
   213:   if(0 == socket_err) return;
   214: 
   215:   // failed, throw away socket
   216:   fprintf(stderr,"async connect error: %s (%i), closing\n",
   217:     strerror(socket_err), socket_err);
   218: 
   219:   // we created the connect socket, so we close it too.
   220:   const int closed = close(s);
   221:   if(0 != closed)
   222:     perror("async socket close");
   223:   s = -1;   // the result
   224: }
   225: }}
   226: 
End cpp section to demux/demux_posix_demuxer.cpp[1]
Start cpp section to demux/demux_pfileio.hpp[1 /1 ]
     1: #line 325 "./lpsrc/flx_posix_demux.ipk"
     2: #ifndef __FLX_DEMUX_PFILEIO_H__
     3: #define __FLX_DEMUX_PFILEIO_H__
     4: #include <flx_demux_config.hpp>
     5: 
     6: #include "demux_demuxer.hpp"
     7: #include "pthread_sleep_queue.hpp"
     8: #include "pthread_mutex.hpp"
     9: // #include <sys/types.h> // off_t (don't have flx iface to this yet)
    10:               // can just add new constructor
    11: #include "pthread_work_fifo.hpp"
    12: namespace flx { namespace demux {
    13: 
    14: // ********************************************************
    15: /// like another event source. this is basically a wrapped read/write
    16: /// (the pread/pwrite calls are currently commented out in doit).
    17: /// should probably be derived from posix_wakeup. users overload "finished"
    18: // (presumably declared by worker_task, which is not visible here)
    19: class DEMUX_EXTERN fileio_request : public flx::pthread::worker_task
    20: {
    21:   long    offset;   // make this a proper offset (64bit)
    22:   // off_t    offset; // in: offset, for use with pread, pwrite
    23:   int     fd;     // in: fd in question
    24:   bool    read_flag;  // in: read else write
    25: 
    26:   int     err;    // out: errno of a failed transfer, set by doit
    27: public:
    28:   // public so it can be got in felix
    29:   sel_param pb;   // in & out: what you want, what you get (64bit len?)
    30: 
    31:   virtual ~fileio_request(); // c++ should do this automatically
    32:   fileio_request();       // flx linkage
    33:   fileio_request(int f, char* buf, long len, long off, bool rd);
    34: 
    35:   virtual void doit();      // sync: performs the read or write
    36: };
    37: 
    38: }} // namespace demux, flx
    39: #endif  // __FLX_DEMUX_PFILEIO_H__
    40: 
End cpp section to demux/demux_pfileio.hpp[1]
Start cpp section to demux/demux_pfileio.cpp[1 /1 ]
     1: #line 366 "./lpsrc/flx_posix_demux.ipk"
     2: #include <stdio.h>    // printf
     3: #include <errno.h>    // errno
     4: #include "demux_pfileio.hpp"
     5: 
     6: // blocking reads & writes that use a worker fifo. users overload
     7: // finished flag to implement wakeup
     8: 
     9: // if we could group the requests, we could do a scattered read
    10: // or we could do single reads if the requests were of a similar
    11: // nature, i.e. the whole file, of popular files.
    12: 
    13: // for pwrite/pread, I'm supposed to include the following three (osx man page)
    14: // they don't appear to be necessary, but let's play it safe
    15: #include <sys/types.h>
    16: #include <sys/uio.h>
    17: #include <unistd.h>
    18: 
    19: namespace flx { namespace demux {
    20: // fileio_request stuff follows: read or write in a blocking fashion. I like
    21: // the idea of using pread which doesn't change the file pointer. this could
    22: // allow reuse of the same file descriptor & block caching
    23: fileio_request::~fileio_request(){}
    24: 
    25: // flx linkage ctor: every field gets a defined value (was indeterminate)
    26: fileio_request::fileio_request()
    27:   : offset(0), fd(-1), read_flag(false), err(0)
    28: { pb.buffer = 0; pb.buffer_size = 0; pb.bytes_written = 0; }
    29: fileio_request::fileio_request(int f, char* buf, long len, long off, bool rd)
    30:   : offset(off), fd(f), read_flag(rd), err(0)   // rd: true = read, else write
    31: {
    32:   pb.buffer = buf;          // caller's buffer; only the pointer is stored
    33:   pb.buffer_size = len;     // bytes requested
    34:   pb.bytes_written = 0;     // set by doit() to the count actually moved
    35: }
    36: 
    37: // synchronously process read/write
    38: void
    39: fileio_request::doit()
    40: {
    41:   /*
    42:   fprintf(stderr,"faio about to try to %s %i bytes from fd=%i\n",
    43:     (read_flag) ? "read" : "write", pb.buffer_size, fd);
    44:   */
    45: 
    46:   // Explicit seeks (pread/pwrite at offset) are switched off for now: the
    47:   // flx code doesn't use them and only the fd is passed around, so the
    48:   // transfer happens at the descriptor's current position.
    49:   ssize_t moved;
    50: 
    51:   if(!read_flag)
    52:   {
    53:     // moved = pwrite(fd, pb.buffer, pb.buffer_size, offset);
    54:     moved = write(fd, pb.buffer, pb.buffer_size);
    55:   }
    56:   else
    57:   {
    58:     // moved = pread(fd, pb.buffer, pb.buffer_size, offset);
    59:     moved = read(fd, pb.buffer, pb.buffer_size);
    60:   }
    61: 
    62:   // a zero count (end of file) is an ordinary result and is stored as is
    63:   if(-1 != moved)
    64:   {
    65:     // fprintf(stderr,"faio %s %i bytes\n", (read_flag) ? "read" : "write", moved);
    66:     pb.bytes_written = moved;
    67:     return;
    68:   }
    69: 
    70:   err = errno;    // grab errno
    71:   fprintf(stderr,"faio error: %i\n", err);
    72: }
    73: }}
    74: 
End cpp section to demux/demux_pfileio.cpp[1]
Start cpp section to demux/demux_self_piper.hpp[1 /1 ]
     1: #line 441 "./lpsrc/flx_posix_demux.ipk"
     2: 
     3: #ifndef __FLX_DEMUX_SELF_PIPER_H__
     4: #define __FLX_DEMUX_SELF_PIPER_H__
     5: 
     6: #include <flx_demux_config.hpp>
     7: #include "demux_posix_demuxer.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: // there's no standard posix_socketio_wakeup, could be handy. could also
    12: // perhaps use it here? this is a pipe, not a socket. not sure if recv nor
    13: // send work on it, besides want to read an unlimited amount of redundant data.
    14: class DEMUX_EXTERN selfpipe_wakeup : public socket_wakeup {
    15: public:
    16:   demux_callback* cb; // optional callback, may be null
    17:   selfpipe_wakeup() : cb(0) {}  // wakeup() tests cb, so never leave it wild
    18:   virtual void wakeup(posix_demuxer& demux);
    19: };
    20: 
    21: class DEMUX_EXTERN auto_fd {   // owns fd: the destructor closes it
    22:     auto_fd(const auto_fd&); auto_fd& operator=(const auto_fd&); // no copies:
    23: public:                        // a copy would close the same fd twice
    24:     int fd;
    25:     auto_fd();
    26:     ~auto_fd();
    27: };
    28: 
    29: // a pipe whose two ends are closed automatically. make portable here?
    30: class DEMUX_EXTERN pipe_pair {
    31:   // self pipe trick!!! fds[0] = read end, fds[1] = write end.
    32:   auto_fd         fds[2];
    33: public:
    34:   pipe_pair();        // creates the pipe; throws an int (-1) if pipe() fails
    35:   // void read_byte(); // done for us by wakeup obj.
    36:   void write_byte();  // writes one byte of value 1 to the write end
    37:   int get_read_end(); // the read end's descriptor (still owned by this)
    38: };
    39: 
    40: // wakes a POSIX demuxer, for when you want some kind of attention
    41: // todo: make portable
    42: class DEMUX_EXTERN self_piper {
    43:     pipe_pair       pp;     // the pipe written to by wake()
    44:     selfpipe_wakeup spw;    // registered on pp's read end by install()
    45: public:
    46:     void install(demuxer* demux, demux_callback* cb = 0); // cb is optional
    47:     void wake();
    48: };
    49: 
    50: }} // namespace demux, flx
    51: 
    52: #endif
End cpp section to demux/demux_self_piper.hpp[1]
Start cpp section to demux/demux_self_piper.cpp[1 /1 ]
     1: #line 494 "./lpsrc/flx_posix_demux.ipk"
     2: 
     3: #include "demux_self_piper.hpp"
     4: #include <stdio.h>              // printf, perror
     5: #include <unistd.h>             // pipe for self-pipe trick.
     6: #include <assert.h>
     7: 
     8: namespace flx { namespace demux {
     9: 
    10: auto_fd::auto_fd()
    11:     : fd(-1)        // invalid until an owner stores a real descriptor
    12: {
    13: }
    14: 
    15: auto_fd::~auto_fd()
    16: {
    17:     // only a descriptor that was actually stored gets closed
    18:     const bool owned = (-1 != fd);
    19: 
    20:     if(owned && -1 == close(fd)) perror("auto fd close");
    21: }
    22: 
    23: void
    24: self_piper::install(demuxer* d, demux_callback* cb)
    25: {
    26:     //fprintf(stderr, "installing self piper in %p with cb=%p\n", d, cb);
    27:     // point the wakeup at the pipe's read end and remember the callback
    28:     spw.cb = cb;
    29:     spw.s = pp.get_read_end();
    30:     // unchecked downcast: d has to really be a posix_demuxer
    31:     const int armed = static_cast<posix_demuxer*>(d)->add_socket_wakeup(&spw, PDEMUX_READ);
    32:     assert(-1 != armed);
    33: }
    34: 
    35: // wake the demuxer referenced in install, by writing a byte into the pipe
    36: void
    37: self_piper::wake()
    38: {
    39:     // fprintf(stderr, "self_piper::wake\n");
    40:     pp.write_byte();
    41: }
    42: 
    43: void
    44: selfpipe_wakeup::wakeup(posix_demuxer& demux)
    45: {
    46:     // consume one wake byte, run the optional callback, then re-arm.
    47:     // not using the pipe pair because it doesn't know that it's part of
    48:     // one. not to worry.
    49:     ssize_t         nbytes;
    50:     char            b;
    51: 
    52:     // plain read (not recv): s is a pipe end here, not a socket.
    53:     // exactly one byte is taken per wakeup.
    54:     nbytes = read(s, &b, 1);
    55: 
    56:     if(nbytes == -1) perror("read");
    57: 
    58:     // fprintf(stderr, "GOT: %li, %x\n", nbytes, b);
    59:     assert(nbytes == 1 && b == 1);  // pipe_pair::write_byte writes a 1
    60: 
    61:     // callback!
    62:     if(cb) cb->callback(&demux);
    63: 
    64:     // add self back! this happens even when we're quitting, but that
    65:     // doesn't seem to matter.
    66:     // fprintf(stderr, "selfpiper rearming\n");
    67:     int res = demux.add_socket_wakeup(this, PDEMUX_READ);
    68:     assert(-1 != res);
    69: }
    70: 
    71: pipe_pair::pipe_pair()
    72: {
    73:   // fprintf(stderr, "creating pipe for self-pipe trick\n");
    74: 
    75:   // pipe() fills in { read end, write end }
    76:   int raw[2];
    77:   const int rc = pipe(raw);
    78:   if(-1 == rc)
    79:   {
    80:       perror("ts_select_demuxer::self_pipe");
    81:       throw -1;           // note: an int is thrown
    82:   }
    83: 
    84:   // hand both ends to the auto_fds, which close them on destruction
    85:   for(int i = 0; i < 2; ++i)
    86:     fds[i].fd = raw[i];
    87: }
    88: 
    89: void
    90: pipe_pair::write_byte()
    91: {
    92:     // one byte of value 1 is what selfpipe_wakeup::wakeup expects to read
    93:     const char token = 1;
    94:     // is this blocking? I guess it has to be...
    95:     const ssize_t sent = write(fds[1].fd, &token, 1);   // wake up, jeff!
    96: 
    97:     // fprintf(stderr, "self_piper::wake write returned: %i\n", sent);
    98: 
    99:     if(sent == -1) perror("pipe_pair::write_byte");
   100:     assert(sent == 1);
   101: }
   102: 
   103: int
   104: pipe_pair::get_read_end()
   105: {
   106:   return fds[0].fd;   // fds[0] is the read end; ownership stays here
   107: }
   108: 
   109: } }
   110: 
End cpp section to demux/demux_self_piper.cpp[1]
Start cpp section to demux/demux_sockety.hpp[1 /1 ]
     1: #line 605 "./lpsrc/flx_posix_demux.ipk"
     2: #ifndef __FLX_DEMUX_SOCKETY_H__
     3: #define __FLX_DEMUX_SOCKETY_H__
     4: #include <flx_demux_config.hpp>
     5: namespace flx { namespace demux {
     6: 
     7: // Shouldn't this all be DEMUX_EXTERN? eh, doesn't happen on win32
     8: // we'll probably live.
     9: int create_listener_socket(int* io_port, int q_len);
    10: int create_async_listener(int* io_port, int q_len);
    11: int nice_accept(int listener, int* err);  // callers treat -1 as failure
    12: int nice_connect(const char* addr, int port);
    13: int async_connect(const char* addr, int port, int* finished, int* err); // -1: failed
    14: 
    15: /* handy socket building blocks */
    16: 
    17: int connect_sock(int s, const char* addr, int port);
    18: 
    19: /* this could possibly do with NIC addr as well as port */
    20: int bind_sock(int s, int* io_port);
    21: 
    22: int make_nonblock(int s);
    23: int make_linger(int s, int t);
    24: int set_tcp_nodelay(int s, int disable_nagle);
    25: int get_socket_error(int s, int* socket_err); // callers treat -1 as failure
    26: 
    27: }} // namespace demux, flx
    28: #endif
    29: 
End cpp section to demux/demux_sockety.hpp[1]
NOTE: sockety.cpp is created by flx_demuxconfig.pak .. it isn't missing!
Start cpp section to demux/demux_epoll_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_epoll_demux.ipk"
     2: #ifndef __FLX_DEMUX_EPOLL_DEMUXER_H__
     3: #define __FLX_DEMUX_EPOLL_DEMUXER_H__
     4: 
     5: #include <flx_demux_config.hpp>
     6: #include "demux_posix_demuxer.hpp"
     7: 
     8: namespace flx { namespace demux {
     9: // epoll allows only one event per socket - it does not differentiate
    10: // on the awaited operation (read/write), however it does let you wait
    11: // on any combination (I think)
    12: 
    13: // ********************************************************
    14: /// epoll based demuxer
    15: // ********************************************************
    16: 
    17: class DEMUX_EXTERN epoll_demuxer : public posix_demuxer {
    18:   int   epoll_fd;   // from epoll_create; closed by the destructor
    19: 
    20:   // be careful of this - don't let it create race conditions
    21:   // should probably only be called by wait = in one thread only (check)
    22:   // this removes ALL outstanding events for s.
    23:   void  remove_wakeup(int s);
    24: 
    25:   virtual void  get_evts(bool poll);  // poll: return at once if nothing ready
    26: public:
    27:   epoll_demuxer();            // throws an int (-1) if epoll_create fails
    28:   virtual ~epoll_demuxer();
    29: 
    30:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    31: };
    32: 
    33: }} // namespace demux, flx
    34: #endif
    35: 
End cpp section to demux/demux_epoll_demuxer.hpp[1]
Start cpp section to demux/demux_epoll_demuxer.cpp[1 /1 ]
     1: #line 39 "./lpsrc/flx_epoll_demux.ipk"
     2: // epoll interface. does epoll support ordinary files in addition to sockets?
     3: // EPOLLET to make epoll edgetriggered. I guess the default is level triggered.
     4: 
     5: // epoll events are not one shot, in fact they're quite sticky so socket
     6: // filters must be removed manually to guarantee a one-to-one wakeup
     7: // to add_wakeup ratio. note that the oneshot flag is not a solution.
     8: 
     9: // cool! EPOLLONESHOT
    10: // BUGGER! doesn't seem to exist! and doing this doesn't make it so!
    11: // #ifndef EPOLLONESHOT
    12: // #define EPOLLONESHOT (1<<30)
    13: // #endif
    14: 
    15: #include "demux_epoll_demuxer.hpp"
    16: 
    17: #include <sys/epoll.h>  // for epoll_*
    18: #include <stdio.h>    // for perror
    19: #include <unistd.h>   // for close
    20: #include <errno.h>    // EEXIST, errno
    21: 
    22: namespace flx { namespace demux {
    23: 
    24: epoll_demuxer::epoll_demuxer()
    25:   : epoll_fd(-1)
    26: {
    27:   // EPOLLONESHOT is deliberately not used: enabling it just means that
    28:   // wakeups are suppressed and re-arming needs EPOLL_CTL_MOD instead
    29:   // of EPOLL_CTL_ADD. If it isn't defined then so much the better.
    30: //#ifdef EPOLLONESHOT
    31: //  fprintf(stderr,"WARNING: EPOLLONESHOT AVAILABLE (%x)!!!\n", EPOLLONESHOT);
    32: //#endif
    33: 
    34:   // no idea what the maximum size will be, so just say 1 for now
    35:   const int created = epoll_create(1);
    36:   epoll_fd = created;
    37:   if(created != -1) return;
    38: 
    39:   perror("epoll_create");
    40:   throw -1;
    41: }
    42: 
    43: epoll_demuxer::~epoll_demuxer()
    44: {
    45:   async_quit(); // get waiting thread to exit.
    46: 
    47:   // nothing to close if the descriptor was never created
    48:   if(-1 == epoll_fd) return;
    49: 
    50:   const int rc = close(epoll_fd);
    51:   if(0 != rc) perror("epoll close");
    52: }
    53: 
    54: int
    55: epoll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    56: {
    57:   int s = sv->s;
    58: 
    59:   struct epoll_event  evt;
    60:   // fprintf(stderr,"add_socket_wakeup: %i (sv=%p, flags=%x)\n",
    61:   //  s, sv, flags);
    62: 
    63:   // Register sv->s with epoll for the directions named in flags
    64:   // (PDEMUX_READ and/or PDEMUX_WRITE), with sv as the user cookie.
    65:   // Returns 0 if the registration was made, -1 (after perror) if not.
    66:   // EPOLLONESHOT is not used; get_evts removes the fd after a wakeup.
    67:   evt.events = 0;
    68: 
    69:   if(flags & PDEMUX_READ) evt.events |= EPOLLIN;
    70:   if(flags & PDEMUX_WRITE) evt.events |= EPOLLOUT;
    71: 
    72:   // fprintf(stderr, "flags %x -> evt.events %x\n", flags, evt.events);
    73: 
    74:   // We do the remove manually because oneshot in epoll doesn't
    75:   // remove the socket, but rather, disables it.
    76: //#ifdef EPOLLONESHOT
    77: //  evt.events |= EPOLLONESHOT;         // yes!
    78: //#endif
    79:   // I think EPOLLHUP comes when the connection closes (on read?)
    80: // poll's (plain old poll) equivalents to this are ignored for input
    81: // same here?
    82:   // I get EPOLLHUPs for bad async connects whether I ask for them or not.
    83:   evt.events |= (EPOLLHUP | EPOLLERR);    // I think I want this
    84: 
    85:   evt.data.ptr = sv;              // our user data, read back in get_evts
    86: 
    87:   if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, s, &evt) == -1)
    88:   {
    89:     // EPOLL_CTL_MOD cannot help us do bidirection io on one socket,
    90:     // the mod will overwrite the old user cookie and direction.
    91:     // It seems that only kqueues, select and iocps allow that.
    92:     // Will need a wakeup that can do both, oneshot that indicates
    93:     // the available direction.
    94: // when using oneshot, we're supposed to use EPOLL_CTL_MOD
    95: #if 0
    96:     int err = errno;
    97: 
    98:     if(EEXIST == err)
    99:     {
   100:       // ok - let's try EPOLL_CTL_MOD
   101:       fprintf(stderr, "RETRYING WITH EPOLL_CTL_MOD\n");
   102:       if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, s, &evt) != -1)
   103:         return 0; // good!
   104:     }
   105: #endif
   106:     perror("epoll_ctl (add)");
   107: 
   108:     return -1;          // no wakeup is coming
   109:   }
   110:   return 0;             // a wakeup is coming
   111: }
   112: 
   113: // drop s from the epoll set. epoll doesn't differentiate on events, so
   114: // this removes whatever was registered for s, read and write alike.
   115: void
   116: epoll_demuxer::remove_wakeup(int s)
   117: {
   118:   // EPOLL_CTL_DEL uses no information from the event
   119:   // and so I should be able to pass NULL.
   120:   struct epoll_event evt;   // deliberately left unset, see above
   121:   // evt.events = (read) ? EPOLLIN : EPOLLOUT;
   122: 
   123:   // fprintf(stderr,"removing socket wakeup %i\n", s);
   124: 
   125:   if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, s, &evt) == -1)
   126:   {
   127:     //const char* str = (read) ? "epoll_ctl (remove read)"
   128:     //  : "epoll_ctl (remove write)";
   129:     // perror(str);
   130:     perror("epoll_ctl (remove)");   // reported only, not propagated
   131:   }
   132: }
   133: 
   134: void
   135: epoll_demuxer::get_evts(bool poll)
   136: {
   137:   struct epoll_event  evt;    // at most one event is taken per call
   138: 
   139:   switch(epoll_wait(epoll_fd, &evt, 1, (poll) ? 0 : ~0)) // ~0 is -1: no timeout
   140:   {
   141:     case -1:    // error
   142:     perror("epoll_wait");
   143:       // fall through
   144:     case 0:     // no events (happens with timeout)
   145:       return;
   146:   }
   147: 
   148:   socket_wakeup* sv = (socket_wakeup*)evt.data.ptr; // set in add_socket_wakeup
   149: 
   150:   // not seeing timeouts as they're filtered by the switching.
   151:   // assuming that sv is good
   152:   // fprintf(stderr,"wakeup (sv=%p, sv->s=%i evt.events=%x)!\n",
   153:   //  sv, sv->s, evt.events);
   154: 
   155:   // accumulate bit field of what we got
   156:   sv->wakeup_flags = 0;
   157: 
   158:   bool  wake = false;
   159: 
   160:   // a read and a write event can arrive together, so each direction is
   161:   // tested independently (no else between the tests below)
   162:   if(evt.events & EPOLLIN)                // I think this is how you do it
   163:   {
   164:     // fprintf(stderr,"EPOLLIN for %p\n", sv);
   165:     sv->wakeup_flags |= PDEMUX_READ;
   166:     wake = true;
   167:   }
   168: 
   169:   if(evt.events & EPOLLOUT)
   170:   {
   171:     //fprintf(stderr,"EPOLLOUT for %p\n", sv);
   172:     sv->wakeup_flags |= PDEMUX_WRITE;
   173:     wake = true;
   174:   }
   175: 
   176:   // Is this for shutdown or closing of the other end?
   177:   // I get it for failed async connects. I don't know if other events cause
   178:   // it. In any case, I don't know whether it should be for read or write,
   179:   // so I just don't say. In any case, it should wake to get error.
   180:   // I seem to get both EPOLLHUP and EPOLLERR on bad async connect
   181:   // see poll demuxer notes on POLLHUP for further possibly useful info.
   182:   if(evt.events & EPOLLHUP)
   183:   {
   184:     fprintf(stderr, "EPOLLHUP for %p->%i\n", sv, sv->s);
   185:     sv->wakeup_flags |= PDEMUX_EOF;
   186:     wake = true;
   187:   }
   188: 
   189:   if(evt.events & EPOLLERR)
   190:   {
   191: // How do I retrieve the error?
   192:     // the error itself is not fetched here; the wakeup only learns that
   193:     // there was one, through PDEMUX_ERROR.
   194: // not sure what to do here. if we've enabled/got oneshot the socket
   195: // should already have been removed
   196:     fprintf(stderr,"epoll error, waking: %i (errno?)\n", sv->s);
   197:     // similar story to EPOLLHUP
   198:     sv->wakeup_flags |= PDEMUX_ERROR;
   199:     wake = true;
   200:   }
   201: 
   202:     if((evt.events & ~(EPOLLERR|EPOLLIN|EPOLLOUT|EPOLLHUP)))
   203:     {
   204:         fprintf(stderr,"unknown events in epoll_demuxer %x\n", evt.events);
   205:     }
   206: 
   207:   // we got something. tell the people.
   208:   // not dependent solely on wakeup_flags - errors need to wake too.
   209:   if(wake)
   210:   {
   211:     // we got something. better call wakeup, must remove to guarantee
   212:     // 1-1 wakeups with add_sockets; remove first, as wakeup may re-add sv
   213:     // fprintf(stderr, "no one-shot... remove %i\n", sv->s);
   214:     remove_wakeup(sv->s);
   215:     // fprintf(stderr, "calling wakeup (flags=%x)\n", sv->wakeup_flags);
   216:     sv->wakeup(*this);
   217:   }
   218: }
   219: }}
   220: 
   221: 
End cpp section to demux/demux_epoll_demuxer.cpp[1]
Start cpp section to demux/demux_evtport_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_evtport_demux.ipk"
     2: #ifndef __FLX_DEMUX_EVTPORT_DEMUXER_H__
     3: #define __FLX_DEMUX_EVTPORT_DEMUXER_H__
     4: 
     5: // driver for solaris 10 event port notifications
     6: 
     7: #include "demux_posix_demuxer.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: // Event ports are oneshot by default (I don't know if you can change that).
    12: // Events are tracked only by fd and not fd*event, so you cannot add
    13: // separate wakeups for read and write with the same fd and hope for it to
    14: // work as the later one will overwrite the earlier, fodder for race
    15: // conditions. This impl satisfies 1-1 wakeup to request ratio.
    16: 
    17: // I don't know if evtports can be waited upon by other evtports
    18: 
    19: // OBS.
    20: // after removing the threads from the demuxers/event sources
    21: // how are the two half demuxers supposed to work? They used to
    22: // have three threads and now they have one. How can two waits be
    23: // done in one thread? I could add one half_demuxer's evtport to
    24: // the other's and wait on that. Would that work? Otherwise I'll
    25: // have to start a thread, which screws things up a bit. Could do
    26: // that and communicate back to single thread via a waitable queue.
    27: // could have three half-demuxers, add them both to third and call
    28: // their wait functions depending on the outer's wait result.
    29: 
    30: class DEMUX_EXTERN evtport_demuxer : public posix_demuxer {
    31:     int     evtport;    // port descriptor from port_create, closed by dtor
    32: 
    33:   // I think evtports only track the socket and not
    34:   // socket*operation, so there's only one remove
    35:   void remove_wakeup(int s);
    36:   // fetches at most one event; poll => return at once instead of blocking
    37:     virtual void  get_evts(bool poll);
    38: public:
    39:   evtport_demuxer();              // throws -1 if the port can't be created
    40:   virtual ~evtport_demuxer();
    41:   // flags: PDEMUX_READ and/or PDEMUX_WRITE. returns 0, or -1 on failure.
    42:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    43: };
    44: 
    45: }} // namespace demux, flx
    46: #endif
    47: 
End cpp section to demux/demux_evtport_demuxer.hpp[1]
Start cpp section to demux/demux_evtport_demuxer.cpp[1 /1 ]
     1: #line 51 "./lpsrc/flx_evtport_demux.ipk"
     2: // Evtports can get timer wakeups with PORT_SOURCE_TIMER.
     3: // Can also pick up aio notifications with PORT_SOURCE_AIO.
     4: 
     5: // looks like this stuff is only in solaris10, and not SunOS 5.8. Damn.
     6: 
     7: #include "demux_evtport_demuxer.hpp"
     8: 
     9: #include <port.h>
    10: #include <poll.h> // POLLIN/POLLOUT
    11: #include <stdio.h>  // printf
    12: #include <unistd.h> // close
    13: #include <assert.h>
    14: 
    15: namespace flx { namespace demux {
    16: 
    17: // header files for this stuff?
    18: // can use port_send for user defined events, to wake up reap loop
    19: // truss to see what's happening
    20: 
    21: evtport_demuxer::evtport_demuxer()
    22: {
    23:   // create the event port; a negative result means it failed
    24:   evtport = port_create();
    25:   if(evtport >= 0) return;
    26:   perror("port_create");
    27:   throw -1;
    28: }
    29: 
    30: evtport_demuxer::~evtport_demuxer()
    31: {
    32:   // request that any waiting thread quits before the port goes away
    33:   async_quit();
    34: 
    35:   // no descriptor, nothing to close
    36:   if(evtport == -1) return;
    37: 
    38:   if(0 != close(evtport)) perror("evtport close");
    39: }
    40: 
    41: int
    42: evtport_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    43: {
    44:   // only read and write requests are understood; refuse anything else
    45:   const int known = PDEMUX_READ | PDEMUX_WRITE;
    46:   if((flags & known) != flags) return -1;
    47: 
    48:   // translate the demux flags into the poll style bits that ports use.
    49:   // hangup and error are always requested along with them.
    50:   int wanted = POLLHUP | POLLERR;
    51:   if(flags & PDEMUX_READ) wanted |= POLLIN;
    52:   if(flags & PDEMUX_WRITE) wanted |= POLLOUT;
    53: 
    54:   // fprintf(stderr,"add_socket_wakeup: %i, sv: %p (%x)\n", sv->s, sv, flags);
    55: 
    56:   // a socket is just a file descriptor in unix, hence PORT_SOURCE_FD.
    57:   // the wakeup object rides along as the user cookie and is what
    58:   // get_evts reads back out of the event.
    59:   const uintptr_t fd = (uintptr_t)sv->s;
    60:   const int rc = port_associate(evtport, PORT_SOURCE_FD, fd, wanted, sv);
    61: 
    62:   // associated: there'll be a wakeup
    63:   if(rc != -1) return 0;
    64: 
    65:   // -1 tells the caller that no wakeup is coming
    66:   perror("add socket_wakeup/port_associate");
    67:   return -1;
    68: }
    69: 
    70: // note that these two functions are exactly the same
    71: // we have to remove after a read or write else we can get multiple
    72: // wakeups - usually with a dud user cookie. the fact that there is
    73: // no differentiation between POLLIN & POLLOUT could be a problem for
    74: // mixed read/write things (rare). note that evt_ports let me associate
    75: // the same thing twice. I don't know if this means you have to dissociate
    76: // (disassociate) twice.
    77: void
    78: evtport_demuxer::remove_wakeup(int s)
    79: {
    80:   const bool failed = (-1 == port_dissociate(evtport, PORT_SOURCE_FD, s));
    81:   if(failed) perror("reading port_dissociate");
    82: }
    83: 
    84: #define POLLPR(ev) if(e->portev_events & ev) fprintf(stderr,#ev", ")
    85: 
    86: static void
    87: print_port_evt(port_event_t* e)
    88: {
    89:   char* srcstr[PORT_SOURCE_ALERT-PORT_SOURCE_AIO+1]
    90:     = { "ALERT", "TIMER", "USER", "FD", "AIO"};
    91:   fprintf(stderr,"e: %p\n\t", e);
    92:   //fprintf(stderr,"portev_events: %x\n\t", e->portev_events);
    93:   fprintf(stderr,"portev_events: ");
    94: 
    95:   // I got these constants from the poll.h file
    96:   POLLPR(POLLIN); POLLPR(POLLOUT); POLLPR(POLLPRI);
    97:   POLLPR(POLLRDNORM); POLLPR(POLLRDBAND); POLLPR(POLLWRBAND);
    98: 
    99:   // in poll these are in a different field. port_event_t doesn't
   100:   // have that field, so lets try here.
   101:   POLLPR(POLLERR); POLLPR(POLLHUP); POLLPR(POLLNVAL); POLLPR(POLLREMOVE);
   102: 
   103:   fprintf(stderr," (%x)\n\t", e->portev_events);
   104: 
   105:   int src = e->portev_source;
   106:   if(PORT_SOURCE_AIO <= src && src <= PORT_SOURCE_ALERT)
   107:   {
   108:     fprintf(stderr,"portev_source: PORT_SOURCE_%s (%x)\n\t",
   109:       srcstr[src-PORT_SOURCE_AIO], src);
   110:   }
   111:   else
   112:   {
   113:     fprintf(stderr,"portev_source: %x\n\t", e->portev_source);
   114:   }
   115: 
   116:   fprintf(stderr,"portev_pad: %x\n\t", e->portev_pad);
   117:   // often our socket
   118:   fprintf(stderr,"portev_object: %x\n\t", e->portev_object);
   119:   fprintf(stderr,"portev_user: %p\n", e->portev_user);
   120: }
   121: 
   122: void
   123: evtport_demuxer::get_evts(bool poll)
   124: {
   125:   // Fetch a single event from the port and deliver it to its wakeup.
   126:   // The event will not fire again, so we get max 1 wakeup per event added.
   127: 
   128:   port_event_t  evt;
   129:   timespec    timeout, *tp = NULL;
   130: 
   131:   if(poll)    // effect a poll
   132:   {
   133:     timeout.tv_sec = 0;
   134:     timeout.tv_nsec = 0;
   135:     tp = &timeout;
   136:   }
   137: 
   138:   // wait for single event: NULL blocks, a zero timespec just polls
   139:   if(port_get(evtport, &evt, tp) == -1)
   140:   {
   141:     perror("port_get");
   142:     return;
   143:   }
   144: 
   145:   // fprintf(stderr,"PORT_GET RETURNED: "); print_port_evt(&evt);
   146: 
   147:   // check the source before trusting (or asserting on) the user cookie:
   148:   // only fd events are ours and only they carry a socket_wakeup.
   149:   if(evt.portev_source != PORT_SOURCE_FD)
   150:   {
   151:     // when polling I often end up in here - we get an unknown evt
   152:     // source and a POLLNVAL event plus lots of other unknown flags.
   153:     // there's interesting looking stuff in the user field and so on,
   154:     // but it's nothing of mine and also undocumented
   155:     // fprintf(stderr,"got non PORT_SOURCE_FD (src=%i)\n",
   156:     //  evt.portev_source);
   157:     return;
   158:   }
   159: 
   160:   // get wakeup obj tucked away in the user cookie.
   161:   socket_wakeup*  sv = (socket_wakeup*)evt.portev_user;
   162:   int       s = (int)evt.portev_object;
   163:   int       events = evt.portev_events;
   164: 
   165:   assert(sv != NULL);
   166: 
   167:   // let's see what we've got for the wakeup. for bidirectional wakeups
   168:   // POLLIN and POLLOUT may arrive together, so the bits accumulate.
   169:   int       wakeup_flags = 0;
   170: 
   171:   if(events & POLLIN)
   172:   {
   173:     // fprintf(stderr,"GOT POLLIN FOR %p\n", sv);
   174:     wakeup_flags |= PDEMUX_READ;
   175:   }
   176: 
   177:   if(events & POLLOUT)
   178:   {
   179:     // fprintf(stderr,"GOT POLLOUT FOR %p\n", sv);
   180:     wakeup_flags |= PDEMUX_WRITE;
   181:   }
   182: 
   183:   // add_socket_wakeup always asks for POLLERR and POLLHUP. report them
   184:   // the way the epoll and poll demuxers do, so the wakeup can tell.
   185:   if(events & POLLERR)
   186:   {
   187:     fprintf(stderr,"ERRORS on s = %i, sv = %p\n", s, (void*)sv);
   188:     wakeup_flags |= PDEMUX_ERROR;
   189:   }
   190: 
   191:   if(events & POLLHUP) wakeup_flags |= PDEMUX_EOF;
   192: 
   193:   if(events & ~(POLLIN | POLLOUT | POLLERR | POLLHUP))
   194:   {
   195:     fprintf(stderr,"UNSOLICITED events in evtport_demuxer (%x)\n", events);
   196:   }
   197: 
   198:   // always wake, even when no known bit was set: this event used up the
   199:   // association, so staying silent would lose the caller's one wakeup
   200:   // (the poll demuxer likewise wakes with empty flags on POLLNVAL).
   201:   sv->wakeup_flags = wakeup_flags;
   202:   sv->wakeup(*this);
   203: }
   204: }}
   205: 
   206: 
End cpp section to demux/demux_evtport_demuxer.cpp[1]
Start cpp section to demux/demux_kqueue_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_kqueue_demux.ipk"
     2: #ifndef __FLX_DEMUX_KQUEUE_DEMUXER_H__
     3: #define __FLX_DEMUX_KQUEUE_DEMUXER_H__
     4: 
     5: #include "demux_posix_demuxer.hpp"
     6: 
     7: namespace flx { namespace demux {
     8: 
     9: // ********************************************************
    10: /// kqueue demuxer for osx'n'BSD and at least 1 linux
    11: // ********************************************************
    12: class DEMUX_EXTERN kqueue_demuxer : public posix_demuxer {
    13:   int   kq;       // descriptor returned by kqueue(), -1 if none
    14: protected:
    15:   // this could just be passed the socket_wakeup, if it stored
    16:   // the flags. Those flags are also set, though, which would
    17:   // create a race condition. In and out flags?
    18:   // filter is EVFILT_READ or EVFILT_WRITE. both return 0, or -1 on error.
    19:   int add_kqueue_filter(socket_wakeup* sv, short filter);
    20:   int remove_kqueue_filter(int s, short filter);
    21:   // flags: PDEMUX_READ and/or PDEMUX_WRITE. -1 if either removal failed.
    22:   int remove_socket_wakeup(int s, int flags);
    23:   void get_evts(bool poll);   // at most one event per call
    24: public:
    25:   kqueue_demuxer();           // throws -1 if kqueue() fails
    26:   virtual ~kqueue_demuxer();
    27:   // flags: PDEMUX_READ and/or PDEMUX_WRITE. returns 0, or -1 on failure.
    28:   virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
    29: };
    30: 
    31: }} // namespace demux, flx
    32: #endif
    33: 
End cpp section to demux/demux_kqueue_demuxer.hpp[1]
Start cpp section to demux/demux_kqueue_demuxer.cpp[1 /1 ]
     1: #line 37 "./lpsrc/flx_kqueue_demux.ipk"
     2: // kqueue demuxer for bsd/os x
     3: // N.B. calling close on a file descriptor will remove any kevents that
     4: // reference that descriptor. that would explain remove complaining from
     5: // time to time.
     6: // try EV_EOF to pick up eofs, useful for async file io.
     7: 
     8: #include "demux_kqueue_demuxer.hpp"
     9: 
    10: #include <stdio.h>      // perror
    11: #include <unistd.h>     // close
    12: 
    13: #include <sys/types.h>    // from the kqueue manpage
    14: #include <sys/event.h>    // kernel events
    15: #include <sys/time.h>   // timespec (kevent timeout)
    16: 
    17: // #include <sys/syscall.h> // syscall(SYC_close,kq) close workaround
    18: 
    19: namespace flx { namespace demux {
    20: kqueue_demuxer::kqueue_demuxer()
    21:   : kq(-1)
    22: {
    23:   // for the record: this event queue is not inherited by forked children.
    24:   kq = kqueue();
    25: 
    26:   // got a queue, all done
    27:   if(kq != -1) return;
    28: 
    29:   perror("kqueue");
    30:   throw -1;
    31: }
    32: 
    33: kqueue_demuxer::~kqueue_demuxer()
    34: {
    35:   // closing a kq while a thread is waiting in kevent makes close block
    36:   // (seen on 10.4; on 10.3 close simply fails, so it's hard to say).
    37:   // the waiting thread therefore has to be woken first, which is done
    38:   // with the self pipe waker. kqueues respond to newly added fds, so
    39:   // the waker needn't have been there from the start.
    40:   // p.s. destructing a demuxer while a thread waits on it is bad form
    41:   // anyway; kqueues just make that obvious.
    42:   async_quit();
    43: 
    44:   // I don't seem to be able to close a kq (can't fstat it either), and
    45:   // syscall(SYS_close, kq) was tried as well. failure is only reported.
    46:   if(kq == -1) return;
    47: 
    48:   if(close(kq) == -1) perror("kqueue close");
    49: }
    50: 
    51: 
    52: // Events of interest to us ERead, EWrite.
    53: // ERead has fflags: NOTE_LOWAT, NOTE_EOF. ident is a descriptor (any?)
    54: 
    55: // if you're using the kqueue_demuxer to do a single bidirectional wakeup,
    56: // be aware that it currently breaks the "one shot" rule, that is you
    57: // may get an unexpected wakeup the next time you call wait.
    58: int
    59: kqueue_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    60: {
    61:   // we only know these flags
    62:   if((flags & ~(PDEMUX_READ | PDEMUX_WRITE))) return -1;
    63: 
    64:   // kqueue filters are not bit flags, so read and write have to be
    65:   // installed one at a time, each as its own (s, filter) event.
    66:   if(flags & PDEMUX_READ)
    67:   {
    68:     if(add_kqueue_filter(sv, EVFILT_READ) == -1) return -1;
    69:   }
    70: 
    71:   if((flags & PDEMUX_WRITE) && add_kqueue_filter(sv, EVFILT_WRITE) == -1)
    72:   {
    73:     // -1 promises that no wakeup is coming, so don't leave a read filter
    74:     // that we've just installed behind to fire later on.
    75:     if(flags & PDEMUX_READ) remove_kqueue_filter(sv->s, EVFILT_READ);
    76:     return -1;
    77:   }
    78:   return 0;
    79: }
    80: 
    81: int
    82: kqueue_demuxer::add_kqueue_filter(socket_wakeup* sv, short filter)
    83: {
    84:   // registers sv's descriptor with the queue for one filter.
    85:   // notes on kqueue behaviour:
    86:   // - on a listening socket this works just like select, *except* that
    87:   //   it works with all types of fds, including pipes, files & fifos
    88:   // - a low water mark for reads can be set with NOTE_LOWAT in fflags
    89:   //   and the size in data. on return data holds the number of bytes
    90:   //   available to read
    91:   // - on return EV_EOF is set in flags if the read dir of the socket has
    92:   //   shut down, with a possible socket err in fflags
    93:   // fflags is zero as I don't know what to put there. sv goes in udata.
    94: 
    95:   // EV_ONESHOT saves removing the event on wakeup (a syscall); the evt
    96:   // is required to be gone before the wakeup fn is called.
    97:   struct kevent change;
    98:   EV_SET(&change, sv->s, filter, EV_ADD | EV_ONESHOT, 0, 0, sv);
    99: 
   100:   // trying to detect when have reached eof with async file io using kq
   101:   //EV_SET(&evt, s, EVFILT_READ, EV_ADD, | EV_ONESHOT NOTE_LOWAT, 16*1024, sv);
   102: 
   103:   // submit the change only; no events are fetched by this call
   104:   const int rc = kevent(kq, &change, 1, NULL, 0, NULL);
   105: 
   106:   if(rc >= 0) return 0;
   107: 
   108:   perror("kevent add_kqueue_filter");
   109:   return -1;
   110: }
   111: 
   112: // useful, but unused atm, thanks to ONESHOT.
   113: // this function skirts the portability boundaries of the demux interface
   114: // for kqueues each event monitor is identified (for sockets) by a pair,
   115: // (s, filter), or for us, (s, {in|out}). This means that we can add
   116: // individual wakeups for reading and writing but not both at once.
   117: // I think epoll can do both, and so can select (and N/A to IOCPs).
   118: // This "both at once" thing can't easily be one shot. There's a good
   119: // case for this behaviour to be defined "undefined". Not many people
   120: // using this part - could be caveat emptor...
   121: int
   122: kqueue_demuxer::remove_kqueue_filter(int s, short filter)
   123: {
   124:   // deletes the event identified by the (s, filter) pair.
   125:   // returns 0 on success and -1 (after a perror) on failure.
   126:   struct kevent change;
   127:   EV_SET(&change, s, filter, EV_DELETE, 0, 0, NULL);
   128: 
   129:   const int rc = kevent(kq, &change, 1, NULL, 0, NULL);
   130:   if(rc >= 0) return 0;
   131: 
   132:   perror("kevent remove_socket_wakeup");
   133:   return -1;
   134: }
   135: 
   136: int
   137: kqueue_demuxer::remove_socket_wakeup(int s, int flags)
   138: {
   139:   // both removals are attempted even if the first one fails; the caller
   140:   // only learns that something failed, not which filter it was.
   141:   bool ok = true;
   142: 
   143:   if(flags & PDEMUX_READ)
   144:     ok = (remove_kqueue_filter(s, EVFILT_READ) == 0) && ok;
   145:   if(flags & PDEMUX_WRITE)
   146:     ok = (remove_kqueue_filter(s, EVFILT_WRITE) == 0) && ok;
   147:   return ok ? 0 : -1;
   148: }
   149: 
   150: // from "advanced macos programming", on reading shutdown causes
   151: // the EV_EOF flag to be set in the flags field and returns errno
   152: // in the fflags field. There may still be pending data to read
   153: // when EV_EOF is set. The data field says how many bytes available.
   154: // for writing data says how much you can write. EV_EOF is set
   155: // when the reader "disconnects". Says nothing about errno/fflags
   156: // in this case.
   157: /*
   158:     fprintf(stderr,"readevt on %i, EOF = %s\n",
   159:       s, (ev.flags & EV_EOF) ? "TRUE" : "FALSE");
   160:  */
   161: 
   162: // do that thing where you get the events. can I get them one at a time?
   163: // I bet I can.
   164: void
   165: kqueue_demuxer::get_evts(bool poll)
   166: {
   167:   // fetches at most one event. filters are added with EV_ONESHOT, so
   168:   struct kevent ev;           // nothing is removed in here on wakeup.
   169:   int       nevts;
   170: 
   171:   struct timespec timeout, *tptr = NULL;  // NULL => kevent blocks
   172: 
   173:   if(poll)
   174:   {
   175:     timeout.tv_sec = 0;   // effectuate a poll
   176:     timeout.tv_nsec = 0;
   177:     tptr = &timeout;
   178:   }
   179: 
   180:   // timeout.tv_sec = 1;    // timeout every second
   181:   // timeout.tv_nsec = 0; // 10^9 nanoseconds per second
   182: 
   183:   nevts = kevent(kq, NULL, 0, &ev, 1, tptr);  // wait or poll
   184: 
   185:   if(nevts <= 0)
   186:   {
   187:     // error, else timeout & return to allow cancel
   188:     if(nevts < 0)
   189:       perror("kevent event fetch");
   190: 
   191:     return;
   192:   }
   193: 
   194:   // fprintf(stderr,"kqueue wakeup!\n");
   195:   // udata is the socket_wakeup stored by add_kqueue_filter; not null checked
   196:   socket_wakeup*  sv = (socket_wakeup*)ev.udata;
   197: 
   198:   // The filters are not bit fields, hence they must come in serially.
   199:   // this means you're never going to get both read and write on
   200:   // a kqueue_demuxer wake up. No worries.
   201:   if(ev.filter == EVFILT_READ)
   202:   {
   203:   // this capability is lost for the moment, as we have no way
   204:   // of explaining it to felix. the event stuff isn't so good right now
   205: /*
   206:     // can chunk up on accepts. nice one kqueue
   207:     if(NULL == sv)      // => listener
   208:     {
   209:       int backlog = (int)ev.data;
   210:       // fprintf(stderr,"kq listen backlog: %i\n", backlog);
   211:       for(int i = 0; i < backlog; i++) handle_connection();
   212:     }
   213:     else
   214: */
   215:     // If a socket wakeup were a control block, you'd set the err here.
   216:     if(0 && ev.flags & EV_EOF)    // the "0 &&" keeps this report disabled
   217:     {
   218:       // errno in fflags!
   219:       fprintf(stderr,
   220:         "got EV_EOF on read, %i bytes remain in buffer, errno=%i\n",
   221:         (int)ev.data, ev.fflags);
   222:     }
   223:     // fprintf(stderr,"EVFILT_READ: got %i bytes coming\n", (int)ev.data);
   224:     // remove_reading_fd(s);      // now using EV_ONESHOT
   225: // remove other outstanding here...?
   226:     sv->wakeup_flags = PDEMUX_READ;   // Tell 'em what they've won.
   227:     sv->wakeup(*this);
   228:   }
   229:   else if(ev.filter == EVFILT_WRITE)
   230:   {
   231:     // fprintf(stderr,"EVFILT_WRITE: can write (?) %i bytes\n",
   232:     //  (int)ev.data);
   233: 
   234:     // using oneshot mode now.
   235:     // remove_writing_fd(s);
   236: 
   237:     if(ev.flags & EV_EOF)
   238:     {
   239:       // errno in fflags? data should be zero bytes, right?
   240:       // can't write anything. only reported; the wakeup still happens.
   241:       fprintf(stderr,
   242:         "got EV_EOF on write, data bytes =%i (0?), errno/fflags?=%i\n",
   243:         (int)ev.data, ev.fflags);
   244:     }
   245: // remove other outstanding here?
   246:     sv->wakeup_flags = PDEMUX_WRITE;
   247:     sv->wakeup(*this);
   248:   }
   249:   else
   250:   {
   251:     fprintf(stderr,"unsolicited event from kqueue (%i)...\n", ev.filter);
   252:     // no wakeup
   253:   }
   254: }
   255: }}
   256: 
   257: 
End cpp section to demux/demux_kqueue_demuxer.cpp[1]
Start cpp section to demux/demux_poll_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_poll_demux.ipk"
     2: #ifndef __FLX_DEMUX_POLL_DEMUXER_H__
     3: #define __FLX_DEMUX_POLL_DEMUXER_H__
     4: 
     5: #include <flx_demux_config.hpp>
     6: #include "demux_posix_demuxer.hpp"
     7: 
     8: // not re-entrant
     9: 
    10: namespace flx { namespace demux {
    11: 
    12: class DEMUX_EXTERN poll_demuxer : public posix_demuxer {
    13:   void*  fd_array;    // really a vector<struct pollfd>*, see the .cpp
    14:   void*  sv_array;    // really a vector<socket_wakeup*>*, same indices
    15: 
    16:   virtual void get_evts(bool poll);
    17: public:
    18:   poll_demuxer();
    19:   virtual ~poll_demuxer();
    20: 
    21:   virtual int add_socket_wakeup(socket_wakeup* sv, int flags);
    22:   // get_evts broken into steps: take the arrays, poll them, process them
    23:   void get_arrays(void** fds, void** svs);
    24:   int dopoll(void* infds, bool poll_flag);    // returns nevents
    25:   void process_evts(void* infds, void* svs, int nevts);
    26: };
    27: 
    28: } }
    29: #endif
    30: 
End cpp section to demux/demux_poll_demuxer.hpp[1]
Start cpp section to demux/demux_poll_demuxer.cpp[1 /1 ]
     1: #line 34 "./lpsrc/flx_poll_demux.ipk"
     2: #include <stdio.h>      // my friend printf
     3: #include <poll.h>
     4: #include <assert.h>
     5: #include "demux_poll_demuxer.hpp"
     6: 
     7: #include <vector>
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: using namespace std;
    12: 
    13: typedef vector<socket_wakeup*> sockvec;
    14: typedef vector<struct pollfd> fdvec;
    15: 
    16: #define FDS ((fdvec*)fd_array)
    17: #define SOCS ((sockvec*)sv_array)
    18: 
    19: // be aware that under os x 10.3 (and other systems?), poll is a thin
    20: // user level layer on top of select and so of no real advantage.
    21: // the select emulation doesn't seem to give POLLHUP errs, either.
    22: // under 10.4 poll doesn't appear to be calling select, so I guess it's
    23: // all good.
    24: 
    25: poll_demuxer::poll_demuxer()
    26:   : fd_array(NULL)
    27:   , sv_array(NULL)
    28: { // both vectors are created lazily by add_socket_wakeup
    29: }
    30: 
    31: poll_demuxer::~poll_demuxer()
    32: {
    33:   // either pointer may still be null here; deleting a null pointer
    34:   // is a no-op so no test is needed.
    35:   delete SOCS; delete FDS;
    36: }
    37: 
    38: // breaking up for a thread safe impl
    39: void
    40: poll_demuxer::get_arrays(void** fds, void** svs)
    41: {
    42:   // hand both vectors over to the caller and forget them here, one
    43:   // array at a time.
    44:   *fds = fd_array; fd_array = 0;
    45: 
    46:   *svs = sv_array; sv_array = 0;
    47: }
    48: 
    49: // returns nevents
    50: int
    51: poll_demuxer::dopoll(void* infds, bool poll_flag)
    52: {
    53:   fdvec* const watched = (fdvec*)infds;
    54: 
    55:   if(NULL == watched)
    56:   {
    57:     // nothing to watch. blocking forever on nothing deserves a warning.
    58:     if(!poll_flag) fprintf(stderr, "Warning ::poll(\\inf) on zero fds!\n");
    59:     return 0;
    60:   }
    61: 
    62:   // nfds_t would be the right type for the count, but it doesn't seem
    63:   // to exist for the 10.3 emulated poll.
    64:   unsigned long count = watched->size();
    65:   struct pollfd* first = &(*watched)[0];
    66: 
    67:   // -1 for timeout means wait indefinitely, 0 returns immediately
    68:   const int timeout_ms = poll_flag ? 0 : -1;
    69:   // fprintf(stderr, "calling ::poll with %p*%i fds\n", first, count);
    70:   const int ready = ::poll(first, count, timeout_ms);
    71: 
    72:   // zero => timeout
    73:   if(ready != -1) return ready;
    74: 
    75:   // an error is reported here and then treated as "no events"
    76: 
    77:   perror("poll_demuxer::get_evts");
    78:   return 0;
    79: }
    80: 
    81: // processes events, calls callbacks, deletes fds & svs upon completion.
    82: // Seems a bit busy, no?
    83: void
    84: poll_demuxer::process_evts(void* infds, void* svs, int nevts)
    85: {
    86:   // Optimisation: when no events (due to timeout) and our fds
    87:   // are null (nothing changed in the meantime), just set the
    88:   // fd and svs members to the incoming ones and return
    89: 
    90:   // fprintf(stderr, "poll::process_evts: %i, %p\n", nevts, fd_array);
    91: 
    92:   if(0 == nevts && !fd_array)
    93:   {
    94:     // fprintf(stderr, "Optimising by resetting arrays!\n");
    95:     assert( !sv_array );  // keep in sync
    96:     fd_array = infds;
    97:     sv_array = svs;
    98:     return;
    99:   }
   100: 
   101:   fdvec*    fds_copy = (fdvec*)infds;
   102:   sockvec*  socs_copy = (sockvec*)svs;
   103: 
   104:   // nothing was being watched (dopoll copes with null arrays too), but
   105:   // wakeups were added in the meantime. there's nothing to walk or free.
   106:   if(!fds_copy) return;
   107: 
   108:   struct pollfd*  fds = &(*fds_copy)[0];
   109:   unsigned long nfds = (*fds_copy).size();
   110: 
   111:   // sanity check. ::poll returns the number of fds with non zero
   112:   // revents, so count fds and not individual event bits.
   113:   int    evts_encountered = 0;
   114: 
   115:   for(unsigned long i = 0; i < nfds; i++, fds++)
   116:   {
   117:     socket_wakeup* sv = (*socs_copy)[i];
   118: 
   119:     // accumulate bit field of what we got
   120:     // don't touch original bits, we might have to restore them
   121:     int  wakeup_flags = 0;
   122: 
   123:     bool    wake = false;
   124: 
   125:     // one count per fd that reported anything at all, errors included
   126:     if(fds->revents) evts_encountered++;
   127: 
   128:     // read and write may both be reported, so test them independently
   129:     if(fds->revents & POLLIN)
   130:     {
   131:       // fprintf(stderr,"POLLIN for %p->%i\n", sv, sv->s);
   132:       wakeup_flags |= PDEMUX_READ;
   133:       wake = true;
   134:     }
   135: 
   136:     if(fds->revents & POLLOUT)
   137:     {
   138:       // fprintf(stderr,"POLLOUT for %p->%i\n", sv, sv->s);
   139:       wakeup_flags |= PDEMUX_WRITE;
   140:       wake = true;
   141:     }
   142: 
   143:     // check here for the unsolicited POLLERR, POLLHUP and POLLNVALs
   144:     if(fds->revents & POLLERR)
   145:     {
   146:       fprintf(stderr, "POLLERR for %p->%i\n", sv, sv->s);
   147:       wakeup_flags |= PDEMUX_ERROR;
   148:       wake = true;    // good to do?
   149:     }
   150: 
   151:     // device has been disconnected. this and POLLOUT are mutually exclusive.
   152:     // a stream can never be writeable again if a hangup has occurred.
   153:     // I've seen POLLHUPs come in for shutdown(s, 1). In this case you want
   154:     // the wakeup, at least if you were waiting to write. POLLHUPs also seem
   155:     // to be the message/wake up when reading from a connection that has
   156:     // closed: you get the remaining bytes, but via POLLHUP (presumably
   157:     // rather than POLLIN). common enough that it may not be worth printing
   158:     if(fds->revents & POLLHUP)
   159:     {
   160:       fprintf(stderr, "POLLHUP for %p->%i\n", sv, sv->s);
   161:       assert((fds->revents & POLLOUT) == 0);
   162:       wakeup_flags |= PDEMUX_EOF;
   163:       wake = true;    // good to do? probably.
   164:     }
   165: 
   166:     // Invalid fd. We shouldn't ever get that. wakes with no flag set.
   167:     if(fds->revents & POLLNVAL)
   168:     {
   169:       fprintf(stderr, "POLLNVAL for %p->%i\n", sv, sv->s);
   170:       wake = true;    // good to do?
   171:     }
   172: 
   173:     if(wake)
   174:     {
   175:       // 1-1 wakeups with add_sockets
   176:       // be aware that callback may add back...
   177:       sv->wakeup_flags = wakeup_flags;
   178:       sv->wakeup(*this);
   179:     }
   180:     else
   181:     {
   182:       // reinstall for the next iteration. note that we keep a copy
   183:       // of the flags in sv->wakeup_flags, set on adding. that belongs
   184:       // to us so there should be no problem there.
   185:       //fprintf(stderr, "poll::readding: %i, %x\n",
   186:       //  sv->s, sv->wakeup_flags);
   187:       if(add_socket_wakeup(sv, sv->wakeup_flags) == -1)
   188:         fprintf(stderr, "poll re-add finished immediately!?!\n");
   189:     }
   190:   }
   191: 
   192:   // keep the bastards honest
   193:   if(evts_encountered != nevts)
   194:   {
   195:     fprintf(stderr, "poll seen/nevts mismatch: %i/%i\n",
   196:       evts_encountered, nevts);
   197:   }
   198: 
   199:   // the checked out arrays are finished with: delete all here.
   200:   delete fds_copy;
   201:   delete socs_copy;
   202: }
   203: 
   204: 
   205: // poll is the call, call the bool poll_flag
   206: void
   207: poll_demuxer::get_evts(bool poll_flag)
   208: {
   209:   // take ownership of the current arrays. process_evts must be called
   210:   // afterwards to give them back.
   211:   void* fds = 0;
   212:   void* svs = 0;
   213:   get_arrays(&fds, &svs);
   214: 
   215:   // no shortcut when nothing happened: process_evts recognises a zero
   216:   // event count itself and handles it, which has the advantage of
   217:   // benefiting the threadsafe version too.
   218:   const int nevts = this->dopoll(fds, poll_flag);
   219: 
   220:   process_evts(fds, svs, nevts);
   221:   // fds and svs are no longer ours past this point
   222: }
   223: 
   224: // precondition: not currently in get_evts (not reentrant)
   225: int
   226: poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
   227: {
   228:   // fprintf(stderr, "poll::add_socket_wakeup: %p->%i, %x\n",
   229:   //  sv, sv->s, flags);
   230: 
   231:   // the two arrays are created lazily, on the first add
   232:   if(!FDS)
   233:   {
   234:     // fprintf(stderr, "creating fds and svns\n");
   235:     assert(SOCS == NULL);    // both should be null or non null, no mix
   236: 
   237:     fd_array = new fdvec;    // FDS
   238:     sv_array = new sockvec;    // SOCS
   239:   }
   240: 
   241:   // remember the requested flags in the wakeup itself, because we
   242:   // often have to copy back the wakeups that haven't had activity
   243:   sv->wakeup_flags = flags;
   244: 
   245:   // translate the demuxer flags into a pollfd entry. POLLERR and
   246:   // POLLHUP are not requested: they're output (revents) only.
   247:   // is the same true for epoll_demuxer? I'd say so...
   248:   struct pollfd  entry;
   249: 
   250:   entry.fd = sv->s;
   251:   entry.events = 0;
   252:   if(flags & PDEMUX_READ) entry.events |= POLLIN;
   253:   if(flags & PDEMUX_WRITE) entry.events |= POLLOUT;
   254: 
   255:   // revents is set on output by ::poll, but in the ambiguous case of
   256:   // a poll that returns due to timeout what would it be? If I can
   257:   // guarantee 0, then I can use the same piece of code to re-add the
   258:   // fds in the thread safe version.
   259:   entry.revents = 0;
   260:   // fprintf(stderr, "turning all revents bits on to test 0 output\n");
   261:   // entry.revents = -1;
   262: 
   263:   // at least one of read/write must have been requested
   264:   assert(0 != entry.events);
   265: 
   266:   // append the entry and its wakeup to the two arrays together
   267:   FDS->push_back(entry);
   268:   SOCS->push_back(sv);
   269: 
   270:   return 0;      // there'll be a wakeup
   271: }
   272: 
   273: } }
   274: 
End cpp section to demux/demux_poll_demuxer.cpp[1]
Start cpp section to demux/demux_ts_poll_demuxer.hpp[1 /1 ]
     1: #line 309 "./lpsrc/flx_poll_demux.ipk"
     2: #ifndef __FLX_DEMUX_TS_POLL_DEMUXER_H__
     3: #define __FLX_DEMUX_TS_POLL_DEMUXER_H__
     4: 
     5: // thread safe version of poll_demuxer
     6: 
     7: #include "demux_poll_demuxer.hpp"
     8: #include "demux_self_piper.hpp"     // self pipe trick
     9: #include "pthread_mutex.hpp"
    10: 
    11: namespace flx { namespace demux {
    12: 
    13: class DEMUX_EXTERN ts_poll_demuxer : public posix_demuxer {
    14:   // lock
    15:   flx::pthread::flx_mutex_t    ham_fist;
    16:   // protects this little fella here: the composed, non thread safe demuxer
    17:   poll_demuxer    demux;
    18:   // self pipe trick, for waking the thread that waits for events
    19:   self_piper      sp;
    20: protected:
    21:   virtual void    get_evts(bool poll);
    22: public:
    23:   ts_poll_demuxer();
    24:   ~ts_poll_demuxer();
    25:   // adds under the lock and, if a wakeup is coming, wakes the waiter
    26:   virtual int     add_socket_wakeup(socket_wakeup* sv, int flags);
    27: 
    28:   // pass it on to composed non-threadsafe demuxer
    29:   virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); }
    30:   virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); }
    31: };
    32: 
    33: }} // namespace demux, flx
    34: 
    35: #endif
    36: 
End cpp section to demux/demux_ts_poll_demuxer.hpp[1]
Start cpp section to demux/demux_ts_poll_demuxer.cpp[1 /1 ]
     1: #line 346 "./lpsrc/flx_poll_demux.ipk"
     2: #include "demux_ts_poll_demuxer.hpp"
     3: #include <stdio.h>
     4: 
     5: namespace flx { namespace demux {
     6: 
     7: ts_poll_demuxer::ts_poll_demuxer()
     8: {
     9:   //fprintf(stderr, "ts_poll_demuxer installing self-piper\n");
    10:   // install the self pipe trick on the composed (non thread safe) demuxer.
    11:   sp.install(&demux);
    12: }
    13: 
    14: ts_poll_demuxer::~ts_poll_demuxer()
    15: {
    16:   //fprintf(stderr, "ts_polling asking thread to quit\n");
    17:   async_quit();  // ask the async waiting thread to stop waiting and quit
    18:   //fprintf(stderr, "ts_poll async quit finished\n");
    19: }
    20: 
    21: void
    22: ts_poll_demuxer::get_evts(bool poll)
    23: {
    24:   void  *fd_arr, *sv_arr;
    25: 
    26:   // step 1: take ownership of the arrays while holding the lock
    27:   {
    28:     flx::pthread::flx_mutex_locker_t  guard(ham_fist);
    29:     demux.get_arrays(&fd_arr, &sv_arr);
    30:     // guard destroyed here, lock released
    31:   }
    32: 
    33:   // step 2: do the poll itself without holding the lock
    34:   const int  evt_count = demux.dopoll(fd_arr, poll);
    35: 
    36:   // step 3: regardless of the number of events, the pieces have to
    37:   // be copied back under lock tutelage
    38:   {
    39:     flx::pthread::flx_mutex_locker_t  guard(ham_fist);
    40:     demux.process_evts(fd_arr, sv_arr, evt_count);
    41:     // guard destroyed here, lock released
    42:   }
    43: }
    44: 
    45: // thread safe overloaded functions follow. all acquire the lock
    46: // then call the unthreadsafe version of the function. nice!
    47: int
    48: ts_poll_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    49: {
    50:   // fprintf(stderr, "ts_poll::add_sock(%i, %x)\n", sv->s, flags);
    51:   flx::pthread::flx_mutex_locker_t  guard(ham_fist);
    52: 
    53:   const int  status = demux.add_socket_wakeup(sv, flags);
    54:   // I wouldn't touch the sv after this.
    55:   if(-1 == status) return status;   // no wakeup is coming
    56: 
    57:   sp.wake();      // nudge the waiting thread via the self pipe
    58:   return status;
    59: }
    60: }}
    61: 
    62: 
End cpp section to demux/demux_ts_poll_demuxer.cpp[1]
Start cpp section to demux/demux_select_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_select_demux.ipk"
     2: #ifndef __FLX_DEMUX_SELECT_DEMUXER_H__
     3: #define __FLX_DEMUX_SELECT_DEMUXER_H__
     4: 
     5: #include "demux_posix_demuxer.hpp"
     6: #include <sys/types.h>    // for sys/select.h on osx
     7: #include <sys/select.h>   // for fd_set
     8: #include <sys/time.h>     // GUSI WTF?
     9: #include <unistd.h>       // for bsd
    10: 
    11: // Unlike the other demuxers, this one is NOT thread safe, so wait and
    12: // add socket wakeup must only be called from the same thread.
    13: // if you're looking for the thread safe version, try ts_select_demuxer
    14: 
    15: namespace flx { namespace demux {
    16: 
    17: class DEMUX_EXTERN select_demuxer : public posix_demuxer {
    18:   void  remove_fd(int s);   // stop watching s, for reading AND writing
    19: 
    20:   // thanks Beej!
    21:   fd_set      master_read_set;    // fd watched for reading
    22:   fd_set      master_write_set;   // for writing
    23:   fd_set      master_except_set;    // for exceptions
    24: 
    25:   // wakeups, indexed by fd - note we only have one set, so currently this
    26:   // demuxer cannot have separate wakeups for the same file descriptor. this
    27:   // fits in fine with the "undefined" nature of doing that.
    28:   socket_wakeup*  svs[FD_SETSIZE];    // the registered wakeups
    29:   //socket_wakeup*  write_svs[FD_SETSIZE];  // write wakeups
    30: 
    31:   int       fdmax;          // high watermark for select
    32: 
    33: protected:
    34:   virtual void  get_evts(bool poll);
    35: 
    36: public:
    37:   // get_evts broken into pieces for thread safe implementations
    38:   void copy_sets(fd_set& rset, fd_set& wset, fd_set& exset);
    39:   // returns true if process_sets should be called.
    40:   bool select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll);
    41:   // these could be consts
    42:   void process_sets(fd_set& rset, fd_set& wset, fd_set& exset);
    43: 
    44:   select_demuxer();
    45: 
    46:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    47: };
    48: }} // namespace demux, flx
    49: #endif
    50: 
End cpp section to demux/demux_select_demuxer.hpp[1]
Start cpp section to demux/demux_ts_select_demuxer.hpp[1 /1 ]
     1: #line 54 "./lpsrc/flx_select_demux.ipk"
     2: #ifndef __FLX_DEMUX_TS_SELECT_DEMUXER_H__
     3: #define __FLX_DEMUX_TS_SELECT_DEMUXER_H__
     4: 
     5: #include "demux_select_demuxer.hpp"
     6: #include "demux_self_piper.hpp"
     7: #include "pthread_mutex.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: // thread safe version of select demuxer
    12: 
    13: class DEMUX_EXTERN ts_select_demuxer : public posix_demuxer {
    14:   // lock: held while the composed demuxer's sets are copied, processed or added to
    15:   flx::pthread::flx_mutex_t      ham_fist;
    16:   // protects this little fella here.
    17:   select_demuxer  demux;
    18: 
    19:   // self pipe trick for waking waiting thread when we like.
    20:   // for demuxer responsiveness.
    21:   self_piper sp;
    22: protected:
    23:   virtual void    get_evts(bool poll);
    24: public:
    25:   ts_select_demuxer();    // installs the self pipe on demux
    26:   ~ts_select_demuxer();   // asks the waiting thread to quit
    27: 
    28:   virtual int   add_socket_wakeup(socket_wakeup* sv, int flags);
    29: 
    30:   // pass it on to composed non-threadsafe demuxer
    31:   virtual demux_quit_flag* get_quit_flag() { return demux.get_quit_flag(); }
    32:   virtual void set_quit_flag(demux_quit_flag* f) { demux.set_quit_flag(f); }
    33: };
    34: }} // namespace demux, flx
    35: 
    36: #endif
    37: 
End cpp section to demux/demux_ts_select_demuxer.hpp[1]
Start cpp section to demux/demux_select_demuxer.cpp[1 /1 ]
     1: #line 92 "./lpsrc/flx_select_demux.ipk"
     2: // P.S. for current impl don't need the pthreads. WHOO!!!
     3: 
     4: // A very light wrapper around select, that allows the addition
     5: // of new sockets and returns status in a queue.
     6: // on the powerbook with 10.3, FD_SETSIZE is 1024, that means
     7: // max 1024 sockets. That's kind of lame. See IO completion ports
     8: // on NT for a better solution.
     9: 
    10: // See ACE_Handle_Set_Iterator for an optimised select bit field examination
    11: // algorithm (p149 C++ network programming, volume1)
    12: 
    13: // see epoll, kqueue & IOCPs
    14: 
    15: // is select level triggered? I think it is.
    16: // strangely, I'm never seeing anything from the exception set.
    17: 
    18: #include "demux_select_demuxer.hpp"
    19: 
    20: #include <assert.h>
    21: #include <string.h>       // memset
    22: 
    23: #include <stdio.h>        // printf debug
    24: #include <stdlib.h>
    25: #include "demux_sockety.hpp"  // get_socket_error
    26: #include <memory>
    27: #include <sys/time.h>
    28: 
    29: namespace flx { namespace demux {
    30: 
    31: select_demuxer::select_demuxer()
    32: {
    33:   // nothing is being watched yet: clear all three master sets. after
    34:   // the thread starts, access to them will have to be via the lock
    35:   FD_ZERO(&master_read_set);
    36:   FD_ZERO(&master_write_set);
    37:   FD_ZERO(&master_except_set);
    38: 
    39:   // clear this possibly quite large list of wakeups
    40:   //memset(svs, 0, sizeof(svs));
    41:   //JS: memset must not be used except for raw data or chars
    42:   for(int i = 0; i < FD_SETSIZE; i++) svs[i] = NULL;
    43:   fdmax = 0;        // corresponds to stdin, which we're not using
    44: }
    45: 
    46: // one select, must not block indefinitely, so choose a timeslice
    47: // or find a way to make it wake on command, like a dummy socket
    48: void
    49: select_demuxer::get_evts(bool poll)
    50: {
    51:   // to use select we must copy our arguments, as it changes them!
    52:   // this code has been broken up into pieces so that the thread safe
    53:   // demuxer can run each piece separately.
    54:   fd_set  rd, wr, ex;
    55: 
    56:   copy_sets(rd, wr, ex);
    57: 
    58:   const bool  have_events = select(rd, wr, ex, poll);
    59:   if(have_events) process_sets(rd, wr, ex);
    60: }
    61: 
    62: int
    63: select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    64: {
    65:   const int fd = sv->s;
    66: 
    67:   // fprintf(stderr, "adding select wakeup for %i, flags=%x\n", fd, flags);
    68: 
    69:   // weakness of select: only fds in [0, FD_SETSIZE) fit in an fd_set
    70:   const bool  fits = (fd >= 0 && fd < FD_SETSIZE);
    71:   if(!fits) return -1;
    72: 
    73:   assert(svs[fd] == NULL);        // sanity check: nothing there
    74: 
    75:   if(flags & PDEMUX_READ) FD_SET(fd, &master_read_set);
    76:   if(flags & PDEMUX_WRITE) FD_SET(fd, &master_write_set);
    77: 
    78:   // always watched for exceptions. does this mean we could add a
    79:   // non-reading, non-writing socket and wait for errors on it?
    80:   FD_SET(fd, &master_except_set);
    81: 
    82:   svs[fd] = sv;             // record wakeup. ours now.
    83: 
    84:   // update highwater mark
    85:   if(fd > fdmax) fdmax = fd;
    86:   return 0;
    87: }
    88: 
    89: // removes for both reading AND writing.
    90: void
    91: select_demuxer::remove_fd(int s)
    92: {
    93:   // fprintf(stderr, "removing select fd: %i\n", s);
    94: 
    95:   assert(s >= 0 && s < FD_SETSIZE);
    96:   assert(svs[s] != NULL);         // there should be something there
    97: 
    98:   // forget the wakeup...
    99:   svs[s] = NULL;
   100:   // ...and clear the fd from every master set regardless.
   101:   FD_CLR(s, &master_except_set);
   102:   FD_CLR(s, &master_write_set);
   103:   FD_CLR(s, &master_read_set);
   104: }
   105: 
   106: // get_evts in pieces, so the thread safe demuxer can call them separately
   107: void
   108: select_demuxer::copy_sets(fd_set& rset, fd_set& wset, fd_set& exset)
   109: {
   110:   rset = master_read_set;     // select changes its arguments, so it gets copies
   111:   wset = master_write_set;
   112:   exset = master_except_set;
   113: }
   114: 
   115: bool
   116: select_demuxer::select(fd_set& rset, fd_set& wset, fd_set& exset, bool poll)
   117: {
   118:   // this is depending on my fake socket to wakeup. perhaps use the timer
   119:   // for now.
   120:   // a null timeout waits until something happens; when polling, a
   121:   // zeroed timeout is passed instead so select returns straight away.
   122:   struct timeval  zero_wait, *timeout = NULL;
   123:   if(poll)
   124:   {
   125:     zero_wait.tv_sec = 0;
   126:     zero_wait.tv_usec = 0;
   127:     timeout = &zero_wait;
   128:   }
   129: 
   130:   // the return value here actually has significance
   131:   // sometimes I have to try again, or weed out bad fds.
   132:   //if(select(fdmax+1, &read_set, &write_set, &except_set, &tv) == -1)
   133:   // nah! wait forever. none of these things shutdown properly yet.
   134:   // it'll force the async new wakeup responsiveness
   135:   const int  nready = ::select(fdmax+1, &rset, &wset, &exset, timeout);
   136: 
   137:   // timed out, don't process sets
   138:   if(0 == nready) return false;
   139: 
   140:   if(-1 == nready)
   141:   {
   142:     // not the ideal reaction. I think this is where I weed out
   143:     // the bad socket(s). would need error set.
   144:     // closing a socket without removing can get us here. that's pretty
   145:     // nasty, because our data would be stale. Try not to do that. I
   146:     // wonder if the except set would tell us when the socket was
   147:     // closed on us? Damn, you have to clear it, else you keep getting
   148:     // the same error.
   149:     perror("select");
   150:     // NOTE(review): carries on and has the sets examined anyway
   151:   }
   152:   return true;    // call process_sets
   153: }
   154: 
   155: void
   156: select_demuxer::process_sets(fd_set& rset, fd_set& wset, fd_set& exset)
   157: {
   158:   // walk every fd up to the highwater mark and wake those that the
   159:   // given sets (as filled in by select) mark as readable or writable.
   160: 
   161:   // might be worth keeping a low water mark as well.
   162:   // I guess this is why select sucks. On osx we can only watch
   163:   // about 1024 sockets. That sucks too. could allocate larger sets
   164:   // with malloc... see c++ network programming book.
   165: 
   166:   // like kqueues, this code could theoretically handle separate wakeups
   167:   // for read and write, should I do it? not right now.
   168: 
   169:   // fdmax is re-read on each pass: a wakeup may add an fd above it.
   170:   for(int i = 0; i <= fdmax; i++)
   171:   {
   172:     int   flags = 0;
   173: 
   174:     if(FD_ISSET(i, &rset)) flags |= PDEMUX_READ;
   175: 
   176:     if(FD_ISSET(i, &wset)) flags |= PDEMUX_WRITE;
   177: 
   178:     // sorta suggests that I ought to call the wakeup and pass
   179:     // an error flag on to it.
   180:     if(FD_ISSET(i, &exset))
   181:     {
   182:       // don't remove bad sockets - it's an error to close the socket
   183:       // or deallocate the wakeup without telling the source. when
   184:       // we get socket errors, we'd better hope that there's reading
   185:       // or writing to be done.
   186:       // under cygwin, closing down a socket (read, write or both)
   187:       // causes select to wake up with an exception bit. out of cygwin
   188:       // we only wake up. In both cases, the read bit is set so
   189:       // just handling the stuff seems to work. not sure about write.
   190:       // posix_demuxer::socket_recv thinks the connection's closed, but
   191:       // it all seems to work out. Yours, Confused.
   192: 
   193:       fprintf(stderr, "select error on socket %i, flags=%x\n",
   194:         i, flags);
   195: 
   196:       // zeroed so the report below never prints an indeterminate
   197:       // value when get_socket_error fails. this isn't great to call
   198:       // on the pipe that is used in the self pipe trick. I don't know
   199:       // why it's getting an err anyway.
   200:       int err = 0;
   201:       if(get_socket_error(i, &err) == -1)
   202:         fprintf(stderr, "get_socket_error failed!?!\n");
   203:       fprintf(stderr, "socket err = %i, %s\n", err, strerror(err));
   204:       // don't remove! see below
   205:       // remove_fd(i);
   206:     }
   207: 
   208:     // a wakeup happens only when there is reading or writing to do
   209:     if(flags)
   210:     {
   211:       socket_wakeup*  sv = svs[i];
   212:       // remove before wakeup so wakeup can add itself back,
   213:       // if necessary.
   214:       remove_fd(i);
   215: 
   216:       sv->wakeup_flags = flags;
   217:       sv->wakeup(*this);
   218:     }
   219:   }
   220: 
   221:   // lower the highwater mark to the highest fd still registered, so
   222:   // select doesn't scan more than it has to. this is done after the
   223:   // loop: a wakeup can register an fd lower than the one being
   224:   // processed, which a running maximum would miss, leaving that fd
   225:   // outside the range handed to select.
   226:   int new_fdmax = fdmax;
   227:   while(new_fdmax > 0 && !svs[new_fdmax]) new_fdmax--;
   228:   fdmax = new_fdmax;      // copy it back
   229: }
   229: 
   230: }} // flx, demux
   231: 
End cpp section to demux/demux_select_demuxer.cpp[1]
Start cpp section to demux/demux_ts_select_demuxer.cpp[1 /1 ]
     1: #line 324 "./lpsrc/flx_select_demux.ipk"
     2: #include "demux_ts_select_demuxer.hpp"
     3: 
     4: // #include <stdio.h>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: ts_select_demuxer::ts_select_demuxer()
     9: {
    10:   // fprintf(stderr, "creating pipe for self-pipe trick\n");
    11:   // install the self pipe trick on the composed (non thread safe) demuxer
    12:   sp.install(&demux);
    13: }
    14: 
    15: ts_select_demuxer::~ts_select_demuxer()
    16: {
    17:   async_quit(); // wake the waiting thread and ask it to leave.
    18: }
    19: 
    20: void
    21: ts_select_demuxer::get_evts(bool poll)
    22: {
    23:   fd_set  rd, wr, ex;
    24: 
    25:   // copy args under lock
    26:   {
    27:     flx::pthread::flx_mutex_locker_t guard(ham_fist);
    28:     demux.copy_sets(rd, wr, ex);
    29:   }
    30: 
    31:   // select on the copies without the lock, then process the results
    32:   // under lock. note that the select demuxer is passed to wakeups, so
    33:   // no recursive lock is needed. also, because any readditions caused
    34:   // by the callback are done to the naive demuxer, no selfpipe writes
    35:   // are required either. the only thing to remember is that the wakeup
    36:   // recipient may see a different demuxer to the one it added to.
    37:   if(!demux.select(rd, wr, ex, poll)) return;   // nothing to process
    38: 
    39:   flx::pthread::flx_mutex_locker_t guard(ham_fist);
    40:   demux.process_sets(rd, wr, ex);
    41:   // guard destroyed here, lock released
    42: }
    43: 
    44: // thread safe overloaded functions follow. all acquire the lock
    45: // then call the unthreadsafe version of the function. nice!
    46: int
    47: ts_select_demuxer::add_socket_wakeup(socket_wakeup* sv, int flags)
    48: {
    49:   // fprintf(stderr, "ts_select::add: %p->%i (%s), %x\n",
    50:   //  sv, s, (s == self_pipe_fds[0]) ? "self pipe" : "socket", flags);
    51:   flx::pthread::flx_mutex_locker_t guard(ham_fist);
    52: 
    53:   const int status = demux.add_socket_wakeup(sv, flags);
    54:   // I wouldn't touch the sv after this.
    55:   if(-1 == status) return status;   // no wakeup is coming
    56: 
    57:   // we have a new socket, so wake the event waiting thread
    58:   // for great responsiveness.
    59:   sp.wake();
    60:   return status;
    61: 
    62: // we need to wake a blocking get_evts call here else we'll have bad
    63: // performance or even a lockup. the question is do we need to do it under
    64: // the tutelage of the lock?
    65: }
    66: 
    67: }}
    68: 
    69: 
End cpp section to demux/demux_ts_select_demuxer.cpp[1]
Start cpp section to demux/demux_iocp_demuxer.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_iocp_demux.ipk"
     2: #ifndef __FLX_DEMUX_IOCP_DEMUXER_H__
     3: #define __FLX_DEMUX_IOCP_DEMUXER_H__
     4: 
     5: #include <flx_demux_config.hpp>
     6: //#include <Windows.h>
     7: // be specific - flx_rtl_config.h now jigs it so that windows.h  does not
     8: // include winsock version 1 headers by default. this was making the order
     9: // of inclusion of windows.h and winsock2.h significant with cl.exe.
    10: #include <WinSock2.h>
    11: 
    12: #include "demux_demuxer.hpp"
    13: #include "pthread_sleep_queue.hpp"
    14: 
    15: 
    16: namespace flx { namespace demux {
    17: 
    18: // not here? returns INVALID_SOCKET on failure.
    19: // if *io_port == 0, then a port is chosen and returned in *io_port
    20: SOCKET DEMUX_EXTERN create_listener_socket(int* io_port, int backlog);
    21: // these two probably not used. move to wsockety.h
    22: SOCKET DEMUX_EXTERN nice_accept(SOCKET listener);
    23: SOCKET DEMUX_EXTERN nice_connect(const char* addr, int port);
    24: int DEMUX_EXTERN set_tcp_nodelay(int s, int disable_nagle);
    25: 
    26: // ********************************************************
    27: /// make sure you instantiate ONE (1) of these before using winsock
    28: // ********************************************************
    29: class DEMUX_EXTERN winsock_initer
    30: {
    31: public:
    32:   winsock_initer();   // starts winsock up; throws an int on failure
    33:   ~winsock_initer();  // cleans winsock up again
    34: };
    35: 
    36: // ********************************************************
    37: /// iocp_wakeup base class for users of iocp_demuxer
    38: /// becoming an overlapped call control block
    39: // ********************************************************
    40: class DEMUX_EXTERN iocp_wakeup {
    41: protected:            // folks need to use these in win 32 calls
    42:   OVERLAPPED  ol;
    43:   // store wakeup error here? I didn't want this to be felixy, useful though.
    44:   void clear_overlapped();  // zero the OVERLAPPED structure
    45: public:
    46:   virtual ~iocp_wakeup() {}   // abstract base: allow deletion via base pointer
    47:   // 2 possibilities for piggybacking data. who could ask for more?
    48:   // udat = per iocp association, olp = per overlapped function call.
    49:   // why don't I need this in the posix version?
    50:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    51:     LPOVERLAPPED olp, int err) = 0;
    52: 
    53:   // start overlapped async operation. returns true if it finished
    54:   // immediately. in this case there will be no iocp_op_finished wakeup.
    55:   // assumes all args ready for call.
    56:   virtual bool start_overlapped() = 0;
    57: 
    58:   // retrieves this pointer from OVERLAPPED pointer
    59:   static iocp_wakeup* from_overlapped(LPOVERLAPPED olp);
    60: };
    61: 
    62: // ********************************************************
    63: // ********************************************************
    64: class DEMUX_EXTERN iocp_demuxer : public demuxer {
    65:   HANDLE    iocp;     // the io completion queue
    66: 
    67:   void get_evts(bool poll);   // waits on (or, if poll is set, polls) the iocp
    68: public:
    69:   iocp_demuxer();             // creates the completion port; throws on failure
    70:   virtual ~iocp_demuxer();
    71: 
    72:   // udat is the per IOCP object user cookie & the overlapped pointer
    73:   // is the per overlapped operation cookie (sort of), so in the case
    74:   // of acceptex, udat is set when the listener is associated with the
    75:   // iocp and is passed to the subsequent acceptex iocp wakeups.
    76:   // probably won't be used very often
    77:   // the OVERLAPPED retrieved from the iocp is assumed to be part of
    78:   // an iocp_wakeup - beware! returns 0 on success, -1 on failure.
    79:   int associate_with_iocp(HANDLE obj, ULONG_PTR udat);
    80: 
    81: };
    82: 
    83: }} // namespace demux, flx
    84: #endif
    85: 
    86: 
End cpp section to demux/demux_iocp_demuxer.hpp[1]
Start cpp section to demux/demux_iocp_demuxer.cpp[1 /1 ]
     1: #line 90 "./lpsrc/flx_iocp_demux.ipk"
     2: #include "demux_iocp_demuxer.hpp"
     3: #include "demux_quitter.hpp" // for clean threaded iocp takedown
     4: 
     5: #include <stdio.h>      // for printf debugging
     6: #include <stddef.h>     // offsetof
     7: #include <assert.h>
     8: // shoving the win_queue in here for now
     9: 
    10: namespace flx { namespace demux {
    11: 
    12: // this could really do with auto objs. steal the strat stuff?
    13: 
    14: // add windows error processing macros. It's a bore otherwise.
    15: 
    16: // WaitForSingleObject on an kill event in the thread for thread cancel
    17: // kill_event = CreateEvent(NULL, TRUE, FALSE, NULL); (what's that)
    18: // SetEvent(kill_event) to invoke (?): SetEvent sets the event to the
    19: // signalled state. Return value is success flag. GetLastError.
    20: 
    21: // do auto SOCKET wrapper, check closesocket return code.
    22: 
    23: // a completion port is a queue into which the os puts notifications of
    24: // completed overlapped io requests. once the operation completes, a
    25: // notification is sent to a worker thread that can process the result.
    26: // a socket may be associated with a completion port at any point after
    27: // creation.
    28: 
    29: 
    30: // I don't see how to nicely stop a thread, I may have to have my own protocol
    31: // to ask it to exit.
    32: 
    33: // PostQueuedCompletionStatus can be used by threads to wake up a worker
    34: // thread. Could be handy replacement for timeout. "useful for notifying
    35: // worker threads of external events"
    36: 
    37: // working through this: http://msdn.microsoft.com/msdnmag/issues/1000/Winsock/
    38: // example of worker thread here
    39: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/msmq/msmq_using_reading_msg_98j7.asp
    40: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/fileio/fs/i_o_completion_ports.asp
    41: // no no, use this one
    42: // http://msdn.microsoft.com/library/default.asp?url=/library/en-us/msmq/msmq_using_reading_msg_98j7.asp
    43: // oh, wait they're the same
    44: // FormatMessage
    45: 
    46: winsock_initer::winsock_initer()
    47: {
    48:   // strange, rt_faio_01 static doesn't work under vs without this.
    49:   // hum.
    50:   // fprintf(stderr, "WINSOCK INIT!!!\n");
    51:   WSADATA  startup_info;
    52: 
    53:   // apparently 2.2's the way to go
    54:   const int  rc = WSAStartup(MAKEWORD(2, 2), &startup_info);
    55:   if(0 == rc) return;
    56: 
    57:   //JS: WSAGetLastError CANNOT be called, since WSAStartup failed
    58:   fprintf(stderr,"couldn't find usable winsock dll: %i\n", rc);
    59:   // the startup result itself is thrown, as an int
    60:   throw rc;
    61: }
    62: 
    63: winsock_initer::~winsock_initer()
    64: {
    65:   // a failed cleanup is only reported
    66:   if(0 == WSACleanup()) return;
    67: 
    68:   fprintf(stderr,"WSACleanup failed %i\n", WSAGetLastError());
    69: }
    70: 
    71: // iocp_wakeup base class for users of iocp_demuxer
    72: //static
    73: iocp_wakeup*
    74: iocp_wakeup::from_overlapped(LPOVERLAPPED olp)
    75: {
    76:   // step back from ol to its iocp_wakeup (offsetof warns on broken gccs)
    77:   char* const  ol_addr = (char*)olp;
    78:   return (iocp_wakeup*)(ol_addr - offsetof(iocp_wakeup, ol));
    79: }
    80: 
    81: void
    82: iocp_wakeup::clear_overlapped()
    83: {
    84:   ZeroMemory(&ol, sizeof(ol));  // zero the whole OVERLAPPED member (much better than memset, right?)
    85: }
    86: 
    87: 
    88: iocp_demuxer::iocp_demuxer()
    89:   : iocp(NULL)
    90: {
    91:   // Create the completion port
    92:   // not sure what first 3 args do, but by specifying INVALID_HANDLE_VALUE
    93:   // for the first I think I can ignore the rest (apart from the last, numthreads)
    94:   // I still have to create the threads, but only NumberOfConcurrentThreads
    95:   // will wake up from GetQueuedCompletionStatus at a time. This looks to be
    96:   // slightly elastic...
    97:   // NT 3.51 doesn't let you pass null filehandle, you've got to have a dummy
    98:   // socket. keep that in mind. see InitializeIOCP in IOCPServer.cpp example
    99:   // taken from codeproject. GetSystemInfo to find out num CPUs
   100:   //fprintf(stderr,"CreateIoCompletionPort with ONE WORKER THREAD\n");
   101: 
   102:   const DWORD  max_running = 1;   // 1 thread (zero means one for each CPU)
   103:   iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)0,
   104:     max_running);
   105: 
   106:   // success: nothing more to do
   107:   if(NULL != iocp) return;
   108: 
   109:   // report why the port could not be created, then abandon
   110:   // construction by throwing an int
   111:   DWORD err = GetLastError();
   112:   fprintf(stderr,"failed to create completion port: %li\n", err);
   113:   throw -1;
   114: }
   115: 
   116: iocp_demuxer::~iocp_demuxer()
   117: {
   118:   //fprintf(stderr, "~iocp (%p) NOW WITH ASYNC QUITTER!\n", iocp);
   119:   try
   120:   {
   121:     demux_quitter q;
   122:     q.quit(this);   // presumably stops the thread waiting on this demuxer - see demux_quitter
   123:   }
   124:   catch(...)
   125:   {
   126:     fprintf(stderr, "~iocp_demuxer async quit threw exception!\n");
   127:     // now what do we do? the exception is swallowed here, only reported.
   128:   }
   129: 
   130:   // close the port itself. a failure is reported, nothing more.
   131:   if(NULL != iocp && !CloseHandle(iocp))
   132:   {
   133:     DWORD err = GetLastError();
   134:     fprintf(stderr,"failed cleanup iocp: %li\n", err);
   135:   }
   136: }
   137: 
   138: int
   139: iocp_demuxer::associate_with_iocp(HANDLE obj, ULONG_PTR udat)
   140: {
   141:   // fprintf(stderr, "associating with iocp=%p: %p, udat: %lx\n",
   142:   //  iocp, obj, udat);
   143: 
   144:   // Any overlapped operations performed on the object will use the
   145:   // completion port for notification. The 3rd param can be used to pass
   146:   // per object context information. we'll just pass that back.
   147:   const HANDLE  port = CreateIoCompletionPort(obj, iocp, udat, 0);
   148:   if(NULL != port) return 0;
   149: 
   150:   // adding the same obj twice without an intervening get completion
   151:   // status wakup gets an error 87, ERROR_INVALID_PARAMETER
   152:   fprintf(stderr,"CreateIoCompletionPort failed to register object: %li\n",
   153:     GetLastError());
   154: 
   155:   return -1;
   156: }
   157: 
   158: void
   159: iocp_demuxer::get_evts(bool poll) {
   160:   // Dequeue at most one completion packet from the iocp and pass it to
   161:   // the iocp_wakeup that owns the OVERLAPPED. With poll set the wait
   162:   // uses a zero timeout (returns at once if nothing is queued),
   163:   // otherwise it blocks (INFINITE).
   164:   //
   165:   // with multiple threads, this will actually wake up the last to
   166:   // block (lifo). I guess to avoid swapping of thread context. By calling
   167:   // this on a given completion port this thread is associated with it
   168:   // until exit or respec
   169: 
   170:   DWORD     nbytes = 0;   // number of bytes in io transaction
   171:   ULONG_PTR     udat = 0;   // user data - not using this atm
   172:   LPOVERLAPPED  olp = NULL;   // we get iocp_wakeup from this.
   173: 
   174: // If a socket handle associated with a completion port is closed,
   175: // GetQueuedCompletionStatus returns ERROR_SUCCESS, with *lpOverlapped
   176: // non-NULL and lpNumberOfBytes equal zero.
   177: 
   178:   int err = NO_ERROR;
   179: 
   180:   // poll => zero timeout, otherwise no timeout. Could need a timeout to
   181:   // bring the thread down.
   182:   if(!GetQueuedCompletionStatus(iocp, &nbytes, &udat, &olp,
   183:     (poll) ? 0: INFINITE))
   184:   {
   185:     // That's strange - I sometimes get my ConnectEx errors popping
   186:     // out here (ERROR_SEM_TIMEOUT=121, ERROR_CONNECTION_REFUSED=1225)
   187:     // it looks like my args (overlapped, etc) are still filled out, so
   188:     // I can still awake the sleeper
   189:     err = GetLastError();   // doco says this & not WSALastError.
   190: 
   191:     // let's see: yep - there's my overlapped
   192:     // fprintf(stderr,"!iocp::wait: nbytes=%li, udat=%lx, io=%p, err=%i\n",
   193:     //  nbytes, udat, olp, err);
   194: 
   195:     if(WAIT_TIMEOUT == err)
   196:     {
   197:       // we get this a lot now that we can poll the iocp, so no output
   198:       // interestingly, nbytes = 1. what could that mean?
   199:       return;         // no wakeup
   200:     }
   201:     else if(ERROR_OPERATION_ABORTED == err)
   202:     {
   203:       // that's real bad manners. Or I could just ignore it. Anyway,
   204:       // any overlapped received is stale.
   205:       fprintf(stderr, "WHOA!!! - disassociate before killing handle\n");
   206:       return;         // no wakeup
   207:     }
   208:     else
   209:     {
   210:       fprintf(stderr,"GetQueuedCompletionStatus returned false: %i\n",
   211:         err);
   212:       // olp may well be NULL here; that is checked before it is used
   213:     }
   214: 
   215:     // I'm going to assume that there's a good wakeup, and fall through
   216:     // We need to wakeup on some errors (like ERROR_CONNECTION_REFUSED)
   217:     // FALL THROUGH
   218:   }
   219: 
   220: // An IOCP is a very general event mechanism. It tells you not only about
   221: // the completion of reads & writes, but also of pretty much any asynchronous
   222: // event's completion. It doesn't quite fit in with my select style interfaces.
   223: // I've got general overlapped things completing here. I don't want them to
   224: // know about demuxers & so forth so I'll have to know about them.
   225: 
   226:   //fprintf(stderr,"HOLEY! Woke up!\n");
   227:   //fprintf(stderr,"nbytes=%li, udat=%lx, olp=%p, err=%i\n",
   228:   //  nbytes, udat, olp, err);
   229: 
   230:   // no overlapped pointer => no packet was dequeued, so there is nobody
   231:   // to wake. a real check, because an assert vanishes under NDEBUG.
   232:   if(NULL == olp) return;
   233: 
   234:   // tell someone that some overlapped op finished
   235:   iocp_wakeup*  wakeup = iocp_wakeup::from_overlapped(olp);
   236: 
   237:   // passing olp may be redundant, seeing as it's contained in iocp_wakeup
   238:   wakeup->iocp_op_finished(nbytes, udat, olp, err);
   239: }
   240: 
   241: 
   242: 
   243: // simple utility fn, shouldn't be here. Creates a TCP listener bound to any
   244: // interface. this could benefit from a SOCKET class. *io_port is the port
   245: // to bind; if 0 it receives the chosen port. INVALID_SOCKET on failure.
   246: SOCKET
   247: create_listener_socket(int* io_port, int backlog)
   248: {
   249:   fprintf(stderr,"creating_listener_socket\n");
   250:   SOCKET        listener;
   251:   // NB: errors are only printed to stderr; the error code itself is eaten
   252:   // could use WSASocket, but these seem to be turning out overlapped anyway
   253:   // at least after tangling with overlapped functions.
   254:   // socket returns INVALID_SOCKET on failure.
   255:   listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
   256: 
   257:   if (INVALID_SOCKET == listener)
   258:   {
   259:     fprintf(stderr,"listener create failed: %i\n", WSAGetLastError());
   260:     return INVALID_SOCKET;
   261:   }
   262: 
   263:   SOCKADDR_IN   addr;
   264: 
   265:   // msdn code examples don't zero the sockaddr. That makes me nervous.
   266:   ZeroMemory(&addr, sizeof(addr));
   267:   addr.sin_family = AF_INET;
   268:   addr.sin_addr.s_addr = htonl(INADDR_ANY);
   269:   addr.sin_port = htons(*io_port);
   270: 
   271:   // bind our name to the socket
   272:   int         res;
   273:   res = bind(listener, (LPSOCKADDR)&addr, sizeof(addr));
   274: 
   275:   if (SOCKET_ERROR == res)
   276:   {
   277:     fprintf(stderr,"bind() failed %i\n", WSAGetLastError());
   278:     if(closesocket(listener) == SOCKET_ERROR)
   279:     {
   280:       fprintf(stderr,"closesocket failed on listener: %i\n",
   281:         WSAGetLastError());
   282:     }
   283:     return INVALID_SOCKET;
   284:   }
   285: 
   286:   // if user wanted port chosen tell them what it turned out to be
   287:   if(0 == *io_port)
   288:   {
   289:     int namelen = sizeof(addr);
   290:     if (getsockname(listener, (struct sockaddr *)&addr, &namelen)
   291:       == SOCKET_ERROR)
   292:     {
   293:       fprintf(stderr, "getsockname failed (%i)\n", WSAGetLastError());
   294:       // don't leak the socket on the way out
   295:       if(closesocket(listener) == SOCKET_ERROR)
   296:       {
   297:         fprintf(stderr,"closesocket failed on listener: %i\n",
   298:           WSAGetLastError());
   299:       }
   300:       return INVALID_SOCKET;
   301:     }
   302:     // report the port back in host byte order
   303:     *io_port = ntohs(addr.sin_port);
   304:   }
   305: 
   306:   // Set the socket to listen
   307:   res = listen(listener, backlog);
   308:   if (SOCKET_ERROR == res)
   309:   {
   310:     fprintf(stderr,"listen() failed %i\n", WSAGetLastError());
   311:     // don't leak the socket on the way out
   312:     if(closesocket(listener) == SOCKET_ERROR)
   313:     {
   314:       fprintf(stderr,"closesocket failed on listener: %i\n",
   315:         WSAGetLastError());
   316:     }
   317: 
   318:     return INVALID_SOCKET;
   319:   }
   320:   // success: bound and listening; the caller now owns the socket
   321:   return listener;
   322: }
   323: 
   324: // currently the following aren't used. Look forward to warnings
   325: // about them.
   326: 
   327: // Blocking accept() wrapper. The posix version of this made the
   328: // accepted socket nonblocking; that doesn't seem to be needed when using
   329: // iocp, so it isn't done here. (To create an overlapped socket yourself
   330: // pass WSA_FLAG_OVERLAPPED to WSASocket. There's no obvious way to make
   331: // accept do that: WSAAccept has no flag for it, though it does allow
   332: // conditional accepting, and there doesn't seem to be a sockopt.)
   333: //
   334: // returns the accepted socket, or INVALID_SOCKET on failure. the error
   335: // is printed to stderr and otherwise eaten.
   336: SOCKET
   337: nice_accept(SOCKET listener)
   338: {
   339:   // the peer address is collected but not used
   340:   struct sockaddr_in  peer;
   341:   int         peer_len = sizeof(peer);
   342: 
   343:   // accept yields INVALID_SOCKET when it fails
   344:   const SOCKET  conn = accept(listener, (struct sockaddr*)&peer, &peer_len);
   345: 
   346:   // report a failure, but still hand the (invalid) socket back
   347:   if(conn == INVALID_SOCKET)
   348:     fprintf(stderr,"nice_accept failed (%i)\n", WSAGetLastError());
   349: 
   350:   // this is where the posix version switches the socket to nonblocking
   351:   // mode. not bothering here.
   352: 
   353: 
   354:   return conn;
   355: }
   356: 
   357: // connect s to ipv4 addr:port. SOCKET_ERROR on failure, err in WSAGetLastError()
   358: static int
   359: connect_sock(SOCKET s, const char* addr, int port)
   360: {
   361:   // start from an all-zero address, then fill in the three fields
   362:   struct sockaddr_in  dest;
   363:   memset(&dest, 0, sizeof(dest));
   364:   dest.sin_port = htons(port);
   365:   dest.sin_addr.s_addr = inet_addr(addr);
   366:   dest.sin_family = AF_INET;
   367: 
   368:   return connect(s, (struct sockaddr *)&dest, sizeof(dest));
   369: }
   370: 
   371: // Create a tcp socket and connect it to addr:port. returns INVALID_SOCKET
   372: // on failure (error printed, then eaten). unlike the posix version, this
   373: // does not make the socket nonblocking.
   374: SOCKET
   375: nice_connect(const char* addr, int port)
   376: {
   377:   const SOCKET  s = socket(AF_INET, SOCK_STREAM, 0);
   378:   const bool    have_sock = (INVALID_SOCKET != s);
   379: 
   380:   // the connect is only attempted if the socket was created
   381:   if(have_sock && SOCKET_ERROR != connect_sock(s, addr, port))
   382:     return s;   /* success! */
   383: 
   384:   // either socket() or the connect failed
   385:   fprintf(stderr,"nice_connect failed (%i)\n", WSAGetLastError());
   386: 
   387:   if(have_sock && SOCKET_ERROR == closesocket(s))
   388:     fprintf(stderr,"nice close failed (%i)\n", WSAGetLastError());
   389: 
   390:   return INVALID_SOCKET;
   391: }
   392: 
   393: // Set (disable != 0) or clear the TCP_NODELAY option on socket s.
   394: // returns -1 on error with errno in WSAGetLastError. 0 otherwise.
   395: int
   396: set_tcp_nodelay(int s, int disable)
   397: {
   398:   BOOL        nodelay = (0 != disable);   // normalise to 0/1
   399:   const char* optval = (const char*)&nodelay;
   400:   if(SOCKET_ERROR == setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
   401:       optval, sizeof(nodelay)))
   402:     return -1;
   403:   return 0;
   404: }
   405: 
   406: }}
   407: 
   408: 
End cpp section to demux/demux_iocp_demuxer.cpp[1]
Start cpp section to demux/demux_overlapped.cpp[1 /1 ]
     1: #line 499 "./lpsrc/flx_iocp_demux.ipk"
     2: #include "demux_overlapped.hpp"
     3: #include <stdio.h>      // fprintf
     4: #include <assert.h>
     5: 
     6: // cygwin's mswsock.h may lack ConnectEx: supply the fn ptr type + GUID
     7: #ifndef WSAID_CONNECTEX
     8: typedef
     9: BOOL
    10: (PASCAL FAR * LPFN_CONNECTEX) (
    11:     IN SOCKET s,
    12:     IN const struct sockaddr FAR *name,
    13:     IN int namelen,
    14:     IN PVOID lpSendBuffer OPTIONAL,
    15:     IN DWORD dwSendDataLength,
    16:     OUT LPDWORD lpdwBytesSent,
    17:     IN LPOVERLAPPED lpOverlapped
    18:     );
    19: // GUID used to look the function up (see GetConnectExAddr)
    20: #define WSAID_CONNECTEX \
    21:     {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
    22: #endif
    23: 
    24: namespace flx { namespace demux {
    25: 
    26: // windows includes files here? vs will be fussy.
    27: 
    28: // AcceptEx
    29: 
    30: // Starts an overlapped AcceptEx on listener/acceptor. Returns true if it
    31: // failed outright (caller has "self woken"), false otherwise.
    32: bool
    33: acceptex_control_block::start_overlapped()
    34: {
    35:   clear_overlapped();
    36: 
    37: // I've seen two examples get the pointer to AcceptEx, just in case it
    38: // isn't implemented...
    39:   // fprintf(stderr,"AcceptExing: listen backlog => can succeed immediately\n");
    40: 
    41:   // this is only set when acceptex receives data and returns immediately.
    42:   // can't hurt to set it.
    43:   DWORD nbytes = 0;
    44:   BOOL  success;
    45: 
    46:   // note that in order to get the wakeup packet, the listener must
    47:   // already be associated with the iocp. for future async io, the acceptor
    48:   // must be associated too.
    49:   success = AcceptEx(listener, acceptor,
    50:     accept_buf,       // required - near/far address
    51:     0,            // receive data size - don't yet want this
    52:     ACCEPTEX_ADDR_SIZE,   // must be nonzero
    53:     ACCEPTEX_ADDR_SIZE,   // must be nonzero
    54:     &nbytes,        // only set if fn completes. should be 0
    55:     &ol);         // oblig. gets us back to the this ptr
    56: 
    57:   // if there is a backlog of connections, AcceptEx can return immediately
    58:   if(success)
    59:   {
    60:     // must clear the wait
    61:     fprintf(stderr,"WHOA! AcceptEx RETURNED SUCCESS IMMEDIATELY!\n");
    62:     fprintf(stderr, "This could be bad, as wait should call op_finish\n");
    63:     fprintf(stderr, "We also lose the udat cookie (set to NULL)\n");
    64:     // finish by hand: udat is made up (0), nobody uses it anyway.
    65:     // NOTE(review): wsasocketio_control_block::start_overlapped observes
    66:     // that a completion packet still arrives after immediate success; if
    67:     // that holds here too, this is finished twice - TODO confirm.
    68:     iocp_op_finished(nbytes, 0, &ol, NO_ERROR);
    69:     return false;   // means no completion packet coming (correct?)
    70:   }
    71:   else
    72:   {
    73:     int err = WSAGetLastError();
    74:     // can also return WSACONNRESET, which isn't so bad
    75:     if(ERROR_IO_PENDING == err)
    76:     {
    77:       // fprintf(stderr,"AcceptEx returned ERROR_IO_PENDING - that's normal\n");
    78:       // This is the normal situation, fall through, leaving thread
    79:       // to sleep on wakeup call.
    80:     }
    81:     else
    82:     {
    83:       fprintf(stderr,"AcceptEx failed: %i\n", err);
    84:       fprintf(stderr,"returning true should wake thread to detect failure.\n");
    85:       return true;    // have self woken
    86:     }
    87:   }
    88:   return false;       // async not finished
    89: 
    90: }
    91: 
    92: // ConnectEx
    93: #if 0
    94: // apparently we're supposed to do this now to make the acceptee inherit
    95: // the listener's state. it is currently in the default state
    96: //err = setsockopt( sAcceptSocket,
    97: //  SOL_SOCKET,
    98: //  SO_UPDATE_ACCEPT_CONTEXT,
    99: //  (char *)&sListenSocket,
   100: //  sizeof(sListenSocket) );
   101: #endif
   102: 
   103: // what a pain in the arse (zzz): ConnectEx has to be looked up at run
   104: // time through WSAIoctl.
   105: // This doesn't exist in win2000, so it'll need to be synchronous there.
   106: // *conn_fn is preset to NULL and then filled in by the lookup on socket
   107: // s. Returns WSAIoctl's result.
   108: static int
   109: GetConnectExAddr(SOCKET s, LPFN_CONNECTEX* conn_fn)
   110: {
   111:   GUID      connectex_guid = WSAID_CONNECTEX;
   112:   DWORD     nbytes_out;       // output size from WSAIoctl, not used
   113: 
   114:   *conn_fn = NULL;
   115: 
   116:   // why does this need a socket? anyway:
   117:   // no overlapped, no completion fun ptr
   118:   return WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,
   119:     &connectex_guid, sizeof(connectex_guid),
   120:     conn_fn, sizeof(*conn_fn),
   121:     &nbytes_out,
   122:     NULL, NULL);
   123: }
   124: 
   125: // To use ConnectEx, the socket must be already bound - weird. By trial
   126: // and error it had to be bound to INADDR_ANY, port 0. Apparently this
   127: // needn't be repeated if the socket is reused. returns bind's result.
   128: static int
   129: bind_socket(SOCKET s)
   130: {
   131:   SOCKADDR_IN   any_addr;
   132:   ZeroMemory(&any_addr, sizeof(any_addr));
   133: 
   134:   any_addr.sin_port = htons(0);
   135:   any_addr.sin_addr.s_addr = htonl(INADDR_ANY);
   136:   any_addr.sin_family = AF_INET;
   137: 
   138:   return bind(s, (LPSOCKADDR)&any_addr, sizeof(any_addr));
   139: }
   140: 
   141: bool
   142: connectex_control_block::start_overlapped()
   143: {
   144:   // Starts an overlapped ConnectEx of s to addy:p. Returns true if the
   145:   // attempt failed outright (the error is left in socket_err, so that
   146:   // finished() holds), false if the connect is pending or completed on
   147:   // the spot.
   148:   clear_overlapped();
   149: 
   150:   // pending until shown otherwise; finished() tests against this
   151:   socket_err = ERROR_IO_PENDING;
   152: 
   153:   DWORD bytes_sent = 0;   // we're not sending data on connect (yet)
   154:   BOOL  success;
   155:   LPFN_CONNECTEX  pfConnectEx;
   156: 
   157:   // Turns out that ConnectEx isn't defined anywhere; I have to load its
   158:   // addr via WSAIoctl
   159:   // this is a bad way. make the driver cache it. why on earth is this
   160:   // call per-socket? does it really need to be that way?
   161:   if(GetConnectExAddr(s, &pfConnectEx) == SOCKET_ERROR)
   162:   {
   163:     // record the failure: otherwise socket_err stays ERROR_IO_PENDING
   164:     // and finished() keeps saying "not yet" after we've self woken
   165:     socket_err = WSAGetLastError();
   166:     fprintf(stderr,"GetConnectExAddr failed: %i\n", socket_err);
   167:     return true;
   168:   }
   169: 
   170:   // fprintf(stderr,"about to connectex to %s:%i, %i\n", addy, p, s);
   171: 
   172:   // this is so strange - I have to bind the socket to the localhost.
   173:   // if I don't, ConnectEx returns EINVAL. in any case, I won't need
   174:   // to do this again if I reuse this socket.
   175:   if(bind_socket(s) == SOCKET_ERROR)
   176:     fprintf(stderr,"ConnectEx bind failed: %i\n", WSAGetLastError());
   177: 
   178:   // I hope ConnectEx doesn't want this to hang around, because it's
   179:   // going to drop off the stack after this.
   180:   SOCKADDR_IN   addr;
   181: 
   182:   // some examples don't zero the addr. That makes me nervous.
   183:   ZeroMemory(&addr, sizeof(addr));
   184:   addr.sin_family = AF_INET;
   185:   addr.sin_addr.s_addr = inet_addr(addy);
   186:   addr.sin_port = htons(p);
   187: 
   188:   // in order to receive the wakeup packet, s must already be associated
   189:   // with the iocp. this is best done at socket creation time. for these
   190:   // sockets it's probably best to also bind them at the same time.
   191:   // that requires "purposed" sockets (CreateConnectSocket?).
   192:   // p.s. the default (waio_base) wakeup is doing fine for now.
   193:   success = (*pfConnectEx)(s, // socket
   194:     (LPSOCKADDR)&addr,    // connect address
   195:     sizeof(addr),     // size thereof
   196:     NULL,         // not sending any data yet, but we could
   197:     0,            // ditto
   198:     NULL,         // should be zero until this changes
   199:     &ol);         // oblig. gets us back to the this ptr
   200: 
   201: // there's a caveat about the type of socket s becomes after ConnectEx.
   202: // It's in some kind of default state and cannot be used with shutdown
   203: // change it with setsockopt (?)
   204:   if(success)
   205:   {
   206:     fprintf(stderr,"WHOA! ConnectEx RETURNED SUCCESS IMMEDIATELY!\n");
   207:     fprintf(stderr, "BAD: calls op_finish and loses udat cookie\n");
   208:     // handle the successful wakeup. (udat=0, olp=&ol)
   209:     iocp_op_finished(bytes_sent, 0, &ol, NO_ERROR);
   210:     return false;   // means no completion packet coming (correct?)
   211:   }
   212:   else
   213:   {
   214:     int err = WSAGetLastError();
   215:     if(ERROR_IO_PENDING == err)
   216:     {
   217:       // fprintf(stderr,"ConnectEx pending...\n");
   218:       // This is the normal situation, fall through, leaving thread
   219:       // to sleep on wakeup call.
   220:     }
   221:     else
   222:     {
   223:       // store the error: finished() then reports completion and the
   224:       // reason is there for whoever gets woken
   225:       socket_err = err;
   226:       fprintf(stderr,"ConnectEx failed: %i\n", err);
   227:       return true;    // have self woken
   228:     }
   229:   }
   230:   return false;       // not finished
   231: }
   231: 
   232: // TransmitFile. Returns true if no completion should be waited for
   233: // (immediate success or outright failure), false if one is pending.
   234: bool
   235: transmitfile_control_block::start_overlapped()
   236: {
   237:   clear_overlapped();
   238: 
   239:   // 0 bytes => transmit entire file
   240:   // the second zero means use the default chunk size
   241:   // the NULL is for mem buffers to bookend the file with. nothing yet.
   242:   // flags can ask for DisconnectEx style socket reuse.
   243:   // in order to receive the wakeup, s must already be associated with the
   244:   // iocp. this is best done at socket creation time.
   245:   if(TransmitFile(s, file, 0, 0, &ol, NULL, flags))
   246:   {
   247:     fprintf(stderr,"Transmit file succeeded immediately! waking...\n");
   248:     return true;
   249:   }
   250: 
   251:   DWORD err = WSAGetLastError();
   252: 
   253:   // the normal case: the completion turns up later via the iocp
   254:   if(ERROR_IO_PENDING == err || WSA_IO_PENDING == err)
   255:     return false;
   256: 
   257:   // genuine failure: nothing is pending, so reporting "still in
   258:   // progress" would leave the requester asleep for good. wake now, as
   259:   // the accept/connect control blocks do on failure.
   260:   fprintf(stderr,"genuine error from TransmitFile: %li\n", err);
   261:   return true;
   262: }
   263: 
   264: 
   265: // SOCKET io using WSASend and WSARecv
   266: 
   267: // windows style control blocks. src: the socket; pb: the buffer and its
   268: // progress; inread: true for WSARecv, false for WSASend.
   269: wsasocketio_control_block::wsasocketio_control_block(SOCKET src, sel_param* pb,
   270:   bool inread)
   271:   : s(src), ppb(pb), error(0), reading(inread)  // error: defined from birth
   272: {}
   273: 
   274: bool
   275: wsasocketio_control_block::start_overlapped()
   276: {
   277:   clear_overlapped();
   278:   // (re)start the WSARecv/WSASend for the unprocessed rest of the buffer.
   279:   error = 0;
   280:   // returns false if a completion packet is on its way, true if not.
   281:   // num bytes received IF recv completes immediately.
   282:   DWORD imm_bytes = 0;
   283:   int   recv_res;
   284: 
   285:   // set up the single wbuf, bearing in mind we may be part way.
   286:   wbufs[0].len = ppb->buffer_size - ppb->bytes_written;
   287:   wbufs[0].buf = ppb->buffer + ppb->bytes_written;
   288: 
   289: // fprintf(stderr, "sockio: %p->finished = %i, reading = %i\n",
   290: //  ppb, ppb->finished(), reading);
   291: 
   292:   // Ideally, we would like to be able to use MSG_WAITALL, which would
   293:   // let us only get a completion packet when either all the data was
   294:   // available or the connection had been closed or shutdown.
   295:   // Unfortunately this is not possible for non-blocking sockets, so
   296:   // we have to take whatever we get and then call WSARecv again.
   297: 
   298:   //#define MSG_WAITALL 0   // not defined in cygwin - apparently this
   299:   //DWORD flags = MSG_WAITALL;
   300: 
   301:   // ah, unfortunately MSG_WAITALL is not supported for non blocking sockets
   302:   // we'll just have to do it ourselves
   303:   DWORD flags = MSG_PARTIAL;
   304: 
   305:   // completion routines! (unused)
   306:   if(reading)
   307:     recv_res = WSARecv(s, wbufs, NUM_WBUFS, &imm_bytes, &flags, &ol, NULL);
   308:   else
   309:     recv_res = WSASend(s, wbufs, NUM_WBUFS, &imm_bytes, flags, &ol, NULL);
   310: 
   311:   // don't know if I need to check non winsock errs
   312: 
   313:   switch(recv_res)
   314:   {
   315:     case 0:
   316:     {
   317:       // flags are updated to indicate what? if there was a callback, it
   318:       // would be scheduled to be called when this thread is in the
   319:       // waitable state, whatever that means.
   320:       //fprintf(stderr,
   321:       //  "WSA%s completed immediately!!! nbytes: %li, flags: %lx\n",
   322:       //    (reading) ? "Recv" : "Send", imm_bytes, flags);
   323: 
   324:       // looks like we get the completion packet even if we do finish
   325:       // immediately so let the iocp wake us. note that this method
   326:       // of manually calling iocp_op_finished is not so great as we
   327:       // don't know the (as yet unused) udat cookie and so set it to 0.
   328:       // fprintf(stderr, "calling finished manually (ppb=%p)\n", ppb);
   329:       // iocp_op_finished(imm_bytes, 0, &ol, NO_ERROR);
   330: 
   331:       // false because iocp_op_finished will wake us. I guess false from
   332:       // these guys means that a completion packet is in the mail and
   333:       // true means that it isn't (in the mail).
   334:       return false;
   335:     }
   336:     break;
   337:     case SOCKET_ERROR:
   338:     {
   339:       DWORD err = WSAGetLastError();
   340: 
   341:       // normal mode - wait for completion
   342:       // fyi, xp pro seems to mostly give us ERROR_IO_PENDING
   343:       if(ERROR_IO_PENDING == err || WSA_IO_PENDING == err)
   344:       {
   345:         // fprintf(stderr,"WSA%s pending completion (%li)\n",
   346:         //  (reading) ? "Recv" : "Send", err);
   347:         return false;
   348:       }
   349:       error = (int)err;   // else the woken party would see error == 0
   350:       fprintf(stderr,"WSARecv/Send returned SOCKET_ERR: %li\n", err);
   351:       return true;    // assume it's bad and we won't get a wakeup
   352:     }
   353:     break;
   354:     default:
   355:     {
   356:       fprintf(stderr,"WSARecv/Send returned other error: %i, GetLastError: %li\n",
   357:         recv_res, GetLastError());
   358:       return true;        // wake up
   359:     }
   360:     break;
   361:   }
   362: 
   363:   return false;
   364: }
   365: 
   366: // NB: called by iocp::wait, so be aware of which thread is doing what, lest
   367: // you be surprised by this being called asynchronously.
   368: void
   369: wsasocketio_control_block::iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   370:   LPOVERLAPPED olp, int err)
   371: {
   372:   error = err;        // copy back for others to look at.
   373:                 // perhaps move back to iocpwakeup
   374: // fprintf(stderr, "wsasocketio::finished: ppb=%p, nbytes=%li, err=%i, read=%i\n",
   375: //  ppb, nbytes, err, reading);
   376:   // (udat and olp are not used in this function)
   377:   if(err)
   378:   {
   379:     fprintf(stderr, "wsasocketio, got error: %i\n", err);
   380:   }
   381:   // NB: an error is only logged here; the code below still runs
   382:   // fprintf(stderr,"wsa_socketio wakeup, nb: %li, err: %i\n", nbytes, err );
   383: assert( !ppb->finished() );
   384:   // keep track of bytes transferred (this is used for sends as well).
   385:   ppb->bytes_written += nbytes;
   386: 
   387:   if(0 == nbytes)
   388:   {
   389:     fprintf(stderr, "wsaiosocket got zero bytes: err=%i, read=%i\n",
   390:       err, reading);
   391:   }
   392: 
   393:   // if we're not finished, we have to reinstall our request
   394:   // zero bytes indicates shutdown/closure, right?
   395:   // might be using this for WSASend. Instead of broken pipes on win32,
   396:   // instead we get WSAECONNRESET (pretty sure) on write. On read?
   397: // not sure about this - I don't think we have to check nbytes == 0
   398:   if(0 == nbytes || ppb->finished())
   399:   {
   400:     return;
   401:   }
   402:   else
   403:   {
   404:     // go back around again: reissue the request for the remainder
   405:     fprintf(stderr,"didn\'t get everything (%li of %li bytes)\n",
   406:       ppb->bytes_written, ppb->buffer_size);
   407:     if(start_overlapped())
   408:     {
   409:       fprintf(stderr, "UM, socket restart finished immediately\n");
   410:       fprintf(stderr, "causes new wakeup? or should I loop around?\n");
   411:     }
   412:   }
   413: }
   414: 
   415: 
   416: // file/pipe io using ReadFile and WriteFile
   417: // f: the handle; buf/len: the caller's buffer; inread: true selects
   418: // ReadFile, false WriteFile (see start_overlapped).
   419: winfileio_control_block::winfileio_control_block(HANDLE f, void* buf, int len,
   420:   bool inread)
   421:   : reading(inread), file(f)
   422: {
   423:   // pb isn't much use here beyond the count of bytes written/processed
   424:   pb.bytes_written = 0;
   425:   pb.buffer_size = len;
   426:   pb.buffer = (char*)buf;
   427: }
   428: 
   429: // if file is opened with FILE_FLAG_OVERLAPPED, we can do "immutable file ptr"
   430: // ops & set the desired offset within the overlapped. can also stick an
   431: // event to signal in there.
   432: bool
   433: winfileio_control_block::start_overlapped()
   434: {
   435:   // Issues a single overlapped ReadFile/WriteFile covering pb.buffer_size
   436:   // bytes of pb.buffer. Returns true if the call failed outright (ask
   437:   // for wakeup), false otherwise (sleep on until the op finishes).
   438:   // fprintf(stderr,"winfileio_cb::start_overlapped: reading=%i\n", reading);
   439:   clear_overlapped();
   440: 
   441:   BOOL  success;
   442: 
   443:   // don't need bytes read, written when we have an OVERLAPPED, hence
   444:   // the NULL byte count argument
   445:   if(reading)
   446:     success = ReadFile(file, pb.buffer, pb.buffer_size, NULL, &ol);
   447:   else
   448:     success = WriteFile(file, pb.buffer, pb.buffer_size, NULL, &ol);
   449: 
   450:   if(!success)
   451:   {
   452:     int err = GetLastError();
   453: 
   454:     // I'm getting this for unfinished
   455:     if(ERROR_IO_PENDING == err)
   456:     {
   457:        return false;  // sleep on
   458:     }
   459:     else
   460:     {
   461:        // err is an int, so it is printed with %i; %li would not match
   462:        // the argument wherever long is wider than int.
   463:        fprintf(stderr,"%sFile failed! (%i)\n", (reading) ? "Read" : "Write",
   464:          err);
   465:        return true;      // ask for wakeup
   466:     }
   467: 
   468:     // do I still get a completion packet after a failure? assume not
   469:   }
   470: 
   471:   // immediate success is treated just like a pending op
   472:   return false;       // sleep on
   473: 
   474: }
   475: 
   476: }}
   477: 
End cpp section to demux/demux_overlapped.cpp[1]
Start cpp section to demux/demux_overlapped.hpp[1 /1 ]
     1: #line 977 "./lpsrc/flx_iocp_demux.ipk"
     2: #ifndef __FLX_DEMUX_OVERLAPPED_H__
     3: #define __FLX_DEMUX_OVERLAPPED_H__
     4: #include "flx_demux_config.hpp"
     5: 
     6: // visual studio is quite sensitve about how you do these includes.
     7: #include <WinSock2.h>              // Winsock2 (WSABufs, etc) must come before Windows.h
     8: #include "demux_iocp_demuxer.hpp"  // this header file include Windows.h
     9: #include <MSWSock.h>  // AcceptEx, TF_REUSE_SOCKET, etc
    10: 
    11: namespace flx { namespace demux {
    12: 
    13: // rename these to control block something or other
    14: // get rid of default constructors - faio can worry about that.
    15: 
    16: // WARNING: in some "immediate completion" cases I have to call
    17: // the finished function myself - in these cases I set the udat to 0, making
    18: // it not very reliable. Either make sure the user understands immediate
    19: // finish (and does it themselves) or keep a copy of udat.
    20: 
    21: // Control block for one overlapped AcceptEx (see start_overlapped).
    22: // listener socket must be already associated with an IOCP
    23: // in doing an AcceptEx, it might succeed immediately - still get a wakeup?
    24: class DEMUX_EXTERN acceptex_control_block : public iocp_wakeup {
    25:   enum { ACCEPTEX_ADDR_SIZE = sizeof(SOCKADDR_IN) + 16 };
    26:   // ^ size handed to AcceptEx for each of its two address slots
    27: public:
    28:   SOCKET  listener, acceptor;
    29:   // there are two of these! (hence 2*ACCEPTEX_ADDR_SIZE)
    30:   char  accept_buf[2*ACCEPTEX_ADDR_SIZE];
    31:   // issues the AcceptEx; true means it failed outright
    32:   virtual bool start_overlapped();
    33:   // both sockets start out invalid; set them before start_overlapped
    34:   acceptex_control_block()
    35:     : listener(INVALID_SOCKET), acceptor(INVALID_SOCKET) {}
    36: };
    37: 
    38: class DEMUX_EXTERN connectex_control_block : public iocp_wakeup
    39: {
    40: public:
    41:   // move further back?
    42:   int socket_err;         // outgoing
    43:   // (start_overlapped sets socket_err to ERROR_IO_PENDING on entry)
    44:   // can have buffer to be sent on connection
    45:   SOCKET    s;          // previously unbound socket
    46:   const char* addy;       // ipv4 address
    47:   int     p;          // port number
    48: 
    49:   // NB: socket_err is left uninitialised here (undefined until started)
    50:   connectex_control_block() : s(INVALID_SOCKET), addy(0), p(0) {}
    51: 
    52:   // see posix version of this, try to keep them in sync. give socket_err
    53:   // initial definition that works with this?
    54:   bool finished() { return ERROR_IO_PENDING != socket_err; }
    55:   // issues the ConnectEx to addy:p on s
    56:   virtual bool start_overlapped();
    57: };
    58: 
    59: // TransmitFile here (requires file handle)
    60: class DEMUX_EXTERN transmitfile_control_block : public iocp_wakeup {
    61:   SOCKET  s;                  // socket passed to TransmitFile
    62:   HANDLE  file;               // file to send; NULL in the reuse form
    63:   DWORD flags;                // for possible socket reuse.
    64: public:
    65:   // no file: just TF_DISCONNECT | TF_REUSE_SOCKET on dst
    66:   transmitfile_control_block(SOCKET dst)      // for reuse of socket
    67:     : s(dst), file(NULL), flags(TF_DISCONNECT | TF_REUSE_SOCKET) {}
    68:   // send src over dst, no flags
    69:   transmitfile_control_block(SOCKET dst, HANDLE src)  // actual transmitfile
    70:     : s(dst), file(src), flags(0) {}
    71:   // issues the TransmitFile call with the members above
    72:   virtual bool start_overlapped();
    73: };
    74: 
    75: 
    76: // handles both WSASend & WSARecv
    77: class DEMUX_EXTERN wsasocketio_control_block : public iocp_wakeup {
    78: protected:
    79:   enum { NUM_WBUFS = 1 }; // just one for now, but can do scattered send/recvs
    80:   WSABUF    wbufs[NUM_WBUFS];   // filled in by start_overlapped from ppb
    81: public:
    82:   SOCKET    s;
    83:   sel_param*  ppb;      // on input what you want, on output what you got
    84:   int     error;        // 0 at start; set from err in iocp_op_finished
    85:   bool    reading;  // else use WSASend
    86: 
    87:   // watch the memory interfaces here, going back and forth between threads.
    88:   wsasocketio_control_block(SOCKET src, sel_param* pb, bool read);
    89:   // issues WSARecv or WSASend for the unprocessed rest of the buffer
    90:   virtual bool start_overlapped();
    91:   // accumulates nbytes into ppb and restarts the op if not yet finished
    92:   virtual void iocp_op_finished( DWORD nbytes, ULONG_PTR udat,
    93:     LPOVERLAPPED olp, int err);
    94: };
    95: 
    96: // winfileio_control_block, much like wsasocketio_control_block
    97: class DEMUX_EXTERN winfileio_control_block : public iocp_wakeup {
    98:   bool    reading;    // true: ReadFile, false: WriteFile
    99: public:
   100:   sel_param pb;       // buffer, its size and bytes processed
   101:   HANDLE    file; // I like to modify this from the outside
   102: 
   103:   // offset?
   104:   winfileio_control_block(HANDLE f, void* buf, int len, bool read);
   105:   // issues the overlapped ReadFile/WriteFile on file
   106:   virtual bool start_overlapped();
   107: 
   108:   // NB: no iocp_op_finished callback. defined by users of the class.
   109: };
   110: 
   111: }}
   112: 
   113: #endif
   114: 
End cpp section to demux/demux_overlapped.hpp[1]
Start cpp section to demux/demux_wself_piper.hpp[1 /1 ]
     1: #line 1092 "./lpsrc/flx_iocp_demux.ipk"
     2: #ifndef __FLX_DEMUX_WSELF_PIPER_H__
     3: #define __FLX_DEMUX_WSELF_PIPER_H__
     4: 
     5: #include <flx_demux_config.hpp>
     6: #include "demux_overlapped.hpp"  // I use readfile control block
     7: #include "demux_wself_piper.hpp"
     8: 
     9: namespace flx { namespace demux {
    10: 
    11: class DEMUX_EXTERN auto_handle {
    12: public:
    13:   HANDLE h;
    14: 
    15:   auto_handle();
    16:   ~auto_handle();
    17: };
    18: 
    19: // win32 self pipe trick (what a pain!)
    20: class DEMUX_EXTERN wpipe_pair {
    21:   enum { READ_END, WRITE_END };
    22:   // HANDLE pipe[2];  // 0 = read end, 1 = write end
    23:   auto_handle pipe[2];
    24: public:
    25:   wpipe_pair();
    26: 
    27:   void write_byte();
    28:   HANDLE get_read_end() { return pipe[READ_END].h; }
    29: };
    30: 
    31: // use a winfileio_control_block to install a nonblocking ReadFile on the
    32: // read end of the pipe pair. when we get a byte we can execute whatever
    33: // the user wanted which for win32 which seems to be naturally responsive
    34: // to new sockets/handles. demux quit will probably be the only operation
    35: // needed.
    36: class DEMUX_EXTERN wself_piper_wakeup : public winfileio_control_block
    37: {
    38:   char the_byte;
    39: 
    40: public:
    41:   // possibly null, if not, called on iocp_op_finished
    42:   demux_callback* cb;
    43: 
    44:   // the demuxer. doesn't actually get passed by iocp_op_finished
    45:   iocp_demuxer* d;
    46: 
    47: 
    48:   wself_piper_wakeup();
    49: 
    50:   // detect when single byte read has finished and exec callback,
    51:   // re-arming.
    52:   virtual void iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
    53:     LPOVERLAPPED olp, int err);
    54: 
    55:   void arm();
    56: };
    57: 
    58: // at the very least, the read end must be nonblocking and associated
    59: // with the iocp.
    60: class DEMUX_EXTERN wself_piper {
    61:   wpipe_pair pp;
    62:   wself_piper_wakeup spw;
    63: public:
    64:   void install(demuxer* demux, demux_callback* cb = 0);
    65:   void wake(); // wakes demuxer which calls callback
    66: };
    67: 
    68: } } // demux, flx
    69: 
    70: #endif
    71: 
End cpp section to demux/demux_wself_piper.hpp[1]
Start cpp section to demux/demux_wself_piper.cpp[1 /1 ]
     1: #line 1164 "./lpsrc/flx_iocp_demux.ipk"
     2: 
     3: #include "demux_wself_piper.hpp"
     4: #include <stdio.h>
     5: 
     6: namespace flx { namespace demux {
     7: 
     8: auto_handle::auto_handle()
     9: {
    10:   h = INVALID_HANDLE_VALUE;
    11: }
    12: 
    13: auto_handle::~auto_handle()
    14: {
    15:   if(INVALID_HANDLE_VALUE == h) return; // done
    16: 
    17:   if(!CloseHandle(h))
    18:     fprintf(stderr, "auto CloseHandle failed: %i\n", GetLastError());
    19: }
    20: 
    21: wpipe_pair::wpipe_pair()
    22: {
    23:   // fprintf(stderr, "wpipe CTOR\n");
    24:   // made bufsize 1 as we only ever read and write 1 byte at a time
    25: 
    26:   // looks like I can't use anonymous pipes with iocp. will have to
    27:   // use a specially setup named pipe - one that allows repetitions...
    28:   // this doesn't sound good.
    29:   // This will be a problem if we have multiple event waiting threads.
    30:   // If I don't get the flags right it will also be a problem if we
    31:   // have multiple instances/apps running. I don't wan't this resource
    32:   // to be globally visible. Flags?
    33:   const char* pname = "\\\\.\\pipe\\flx_iocp_quitter";
    34: 
    35:   // don't actually need duplex, nor those buffers. 1 byte at a time suffices
    36:   // I probably don't need both ends to be marked as nonblocking either, should
    37:   // only need the read end nonblocking.
    38: 
    39:   // create pipe
    40:   pipe[READ_END].h = CreateNamedPipe(pname,
    41:    PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
    42:    PIPE_TYPE_BYTE, 1, 256, 256, 0, NULL);
    43: 
    44:   if(INVALID_HANDLE_VALUE == pipe[READ_END].h)
    45:   {
    46:     fprintf(stderr, "couldn't create named pipe: %i\n", GetLastError());
    47:     throw -1;
    48:   }
    49: 
    50:   // this is the part that I don't like - this pipe's name isn't unique
    51:   // and so theoretically another iocp quitter could join here. a race!
    52: 
    53:   // connect to it. note that overlapped isn't needed for write end as
    54:   // we want to block.
    55:   pipe[WRITE_END].h = CreateFile(pname, FILE_READ_DATA | FILE_WRITE_DATA,
    56:     FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING,
    57:     FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
    58: 
    59:   if(INVALID_HANDLE_VALUE == pipe[WRITE_END].h)
    60:   {
    61:     fprintf(stderr, "failed to open named pipe: %i\n", GetLastError());
    62:     throw -1;
    63:   }
    64: 
    65:   // anonymous pipes can't be made nonblocking/iocpable on windows!
    66:   // What a shame!
    67: /*
    68:   if(!CreatePipe(&pipe[READ_END], &pipe[WRITE_END], NULL, 1))
    69:   {
    70:     fprintf(stderr, "wpipe_pair CreatePipe failed: %i\n", GetLastError());
    71:     throw -1;
    72:   }
    73: */
    74: }
    75: 
    76: void
    77: wpipe_pair::write_byte()
    78: {
    79:   // I think I want a blocking write here.
    80:   char  b = 1;
    81:   DWORD bytes_written;
    82:   // last arg is overlapped pointer, unused, we want to block.
    83:   if(!WriteFile(pipe[WRITE_END].h, &b, 1, &bytes_written, NULL))
    84:     fprintf(stderr, "wpipe_pair failed to write byte: %i\n",
    85:       GetLastError());
    86: }
    87: 
    88: void
    89: wself_piper::install(demuxer* d, demux_callback* cb)
    90: {
    91:   fprintf(stderr, "wself_piper::install(%p, %p)\n", d, cb);
    92:   iocp_demuxer* demux = static_cast<iocp_demuxer*>(d);
    93: 
    94:   // make read end non blocking and associate with iocp
    95:   HANDLE read_end = pp.get_read_end();
    96: 
    97: #if 0
    98:   // make the anonymous pipe non blocking. this function is for named pipes,
    99:   // but I've heard talk that it works for anon pipes too. Nope, doesn't work.
   100:   DWORD pipe_mode = PIPE_NOWAIT;
   101:   if(!SetNamedPipeHandleState(read_end, &pipe_mode, NULL, NULL))
   102:   {
   103:     fprintf(stderr, "SetNamedPipeHandleState failed: %i\n", GetLastError());
   104:     return; // not much to be done here.
   105:   }
   106: #endif
   107: 
   108:   if(0 != demux->associate_with_iocp(read_end, NULL))
   109:   {
   110:     fprintf(stderr, "failed to install self pipe in IOCP!!!\n");
   111:     return; // error code?
   112:   }
   113: 
   114:   // copy into the self pipe wakeup, for its later use.
   115:   spw.d = demux;
   116:   spw.cb = cb;
   117:   spw.file = read_end;
   118: 
   119:   fprintf(stderr, "initial self pipe arm\n");
   120:   spw.arm();
   121: }
   122: 
   123: // wakes demuxer
   124: void
   125: wself_piper::wake()
   126: {
   127:   fprintf(stderr, "wself_piper::wake - write a byte\n");
   128:   pp.write_byte();
   129: }
   130: 
   131: wself_piper_wakeup::wself_piper_wakeup()
   132:   // configure the control block for ReadFile on the read end pipe
   133:   // will set pipe handle later, read = true
   134:   : winfileio_control_block(INVALID_HANDLE_VALUE, NULL, 0, true),
   135:     cb(0), d(0)
   136: {
   137:   // I'll probably need to reset the byte address
   138:   fprintf(stderr, "SET UP THE PIPE HANDLE!\n");
   139: }
   140: 
   141: // at this point the byte has already been read. we want to re-arm for future
   142: // wakeups.
   143: void
   144: wself_piper_wakeup::iocp_op_finished(DWORD nbytes, ULONG_PTR udat,
   145:     LPOVERLAPPED olp, int err)
   146: {
   147:   fprintf(stderr, "wself_piper_wakeup::iocp_op_finished\n");
   148:   fprintf(stderr, "nbytes=%i, err=%i\n", nbytes, err);
   149:   fprintf(stderr, "about to callback %p(%p)\n", cb, d);
   150: 
   151:   if(cb) cb->callback(d);
   152: 
   153:   arm();  // re-arm
   154: }
   155: 
   156: void
   157: wself_piper_wakeup::arm()
   158: {
   159:   // exec another nonblocking ReadFile on read end of pipe
   160:   fprintf(stderr, "wself_piper_wakeup::arm\n");
   161:   pb.buffer = &the_byte;
   162:   pb.buffer_size = 1;
   163:   pb.bytes_written = 0;
   164:   if(start_overlapped())
   165:     fprintf(stderr, "WARNING: wslef_pipe install completed immediately\n");
   166: }
   167: 
   168: } }
   169: 
   170: 
   171: 
   172: 
End cpp section to demux/demux_wself_piper.cpp[1]
Start cpp section to demux/demux_win_timer_queue.hpp[1 /1 ]
     1: #line 3 "./lpsrc/flx_wintimer_demux.ipk"
     2: #ifndef __FLX_DEMUX_WIN_TIMER_QUEUE_H__
     3: #define __FLX_DEMUX_WIN_TIMER_QUEUE_H__
     4: 
     5: #include "flx_demux_config.hpp"
     6: #include <Windows.h>
     7: 
     8: #include "demux_timer_queue.hpp"
     9: 
    10: namespace flx { namespace demux {
    11: 
    12: class DEMUX_EXTERN win_timer_queue : public timer_queue
    13: {
    14:   HANDLE    timer_queue;
    15: 
    16:   static VOID CALLBACK timer_callback(PVOID, BOOLEAN);
    17: public:
    18:   win_timer_queue();
    19:   ~win_timer_queue();
    20: 
    21:   virtual void add_sleep_request(sleep_task* st, double delta);
    22:   virtual void add_abs_sleep_request(sleep_task* st, double when);
    23: 
    24: };
    25: 
    26: }}
    27: 
    28: #endif // __FLX_DEMUX_WIN_TIMER_QUEUE_H__
    29: 
End cpp section to demux/demux_win_timer_queue.hpp[1]
Start cpp section to demux/demux_win_timer_queue.cpp[1 /1 ]
     1: #line 33 "./lpsrc/flx_wintimer_demux.ipk"
     2: #include "flx_demux_config.hpp"
     3: #include <Windows.h>
     4: #include <assert.h>
     5: 
     6: // simply wrapped windows timer queue. requires windows 5.00, which is
     7: // quite high (xp?) because I couldn't get the waitable timers to work.
     8: // must be careful with this stuff lest it create millions of threads.
     9: #include "demux_win_timer_queue.hpp"
    10: 
    11: #include <stdio.h>
    12: 
    13: namespace flx { namespace demux {
    14: 
    15: timer_queue *mk_timer_queue() { return new win_timer_queue; }
    16: 
    17: #define MIL 1000000    // 1 metric million
    18: 
    19: typedef struct
    20: {
    21:   sleep_task*  st;        // so we can make it fire
    22:   HANDLE    timer;      // we need to delete the timer, so we keep it
    23:   HANDLE    timer_queue;  // AND its queue (no back ptrs, I guess)
    24: } timer_cookie;
    25: 
    26: win_timer_queue::win_timer_queue()
    27: {
    28:   // fprintf(stderr,"win_timer_queue ctor\n");
    29: 
    30:   timer_queue = CreateTimerQueue();
    31:   if(!timer_queue)
    32:   {
    33:     fprintf(stderr, "CreateTimerQueue failed: %i\n", GetLastError());
    34:     throw -1;
    35:   }
    36:   // fprintf(stderr, "created timer queue: %p\n", timer_queue);
    37: }
    38: 
    39: 
    40: win_timer_queue::~win_timer_queue()
    41: {
    42:   // INVALID_HANDLE_VALUE indicates that DeleteTimerQueueEx should wait for
    43:   // all callback functions to complete before returning. One would hope that
    44:   // calling this causes all the timers to go off before their time (what
    45:   // else would the "actually fired" callback flag be for?). The alternative
    46:   // of waiting for some ever distant timer to go off would be too stupid
    47:   // for words. As usual, the msdn glosses over the important details like
    48:   // this one. Anyway, it's easy to test out... No, that flag's always true
    49:   // for timers, and this wait option doesn't work - maybe with other types
    50:   // flags for CreateTimerQueueTimer?
    51:   if(!DeleteTimerQueueEx(timer_queue, INVALID_HANDLE_VALUE))
    52:   {
    53:     fprintf(stderr, "DeleteTimerQueueEx failed: %i\n", GetLastError());
    54:     // whatcha gonna do about it?
    55:   }
    56:   // fprintf(stderr, "finished - did it wait?\n");
    57: }
    58: 
    59: // note: may not need time to be in sleep_task. could pass time here.
    60: // thread safe
    61: void
    62: win_timer_queue::add_sleep_request(sleep_task* st, double delta)
    63: {
    64:   // fprintf(stderr,"add_sleep_request: %lf to %p\n", delta, timer_queue);
    65: 
    66:   timer_cookie*  tc = new timer_cookie;
    67: 
    68:   // copy in the sleep_task and the timer queue
    69:   tc->st = st;
    70:   tc->timer_queue = timer_queue;
    71: 
    72:   // the timer thread may not be the best solution as nothing is stopping
    73:   // anyone from performing long operations with this structure, however
    74:   // in all likelihood, it'll just be felix adding threads back to its queue.
    75:   if(!CreateTimerQueueTimer(
    76:     &tc->timer,          // resulting timer in timer_cookie
    77:     timer_queue,
    78:     //NULL,            // add to default timer queue
    79:     timer_callback,        // should get called in delta seconds
    80:     tc,             // timer cookie is user data
    81:     (DWORD)(delta*1000),    // millisecond timer
    82:     0,              // zero period => signal once
    83:     WT_EXECUTEINTIMERTHREAD))  // NB: for short tasks (will this do?)
    84:   {
    85:     fprintf(stderr, "CreateTimerQueueTimer failed: %i\n", GetLastError());
    86:     delete tc;          // at least try not to leak
    87:     return;
    88:   }
    89: }
    90: 
    91: // this is a c callback - all the c++ code should probably be wrapped
    92: // in a try/catch. timer_or_wait_fired is always true for timers.
    93: VOID CALLBACK
    94: win_timer_queue::timer_callback(PVOID udat, BOOLEAN timer_or_wait_fired)
    95: {
    96:   timer_cookie*  tc = (timer_cookie*)udat;
    97: 
    98:   // fprintf(stderr, "timer queue callback fired: %p, %i\n",
    99:   //  tc, timer_or_wait_fired);
   100: 
   101:   if(!tc)
   102:   {
   103:     // Nothing that we can do in this situation.
   104:     fprintf(stderr, "WHOA - NULL queue cookie! (fired: %i)\n",
   105:       timer_or_wait_fired);
   106:     return;            // outta here
   107:   }
   108: 
   109:   // NULL means delete the thing now, INVALID_HANDLE_VALUE means wait until
   110:   // callback finishes. We're in the callback, so we can't do that (=deadlock
   111:   // of the timer thread, which isn't good). We're all adults here, the timer
   112:   // has expired, we know what we're doing, so lets just delete it.
   113:   tc->st->fire();
   114: 
   115:   // on my box this returns ERROR_IO_PENDING, on others it doesn't
   116:   // msdn says this should be ok, but I'm not so sure.
   117:   if(!DeleteTimerQueueTimer(tc->timer_queue, tc->timer, NULL))
   118:   {
   119:     int  err = GetLastError();
   120: 
   121:     if( ERROR_IO_PENDING != err)
   122:     {
   123:       fprintf(stderr, "DeleteTimerQueueTimer of %p failed: %i\n",
   124:         tc->timer, err);
   125:     }
   126:     else
   127:     {
   128:       // I'm not so sure, see if it leaks.
   129:       fprintf(stderr, "DeleteTimerQueueTimer = ERROR_IO_PENDING\n");
   130:       fprintf(stderr, "Apparently this is ok...\n");
   131:     }
   132:   }
   133:   delete tc;
   134: 
   135:   // fprintf(stderr, "leaving timer callback\n");
   136: }
   137: 
   138: // in seconds from some ref pt (UTC for this fn)
   139: // N.B. declared in base class!
   140: void
   141: timer_queue::get_time(double& t)
   142: {
   143:   SYSTEMTIME  sysnow;
   144:   GetSystemTime(&sysnow);
   145:   // now convert to seconds
   146:   // via FILETIME?
   147: 
   148:   // kinda sucks, but is the msdn recommended way of doing calculations
   149:   // on dates.
   150:   FILETIME  fnow;
   151:   if(!SystemTimeToFileTime(&sysnow, &fnow))
   152:   {
   153:     fprintf(stderr, "SystemTimeToFileTime failed: %i\n", GetLastError());
   154:     t = 0;
   155:     return;
   156:   }
   157: 
   158:   ULARGE_INTEGER now;  // so we can do some maths
   159: 
   160:   assert(sizeof(now) == sizeof(fnow));
   161:   memcpy(&now, &fnow, sizeof(now));
   162: 
   163:   // and now we have a big integer containing an offset jan 1, 1601 (UTC)
   164:   // 100 nanosecond intervals
   165:   t = now.QuadPart*MIL*10;  // *10 to microseconds, *MIL to seconds
   166: }
   167: 
   168: void
   169: win_timer_queue::add_abs_sleep_request(sleep_task* st, double when)
   170: {
   171:   // win timer queue works with relative offsets, so convert this absolute
   172:   double  now;
   173:   get_time(now);
   174:   double  delta = when-now;
   175:   if(delta < 0.0) delta = 0.0;
   176:   add_sleep_request(st, delta);
   177: }
   178: 
   179: }}
   180: 
   181: 
End cpp section to demux/demux_win_timer_queue.cpp[1]
Start cpp section to demux/demux_posix_timer_queue.hpp[1 /1 ]
     1: #line 4 "./lpsrc/flx_posixtimer_demux.ipk"
     2: #ifndef __FLX_DEMUX_POSIX_TIMER_QUEUE_H__
     3: #define __FLX_DEMUX_POSIX_TIMER_QUEUE_H__
     4: 
     5: #include "pthread_thread.hpp"  // flx_thread_t
     6: #include "pthread_mutex.hpp"  // flx_mutex_t
     7: #include "pthread_condv.hpp"  // flx_condv_t
     8: #include "demux_timer_queue.hpp" // base class
     9: #include <sys/time.h>        // timespecs, gettimeofday
    10: 
    11: namespace flx { namespace demux {
    12: 
    13: // looks like a worker queue, but couldn't quite mash it into one
    14: class DEMUX_EXTERN posix_timer_queue : public timer_queue
    15: {
    16:     flx::pthread::flx_mutex_t lock; // factor to prio queue?
    17:     flx::pthread::flx_condv_t sleep_cond;
    18:     flx::pthread::flx_thread_t sleep_thread; // joinable, we join later
    19:     void*        opaque_prio_queue;        // less fat
    20: 
    21:     static void thread_start(void*);    // passed "this"
    22:     bool thread_loop_body();
    23: 
    24: 
    25:     void wakeup_thread();                // we can do this!
    26: 
    27:     void add_sleep_request(sleep_task* st, timespec* abs);
    28: public:
    29:     posix_timer_queue();
    30:     ~posix_timer_queue();
    31: 
    32:     // thread safe.
    33:     virtual void add_sleep_request(sleep_task* st, double delta);
    34: 
    35:     // in seconds, relative to same base as timer::get_time.
    36:     virtual void add_abs_sleep_request(sleep_task* st, double when);
    37: };
    38: 
    39: }}
    40: 
    41: #endif // __FLX_DEMUX_POSIX_TIMER_QUEUE_H__
    42: 
End cpp section to demux/demux_posix_timer_queue.hpp[1]
Start cpp section to demux/demux_posix_timer_queue.cpp[1 /1 ]
     1: #line 47 "./lpsrc/flx_posixtimer_demux.ipk"
     2: #include "demux_posix_timer_queue.hpp"
     3: 
     4: // a prio queue that executes tasks in a given order
     5: // factor out prio_queue? could be like queue.
     6: 
     7: // try to make work like the worker thread thing, fix it do so?.
     8: // remove time from sleep task...
     9: 
    10: #include <queue>    // stl seems to have a prio_queue
    11: #include <sys/time.h> // gettimeofday for calculating "now"
    12: #include <cstdio>
    13: 
    14: //using namespace flx::pthread;
    15: namespace flx { namespace demux {
    16: 
    17: timer_queue *mk_timer_queue() { return new posix_timer_queue; }
    18: 
    19: 
    20: #define MIL 1000000        // one million
    21: #define BIL (MIL*1000)    // one billion (metric)
    22: 
    23: using namespace std;
    24: 
    25: // it could happen!
    26: // factor
    27: class future_evt
    28: {
    29: public:
    30:     timespec    when;
    31:     sleep_task*    task;
    32: 
    33:     // ignore the direction, just trying to sort with smallest first
    34:     bool operator<(const future_evt& rhs) const
    35:     {
    36:         if(when.tv_sec != rhs.when.tv_sec)    // precedence to more significant
    37:             return when.tv_sec > rhs.when.tv_sec;
    38:         else                                // else check the less significant
    39:             return when.tv_nsec > rhs.when.tv_nsec;
    40:     }
    41: };
    42: 
    43: typedef priority_queue<future_evt> void_prio_queue;
    44: #define PRIOQ ((void_prio_queue*)opaque_prio_queue)
    45: 
    46: posix_timer_queue::posix_timer_queue()
    47: {
    48:     opaque_prio_queue = new void_prio_queue;    // a.k.a. PRIOQ
    49:     //fprintf(stderr,"initing timer sleep thread\n");
    50: 
    51:     // NEED'S TO CHECK RETURN VAL AND HANDLE ERROR
    52:     if(sleep_thread.init(thread_start, this))
    53:       fprintf(stderr, "failed to create posix timer queue thread!\n");
    54: }
    55: 
    56: posix_timer_queue::~posix_timer_queue()
    57: {
    58:     // the sleep_thread uses the prioq, so we must explicitly shut it
    59:     // down now, before we delete the prioq. left to its own devices,
    60:     // c++ destructs it at the end of this destructor.
    61: 
    62:     // take down the thread first because it uses all the other stuff.
    63:     // I actually don't need to do anything special to bring the thread
    64:     // down because all pthread_cond_*wait* are cancel aware. Or so they
    65:     // should be. As far as I can tell only the 64bit osx10.4.2 is, so
    66:     // for now the explicit cancel + wakeup followed by explicit
    67:     // cancel test stays.
    68: 
    69:     // fprintf(stderr, "asking timer thread to quit\n");
    70:     add_sleep_request(NULL, 0.0);    // super secret quit thread quit request
    71:     wakeup_thread();                // wakeup, cause to goto a cancel pt
    72: 
    73:     sleep_thread.join();            // will join
    74:     //fprintf(stderr,"about to delete PRIOQ\n");
    75:     delete PRIOQ;
    76: }
    77: 
    78: static void
    79: get_now(timespec* now)
    80: {
    81:     struct timeval tp;
    82: 
    83:     if(gettimeofday(&tp, NULL) == -1)
    84:         perror("gettimeofday");
    85: 
    86:     // (10^6-1)*1000 = 3B9AC618 = max usec -> nsec fits in a 32bit long.
    87:     now->tv_sec = tp.tv_sec;
    88:     now->tv_nsec = tp.tv_usec*1000;        // fits!
    89: 
    90:     // fprintf(stderr,"get_now = %li, %li\n", now->tv_sec, now->tv_nsec);
    91: }
    92: 
    93: // LIMIT!
    94: // seconds to microseconds - signed this gives a bit over half an hour
    95: #define SEC2TIMESPEC(ts, t) long    wait_musec = (long)(t*MIL);    \
    96:     timespec    ts = { wait_musec / MIL, (wait_musec % MIL)*1000 }
    97: 
    98: 
    99: // offset delta from "now" and store in "when"
   100: static void
   101: calc_when(timespec* when, double delta)
   102: {
   103: // how to use the posix abstime versions of timed waits? what kind of absolute
   104: // is abstime? pthread_get_expiration_np looks useful, but it too is np.
   105: // abstime is apparently in seconds since the Epoch, UTC.
   106: // To get now there's clock_gettime (not portable) or gettimeofday with
   107: // null timezone.
   108: 
   109:     timespec    now;
   110:     get_now(&now);
   111: 
   112:     // limit!
   113:     // seconds to microseconds - signed this gives a bit over half an hour
   114:     // long    wait_musec = (long)(delta*MIL);
   115:     // timespec    delay = { wait_musec / MIL, (wait_musec % MIL)*1000 };
   116:     SEC2TIMESPEC(delay, delta);
   117: 
   118:     // (10^6-1)*1000 = 3B9AC618 = max usec -> nsec fits in a 32bit long.
   119:     when->tv_sec = now.tv_sec + delay.tv_sec;
   120:     when->tv_nsec = now.tv_nsec + delay.tv_nsec;
   121: 
   122:     if(when->tv_nsec >= BIL)            // overflow of nanoseconds?
   123:     {
   124:         // fprintf(stderr,"OVERFLOW = %li, %li\n", when->tv_sec, when->tv_nsec);
   125:         // x, y < BIL, x + y < 2BIL
   126:         when->tv_sec++;
   127:         when->tv_nsec -= BIL;
   128:         // when->tv_sec += when->tv_nsec/BIL;
   129:         // when->tv_nsec %= BIL;
   130:     }
   131: 
   132:     // fprintf(stderr,"when = %li, %li\n", when->tv_sec, when->tv_nsec);
   133:     // tp contains tv_sec (seconds) & tv_usec (microseconds) both longs.
   134:     // however, if nonposix works everywhere...
   135: }
   136: 
   137: // absolute time
   138: void
   139: posix_timer_queue::add_sleep_request(sleep_task* st, timespec* abs)
   140: {
   141:     future_evt    evt;
   142:     evt.task = st;
   143:     evt.when = *abs;
   144: 
   145:     flx::pthread::flx_mutex_locker_t    locker(lock);
   146: 
   147:     PRIOQ->push(evt);
   148: 
   149:     // we may have inserted at sooner than any other evt, so wake up thread
   150:     // to figure it out (if need be). I seemed to be getting more wakeups
   151:     // with this. Turned off for now. Not sure how that works.
   152:     if(1 || PRIOQ->top().task == st)
   153:     {
   154:       // fprintf(stderr,"WE PUSHED IN - waking thread\n");
   155:       wakeup_thread();
   156:     }
   157: }
   158: 
   159: // note: may not need time to be in sleep_task. could pass time here.
   160: // thread safe
   161: void
   162: posix_timer_queue::add_sleep_request(sleep_task* st, double delta)
   163: {
   164:     // fprintf(stderr,"add_sleep_request: %lf\n", delta);
   165:     timespec    when;
   166:     calc_when(&when, delta);        // calculate when (t a delta)
   167: 
   168:     add_sleep_request(st, &when);
   169: }
   170: 
   171: void
   172: posix_timer_queue::add_abs_sleep_request(sleep_task* st, double when)
   173: {
   174:     // absolute version is closer to the posix implementation
   175:     SEC2TIMESPEC(abs_time, when);
   176:     add_sleep_request(st, &abs_time);
   177: }
   178: 
   179: // cause the timer wait thread to wake up. useful for asking it to
   180: // exit or re-evaluate a changed sleep queue.
   181: void
   182: posix_timer_queue::wakeup_thread()
   183: {
   184:     sleep_cond.signal();
   185: }
   186: 
   187: void
   188: posix_timer_queue::thread_start(void* udat)
   189: {
   190:     posix_timer_queue*    q = (posix_timer_queue*)udat;
   191:     //fprintf(stderr,"sleeper thread\n");
   192: 
   193:     while(q->thread_loop_body()) ;
   194: }
   195: 
   196: bool
   197: posix_timer_queue::thread_loop_body()
   198: {
   199:     // lock on. lock off when waiting on condition
   200:     flx::pthread::flx_mutex_locker_t    locker(lock);
   201: 
   202:     // int        res;
   203: 
   204:     // pthread_cond_wait & pthread_cond_timedwait (& np rel version?) are
   205:     // cancellation points. doco notes for timed & untimed waits that the
   206:     // predicate should be rechecked as there can be spurious wakeups.
   207:     // no worries, when we wakeup the lock has been acquired.
   208: 
   209:     while(!PRIOQ->empty())
   210:     {
   211:         future_evt    evt = PRIOQ->top();
   212: 
   213:         // quit request
   214:         if(!evt.task) return false;
   215: 
   216:         future_evt  now;        // "now' has no task, just a dummy.
   217:         get_now(&now.when);
   218: 
   219:         // if(evt < now)        // would prefer <=, eh.
   220:         // < is arse backwards because I don't know how to use the stl
   221:         if(now < evt)        // would prefer <=, eh.
   222:         {
   223:             // fprintf(stderr,"firing of (%li, %li) at (%li, %li)!\n",
   224:             //    evt.when.tv_sec, evt.when.tv_nsec,
   225:             //    now.when.tv_sec, now.when.tv_nsec);
   226:             evt.task->fire();
   227:             PRIOQ->pop();
   228:         }
   229:         else    // we have an event in future, so sleep for that long
   230:         {
   231:             // remember that condition waits are exit points...
   232:             // so I don't need to test - check that.
   233:             // fprintf(stderr,"sleeping from %li, %li until %li, %li\n",
   234:             //    now.when.tv_sec, now.when.tv_nsec,
   235:             //    evt.when.tv_sec, evt.when.tv_nsec);
   236:             (void)sleep_cond.timedwait(&lock, &evt.when);
   237: 
   238:             // if using posix abstime timed wait we make get EINVAL here for
   239:             // abstimes in the past. must handle this.
   240:             //JS: It's handled now, waiting for a time in the past is OK
   241: 
   242:             // fprintf(stderr,"pthread_cond_timedwait woke up! (%i)\n", res);
   243:         }
   244:     }
   245: 
   246:     // if we got here then the queue is empty, so sleep indefinitely
   247:     // that we don't really need the mainloop testcancel because the condition
   248:     // wait functions are cancellation points.
   249:     // fprintf(stderr,"no sleep task, sleeping indefinitely\n");
   250:     sleep_cond.wait(&lock);
   251:     // fprintf(stderr,"pthread_cond_wait woke up! (%i)\n", res);
   252: 
   253:     // lock released here
   254:     return true;                    // keep going
   255: }
   256: 
   257: 
   258: // in seconds from some ref pt
   259: // N.B. declared in base class!
   260: void
   261: timer_queue::get_time(double& t)
   262: {
   263:     timespec    now;
   264:     get_now(&now);        // just calls gettimeofday (msec) and converts
   265:                         // to timespec (sec, nsec). could skip that
   266:                         // and call directly, avoiding conversion
   267:     t = now.tv_sec + (now.tv_nsec*BIL);
   268: }
   269: 
   270: }}
   271: 
   272: 
End cpp section to demux/demux_posix_timer_queue.cpp[1]